diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataPart.cpp b/dbms/src/Storages/MergeTree/MergeTreeDataPart.cpp index 6b223d9eb0c..7a5f0ddf45a 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeDataPart.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeDataPart.cpp @@ -274,19 +274,27 @@ MergeTreeDataPartChecksums MergeTreeDataPartChecksums::parse(const String & s) return res; } - -const MergeTreeDataPartChecksums::Checksum * MergeTreeDataPart::tryGetBinChecksum(const String & name) const +const MergeTreeDataPartChecksums::Checksum * MergeTreeDataPart::tryGetChecksum(const String & name, const String & ext) const { if (checksums.empty()) return nullptr; const auto & files = checksums.files; - const auto bin_file_name = escapeForFileName(name) + ".bin"; - auto it = files.find(bin_file_name); + const auto file_name = escapeForFileName(name) + ext; + auto it = files.find(file_name); return (it == files.end()) ? nullptr : &it->second; } +const MergeTreeDataPartChecksums::Checksum * MergeTreeDataPart::tryGetBinChecksum(const String & name) const +{ + return tryGetChecksum(name, ".bin"); +} + +const MergeTreeDataPartChecksums::Checksum * MergeTreeDataPart::tryGetMrkChecksum(const String & name) const +{ + return tryGetChecksum(name, ".mrk"); +} static ReadBufferFromFile openForReading(const String & path) { @@ -399,6 +407,13 @@ size_t MergeTreeDataPart::getColumnUncompressedSize(const String & name) const } +size_t MergeTreeDataPart::getColumnMrkSize(const String & name) const +{ + const Checksum * checksum = tryGetMrkChecksum(name); + return checksum ? checksum->file_size : 0; +} + + /** Returns the name of a column with minimum compressed size (as returned by getColumnSize()). * If no checksums are present returns the name of the first physically existing column. */ @@ -925,6 +940,18 @@ size_t MergeTreeDataPart::getIndexSizeInAllocatedBytes() const return res; } +size_t MergeTreeDataPart::getTotalMrkSizeInBytes() const +{ + size_t res = 0; + for (const NameAndTypePair & it : columns) + { + const Checksum * checksum = tryGetMrkChecksum(it.name); + if (checksum) + res += checksum->file_size; + } + return res; +} + String MergeTreeDataPart::stateToString(MergeTreeDataPart::State state) { switch (state) diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataPart.h b/dbms/src/Storages/MergeTree/MergeTreeDataPart.h index 9ec9cba56e6..17d15eba798 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeDataPart.h +++ b/dbms/src/Storages/MergeTree/MergeTreeDataPart.h @@ -97,12 +97,17 @@ struct MergeTreeDataPart MergeTreeDataPart(MergeTreeData & storage_, const String & name_); + const Checksum * tryGetChecksum(const String & name, const String & ext) const; /// Returns checksum of column's binary file. const Checksum * tryGetBinChecksum(const String & name) const; + /// Returns checksum of column's mrk file. + const Checksum * tryGetMrkChecksum(const String & name) const; /// Returns the size of .bin file for column `name` if found, zero otherwise size_t getColumnCompressedSize(const String & name) const; size_t getColumnUncompressedSize(const String & name) const; + /// Returns the size of .mrk file for column `name` if found, zero otherwise + size_t getColumnMrkSize(const String & name) const; /// Returns the name of a column with minimum compressed size (as returned by getColumnSize()). /// If no checksums are present returns the name of the first physically existing column. @@ -294,6 +299,8 @@ struct MergeTreeDataPart /// For data in RAM ('index') size_t getIndexSizeInBytes() const; size_t getIndexSizeInAllocatedBytes() const; + /// Total size of *.mrk files + size_t getTotalMrkSizeInBytes() const; private: /// Reads columns names and types from columns.txt diff --git a/dbms/src/Storages/System/StorageSystemPartsColumns.cpp b/dbms/src/Storages/System/StorageSystemPartsColumns.cpp new file mode 100644 index 00000000000..d246116465c --- /dev/null +++ b/dbms/src/Storages/System/StorageSystemPartsColumns.cpp @@ -0,0 +1,282 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace DB +{ + + +StorageSystemPartsColumns::StorageSystemPartsColumns(const std::string & name_) + : name(name_), + columns + { + {"partition", std::make_shared()}, + {"name", std::make_shared()}, + {"active", std::make_shared()}, + {"marks", std::make_shared()}, + {"marks_size", std::make_shared()}, + {"rows", std::make_shared()}, + {"bytes", std::make_shared()}, + {"modification_time", std::make_shared()}, + {"remove_time", std::make_shared()}, + {"refcount", std::make_shared()}, + {"min_date", std::make_shared()}, + {"max_date", std::make_shared()}, + {"min_block_number", std::make_shared()}, + {"max_block_number", std::make_shared()}, + {"level", std::make_shared()}, + {"primary_key_bytes_in_memory", std::make_shared()}, + {"primary_key_bytes_in_memory_allocated", std::make_shared()}, + + {"database", std::make_shared()}, + {"table", std::make_shared()}, + {"engine", std::make_shared()} + } +{ +} + + +BlockInputStreams StorageSystemPartsColumns::read( + const Names & column_names, + const SelectQueryInfo & query_info, + const Context & context, + QueryProcessingStage::Enum & processed_stage, + const size_t /*max_block_size*/, + const unsigned /*num_streams*/) +{ + bool has_state_column = false; + Names real_column_names; + + for (const String & column_name : column_names) + { + if (column_name == "_state") + has_state_column = true; + else + real_column_names.emplace_back(column_name); + } + + /// Do not check if only _state column is requested + if (!(has_state_column && real_column_names.empty())) + check(real_column_names); + + processed_stage = QueryProcessingStage::FetchColumns; + + /// Will apply WHERE to subset of columns and then add more columns. + /// This is kind of complicated, but we use WHERE to do less work. + + Block block_to_filter; + + std::map, StoragePtr> storages; + + { + Databases databases = context.getDatabases(); + + /// Add column 'database'. + ColumnPtr database_column = std::make_shared(); + for (const auto & database : databases) + database_column->insert(database.first); + block_to_filter.insert(ColumnWithTypeAndName(database_column, std::make_shared(), "database")); + + /// Filter block_to_filter with column 'database'. + VirtualColumnUtils::filterBlockWithQuery(query_info.query, block_to_filter, context); + + if (!block_to_filter.rows()) + return BlockInputStreams(); + + /// Add columns 'table', 'engine', 'active' + database_column = block_to_filter.getByName("database").column; + size_t rows = database_column->size(); + + ColumnPtr table_column = std::make_shared(); + ColumnPtr engine_column = std::make_shared(); + ColumnPtr active_column = std::make_shared(); + + for (size_t i = 0; i < rows; ++i) + { + String database_name = (*database_column)[i].get(); + const DatabasePtr database = databases.at(database_name); + + for (auto iterator = database->getIterator(context); iterator->isValid(); iterator->next()) + { + String table_name = iterator->name(); + StoragePtr storage = iterator->table(); + String engine_name = storage->getName(); + + if (!dynamic_cast(&*storage) && + !dynamic_cast(&*storage)) + continue; + + storages[std::make_pair(database_name, iterator->name())] = storage; + + /// Add all combinations of flag 'active'. + for (auto active : {0, 1}) + { + table_column->insert(table_name); + engine_column->insert(engine_name); + active_column->insert(active); + } + } + } + + block_to_filter.insert(ColumnWithTypeAndName(table_column, std::make_shared(), "table")); + block_to_filter.insert(ColumnWithTypeAndName(engine_column, std::make_shared(), "engine")); + block_to_filter.insert(ColumnWithTypeAndName(active_column, std::make_shared(), "active")); + } + + /// Filter block_to_filter with columns 'database', 'table', 'engine', 'active'. + VirtualColumnUtils::filterBlockWithQuery(query_info.query, block_to_filter, context); + + /// If all was filtered out. + if (!block_to_filter.rows()) + return {}; + + ColumnPtr filtered_database_column = block_to_filter.getByName("database").column; + ColumnPtr filtered_table_column = block_to_filter.getByName("table").column; + ColumnPtr filtered_active_column = block_to_filter.getByName("active").column; + + /// Finally, create the result. + + Block block = getSampleBlock(); + if (has_state_column) + block.insert(ColumnWithTypeAndName(std::make_shared(), "_state")); + + for (size_t i = 0; i < filtered_database_column->size();) + { + String database = (*filtered_database_column)[i].get(); + String table = (*filtered_table_column)[i].get(); + + /// What 'active' value we need. + bool need[2]{}; /// [active] + for (; i < filtered_database_column->size() && + (*filtered_database_column)[i].get() == database && + (*filtered_table_column)[i].get() == table; ++i) + { + bool active = (*filtered_active_column)[i].get() != 0; + need[active] = true; + } + + StoragePtr storage = storages.at(std::make_pair(database, table)); + TableStructureReadLockPtr table_lock; + + try + { + table_lock = storage->lockStructure(false, __PRETTY_FUNCTION__); /// For table not to be dropped. + } + catch (const Exception & e) + { + /** There are case when IStorage::drop was called, + * but we still own the object. + * Then table will throw exception at attempt to lock it. + * Just skip the table. + */ + if (e.code() == ErrorCodes::TABLE_IS_DROPPED) + continue; + + throw; + } + + String engine = storage->getName(); + + MergeTreeData * data = nullptr; + + if (auto merge_tree = dynamic_cast(&*storage)) + { + data = &merge_tree->getData(); + } + else if (auto replicated_merge_tree = dynamic_cast(&*storage)) + { + data = &replicated_merge_tree->getData(); + } + else + { + throw Exception("Unknown engine " + engine, ErrorCodes::LOGICAL_ERROR); + } + + using State = MergeTreeDataPart::State; + MergeTreeData::DataPartStateVector all_parts_state; + MergeTreeData::DataPartsVector all_parts; + + if (need[0]) + { + /// If has_state_column is requested, return all states + if (!has_state_column) + all_parts = data->getDataPartsVector({State::Committed, State::Outdated}, &all_parts_state); + else + all_parts = data->getAllDataPartsVector(&all_parts_state); + } + else + all_parts = data->getDataPartsVector({State::Committed}, &all_parts_state); + + + /// Finally, we'll go through the list of parts. + for (size_t part_number = 0; part_number < all_parts.size(); ++part_number) + { + const auto & part = all_parts[part_number]; + auto part_state = all_parts_state[part_number]; + + size_t j = 0; + { + WriteBufferFromOwnString out; + part->partition.serializeTextQuoted(*data, out); + block.getByPosition(j++).column->insert(out.str()); + } + block.getByPosition(j++).column->insert(part->name); + block.getByPosition(j++).column->insert(static_cast(part_state == State::Committed)); + block.getByPosition(j++).column->insert(static_cast(part->marks_count)); + block.getByPosition(j++).column->insert(static_cast(part->getTotalMrkSizeInBytes())); + + block.getByPosition(j++).column->insert(static_cast(part->rows_count)); + block.getByPosition(j++).column->insert(static_cast(part->size_in_bytes)); + block.getByPosition(j++).column->insert(static_cast(part->modification_time)); + block.getByPosition(j++).column->insert(static_cast(part->remove_time)); + + /// For convenience, in returned refcount, don't add references that was due to local variables in this method: all_parts, active_parts. + block.getByPosition(j++).column->insert(static_cast(part.use_count() - 1)); + + block.getByPosition(j++).column->insert(static_cast(part->getMinDate())); + block.getByPosition(j++).column->insert(static_cast(part->getMaxDate())); + block.getByPosition(j++).column->insert(part->info.min_block); + block.getByPosition(j++).column->insert(part->info.max_block); + block.getByPosition(j++).column->insert(static_cast(part->info.level)); + block.getByPosition(j++).column->insert(static_cast(part->getIndexSizeInBytes())); + block.getByPosition(j++).column->insert(static_cast(part->getIndexSizeInAllocatedBytes())); + + block.getByPosition(j++).column->insert(database); + block.getByPosition(j++).column->insert(table); + block.getByPosition(j++).column->insert(engine); + + if (has_state_column) + block.getByPosition(j++).column->insert(part->stateString()); + } + } + + return BlockInputStreams(1, std::make_shared(block)); +} + +NameAndTypePair StorageSystemPartsColumns::getColumn(const String & column_name) const +{ + if (column_name == "_state") + return NameAndTypePair("_state", std::make_shared()); + + return ITableDeclaration::getColumn(column_name); +} + +bool StorageSystemPartsColumns::hasColumn(const String & column_name) const +{ + if (column_name == "_state") + return true; + + return ITableDeclaration::hasColumn(column_name); +} + + +} diff --git a/dbms/src/Storages/System/StorageSystemPartsColumns.h b/dbms/src/Storages/System/StorageSystemPartsColumns.h new file mode 100644 index 00000000000..13f46ae1de9 --- /dev/null +++ b/dbms/src/Storages/System/StorageSystemPartsColumns.h @@ -0,0 +1,43 @@ +#pragma once + +#include +#include + + +namespace DB +{ + +class Context; + + +/** Implements system table 'parts' which allows to get information about data parts for tables of MergeTree family. + */ +class StorageSystemPartsColumns : public ext::shared_ptr_helper, public IStorage +{ +public: + std::string getName() const override { return "SystemPartsColumns"; } + std::string getTableName() const override { return name; } + + const NamesAndTypesList & getColumnsListImpl() const override { return columns; } + + NameAndTypePair getColumn(const String & column_name) const override; + + bool hasColumn(const String & column_name) const override; + + BlockInputStreams read( + const Names & column_names, + const SelectQueryInfo & query_info, + const Context & context, + QueryProcessingStage::Enum & processed_stage, + size_t max_block_size, + unsigned num_streams) override; + +private: + const std::string name; + NamesAndTypesList columns; + +protected: + StorageSystemPartsColumns(const std::string & name_); +}; + +} diff --git a/dbms/src/Storages/System/attachSystemTables.cpp b/dbms/src/Storages/System/attachSystemTables.cpp index aa9e9e13196..f3256d1ee37 100644 --- a/dbms/src/Storages/System/attachSystemTables.cpp +++ b/dbms/src/Storages/System/attachSystemTables.cpp @@ -15,6 +15,7 @@ #include #include #include +#include #include #include #include @@ -44,6 +45,7 @@ void attachSystemTablesServer(IDatabase & system_database, bool has_zookeeper) { attachSystemTablesLocal(system_database); system_database.attachTable("parts", StorageSystemParts::create("parts")); + system_database.attachTable("parts_columns", StorageSystemPartsColumns::create("parts_columns")); system_database.attachTable("processes", StorageSystemProcesses::create("processes")); system_database.attachTable("metrics", StorageSystemMetrics::create("metrics")); system_database.attachTable("merges", StorageSystemMerges::create("merges"));