Fill only requested columns when querying system.parts & system.parts_columns

This commit is contained in:
Anmol Arora 2021-02-21 14:04:55 +00:00 committed by Anmol Arora
parent 628d78f4f1
commit 2d9b524bdd
6 changed files with 221 additions and 107 deletions

View File

@ -81,7 +81,8 @@ StorageSystemParts::StorageSystemParts(const StorageID & table_id_)
{
}
void StorageSystemParts::processNextStorage(MutableColumns & columns_, const StoragesInfo & info, bool has_state_column)
void StorageSystemParts::processNextStorage(
MutableColumns & columns, std::vector<UInt8> & columns_mask, const StoragesInfo & info, bool has_state_column)
{
using State = IMergeTreeDataPart::State;
MergeTreeData::DataPartStateVector all_parts_state;
@ -96,97 +97,154 @@ void StorageSystemParts::processNextStorage(MutableColumns & columns_, const Sto
ColumnSize columns_size = part->getTotalColumnsSize();
size_t i = 0;
size_t src_index = 0, res_index = 0;
if (columns_mask[src_index++])
{
WriteBufferFromOwnString out;
part->partition.serializeText(*info.data, out, format_settings);
columns_[i++]->insert(out.str());
columns[res_index++]->insert(out.str());
}
columns_[i++]->insert(part->name);
columns_[i++]->insert(part->uuid);
columns_[i++]->insert(part->getTypeName());
columns_[i++]->insert(part_state == State::Committed);
columns_[i++]->insert(part->getMarksCount());
columns_[i++]->insert(part->rows_count);
columns_[i++]->insert(part->getBytesOnDisk());
columns_[i++]->insert(columns_size.data_compressed);
columns_[i++]->insert(columns_size.data_uncompressed);
columns_[i++]->insert(columns_size.marks);
columns_[i++]->insert(static_cast<UInt64>(part->modification_time));
if (columns_mask[src_index++])
columns[res_index++]->insert(part->name);
if (columns_mask[src_index++])
columns[res_index++]->insert(part->uuid);
if (columns_mask[src_index++])
columns[res_index++]->insert(part->getTypeName());
if (columns_mask[src_index++])
columns[res_index++]->insert(part_state == State::Committed);
if (columns_mask[src_index++])
columns[res_index++]->insert(part->getMarksCount());
if (columns_mask[src_index++])
columns[res_index++]->insert(part->rows_count);
if (columns_mask[src_index++])
columns[res_index++]->insert(part->getBytesOnDisk());
if (columns_mask[src_index++])
columns[res_index++]->insert(columns_size.data_compressed);
if (columns_mask[src_index++])
columns[res_index++]->insert(columns_size.data_uncompressed);
if (columns_mask[src_index++])
columns[res_index++]->insert(columns_size.marks);
if (columns_mask[src_index++])
columns[res_index++]->insert(static_cast<UInt64>(part->modification_time));
time_t remove_time = part->remove_time.load(std::memory_order_relaxed);
columns_[i++]->insert(static_cast<UInt64>(remove_time == std::numeric_limits<time_t>::max() ? 0 : remove_time));
if (columns_mask[src_index++])
{
time_t remove_time = part->remove_time.load(std::memory_order_relaxed);
columns[res_index++]->insert(static_cast<UInt64>(remove_time == std::numeric_limits<time_t>::max() ? 0 : remove_time));
}
/// For convenience, in returned refcount, don't add references that were due to local variables in this method: all_parts, active_parts.
columns_[i++]->insert(static_cast<UInt64>(part.use_count() - 1));
if (columns_mask[src_index++])
columns[res_index++]->insert(static_cast<UInt64>(part.use_count() - 1));
columns_[i++]->insert(part->getMinDate());
columns_[i++]->insert(part->getMaxDate());
columns_[i++]->insert(static_cast<UInt32>(part->getMinTime()));
columns_[i++]->insert(static_cast<UInt32>(part->getMaxTime()));
columns_[i++]->insert(part->info.partition_id);
columns_[i++]->insert(part->info.min_block);
columns_[i++]->insert(part->info.max_block);
columns_[i++]->insert(part->info.level);
columns_[i++]->insert(static_cast<UInt64>(part->info.getDataVersion()));
columns_[i++]->insert(part->getIndexSizeInBytes());
columns_[i++]->insert(part->getIndexSizeInAllocatedBytes());
columns_[i++]->insert(part->is_frozen.load(std::memory_order_relaxed));
if (columns_mask[src_index++])
columns[res_index++]->insert(part->getMinDate());
if (columns_mask[src_index++])
columns[res_index++]->insert(part->getMaxDate());
if (columns_mask[src_index++])
columns[res_index++]->insert(static_cast<UInt32>(part->getMinTime()));
if (columns_mask[src_index++])
columns[res_index++]->insert(static_cast<UInt32>(part->getMaxTime()));
if (columns_mask[src_index++])
columns[res_index++]->insert(part->info.partition_id);
if (columns_mask[src_index++])
columns[res_index++]->insert(part->info.min_block);
if (columns_mask[src_index++])
columns[res_index++]->insert(part->info.max_block);
if (columns_mask[src_index++])
columns[res_index++]->insert(part->info.level);
if (columns_mask[src_index++])
columns[res_index++]->insert(static_cast<UInt64>(part->info.getDataVersion()));
if (columns_mask[src_index++])
columns[res_index++]->insert(part->getIndexSizeInBytes());
if (columns_mask[src_index++])
columns[res_index++]->insert(part->getIndexSizeInAllocatedBytes());
if (columns_mask[src_index++])
columns[res_index++]->insert(part->is_frozen.load(std::memory_order_relaxed));
if (columns_mask[src_index++])
columns[res_index++]->insert(info.database);
if (columns_mask[src_index++])
columns[res_index++]->insert(info.table);
if (columns_mask[src_index++])
columns[res_index++]->insert(info.engine);
columns_[i++]->insert(info.database);
columns_[i++]->insert(info.table);
columns_[i++]->insert(info.engine);
if (part->isStoredOnDisk())
{
columns_[i++]->insert(part->volume->getDisk()->getName());
columns_[i++]->insert(part->getFullPath());
if (columns_mask[src_index++])
columns[res_index++]->insert(part->volume->getDisk()->getName());
if (columns_mask[src_index++])
columns[res_index++]->insert(part->getFullPath());
}
else
{
columns_[i++]->insertDefault();
columns_[i++]->insertDefault();
if (columns_mask[src_index++])
columns[res_index++]->insertDefault();
if (columns_mask[src_index++])
columns[res_index++]->insertDefault();
}
MinimalisticDataPartChecksums helper;
helper.computeTotalChecksums(part->checksums);
auto checksum = helper.hash_of_all_files;
columns_[i++]->insert(getHexUIntLowercase(checksum.first) + getHexUIntLowercase(checksum.second));
{
MinimalisticDataPartChecksums helper;
if (columns_mask[src_index] || columns_mask[src_index + 1] || columns_mask[src_index + 2])
helper.computeTotalChecksums(part->checksums);
checksum = helper.hash_of_uncompressed_files;
columns_[i++]->insert(getHexUIntLowercase(checksum.first) + getHexUIntLowercase(checksum.second));
checksum = helper.uncompressed_hash_of_compressed_files;
columns_[i++]->insert(getHexUIntLowercase(checksum.first) + getHexUIntLowercase(checksum.second));
if (columns_mask[src_index++])
{
auto checksum = helper.hash_of_all_files;
columns[res_index++]->insert(getHexUIntLowercase(checksum.first) + getHexUIntLowercase(checksum.second));
}
if (columns_mask[src_index++])
{
auto checksum = helper.hash_of_uncompressed_files;
columns[res_index++]->insert(getHexUIntLowercase(checksum.first) + getHexUIntLowercase(checksum.second));
}
if (columns_mask[src_index++])
{
auto checksum = helper.uncompressed_hash_of_compressed_files;
columns[res_index++]->insert(getHexUIntLowercase(checksum.first) + getHexUIntLowercase(checksum.second));
}
}
/// delete_ttl_info
{
columns_[i++]->insert(static_cast<UInt32>(part->ttl_infos.table_ttl.min));
columns_[i++]->insert(static_cast<UInt32>(part->ttl_infos.table_ttl.max));
}
if (columns_mask[src_index++])
columns[res_index++]->insert(static_cast<UInt32>(part->ttl_infos.table_ttl.min));
if (columns_mask[src_index++])
columns[res_index++]->insert(static_cast<UInt32>(part->ttl_infos.table_ttl.max));
auto add_ttl_info_map = [&](const TTLInfoMap & ttl_info_map)
{
Array expression_array;
Array min_array;
Array max_array;
expression_array.reserve(ttl_info_map.size());
min_array.reserve(ttl_info_map.size());
max_array.reserve(ttl_info_map.size());
if (columns_mask[src_index])
expression_array.reserve(ttl_info_map.size());
if (columns_mask[src_index + 1])
min_array.reserve(ttl_info_map.size());
if (columns_mask[src_index + 2])
max_array.reserve(ttl_info_map.size());
for (const auto & [expression, ttl_info] : ttl_info_map)
{
expression_array.emplace_back(expression);
min_array.push_back(static_cast<UInt32>(ttl_info.min));
max_array.push_back(static_cast<UInt32>(ttl_info.max));
if (columns_mask[src_index])
expression_array.emplace_back(expression);
if (columns_mask[src_index + 1])
min_array.push_back(static_cast<UInt32>(ttl_info.min));
if (columns_mask[src_index + 2])
max_array.push_back(static_cast<UInt32>(ttl_info.max));
}
columns_[i++]->insert(expression_array);
columns_[i++]->insert(min_array);
columns_[i++]->insert(max_array);
if (columns_mask[src_index++])
columns[res_index++]->insert(expression_array);
if (columns_mask[src_index++])
columns[res_index++]->insert(min_array);
if (columns_mask[src_index++])
columns[res_index++]->insert(max_array);
};
add_ttl_info_map(part->ttl_infos.moves_ttl);
columns_[i++]->insert(queryToString(part->default_codec->getCodecDesc()));
if (columns_mask[src_index++])
columns[res_index++]->insert(queryToString(part->default_codec->getCodecDesc()));
add_ttl_info_map(part->ttl_infos.recompression_ttl);
add_ttl_info_map(part->ttl_infos.group_by_ttl);
@ -195,7 +253,7 @@ void StorageSystemParts::processNextStorage(MutableColumns & columns_, const Sto
/// _state column should be the last.
/// Do not use part->getState*, it can be changed from a different thread
if (has_state_column)
columns_[i++]->insert(IMergeTreeDataPart::stateToString(part_state));
columns[res_index++]->insert(IMergeTreeDataPart::stateToString(part_state));
}
}

View File

@ -20,7 +20,8 @@ public:
protected:
explicit StorageSystemParts(const StorageID & table_id_);
void processNextStorage(MutableColumns & columns, const StoragesInfo & info, bool has_state_column) override;
void processNextStorage(
MutableColumns & columns, std::vector<UInt8> & columns_mask, const StoragesInfo & info, bool has_state_column) override;
};
}

View File

@ -245,16 +245,29 @@ Pipe StorageSystemPartsBase::read(
/// Create the result.
MutableColumns res_columns = metadata_snapshot->getSampleBlock().cloneEmptyColumns();
NameSet names_set(column_names.begin(), column_names.end());
Block sample = metadata_snapshot->getSampleBlock();
Block header;
std::vector<UInt8> columns_mask(sample.columns());
for (size_t i = 0; i < sample.columns(); ++i)
{
if (names_set.count(sample.getByPosition(i).name))
{
columns_mask[i] = 1;
header.insert(sample.getByPosition(i));
}
}
MutableColumns res_columns = header.cloneEmptyColumns();
if (has_state_column)
res_columns.push_back(ColumnString::create());
while (StoragesInfo info = stream.next())
{
processNextStorage(res_columns, info, has_state_column);
processNextStorage(res_columns, columns_mask, info, has_state_column);
}
Block header = metadata_snapshot->getSampleBlock();
if (has_state_column)
header.insert(ColumnWithTypeAndName(std::make_shared<DataTypeString>(), "_state"));

View File

@ -74,7 +74,9 @@ protected:
StorageSystemPartsBase(const StorageID & table_id_, NamesAndTypesList && columns_);
virtual void processNextStorage(MutableColumns & columns, const StoragesInfo & info, bool has_state_column) = 0;
virtual void
processNextStorage(MutableColumns & columns, std::vector<UInt8> & columns_mask, const StoragesInfo & info, bool has_state_column)
= 0;
};
}

View File

@ -60,7 +60,8 @@ StorageSystemPartsColumns::StorageSystemPartsColumns(const StorageID & table_id_
{
}
void StorageSystemPartsColumns::processNextStorage(MutableColumns & columns_, const StoragesInfo & info, bool has_state_column)
void StorageSystemPartsColumns::processNextStorage(
MutableColumns & columns, std::vector<UInt8> & columns_mask, const StoragesInfo & info, bool has_state_column)
{
/// Prepare information about columns in storage.
struct ColumnInfo
@ -105,67 +106,105 @@ void StorageSystemPartsColumns::processNextStorage(MutableColumns & columns_, co
for (const auto & column : part->getColumns())
{
++column_position;
size_t j = 0;
size_t src_index = 0, res_index = 0;
if (columns_mask[src_index++])
{
WriteBufferFromOwnString out;
part->partition.serializeText(*info.data, out, format_settings);
columns_[j++]->insert(out.str());
columns[res_index++]->insert(out.str());
}
columns_[j++]->insert(part->name);
columns_[j++]->insert(part->getTypeName());
columns_[j++]->insert(part_state == State::Committed);
columns_[j++]->insert(part->getMarksCount());
if (columns_mask[src_index++])
columns[res_index++]->insert(part->name);
if (columns_mask[src_index++])
columns[res_index++]->insert(part->getTypeName());
if (columns_mask[src_index++])
columns[res_index++]->insert(part_state == State::Committed);
if (columns_mask[src_index++])
columns[res_index++]->insert(part->getMarksCount());
columns_[j++]->insert(part->rows_count);
columns_[j++]->insert(part->getBytesOnDisk());
columns_[j++]->insert(columns_size.data_compressed);
columns_[j++]->insert(columns_size.data_uncompressed);
columns_[j++]->insert(columns_size.marks);
columns_[j++]->insert(UInt64(part->modification_time));
columns_[j++]->insert(UInt64(part->remove_time.load(std::memory_order_relaxed)));
if (columns_mask[src_index++])
columns[res_index++]->insert(part->rows_count);
if (columns_mask[src_index++])
columns[res_index++]->insert(part->getBytesOnDisk());
if (columns_mask[src_index++])
columns[res_index++]->insert(columns_size.data_compressed);
if (columns_mask[src_index++])
columns[res_index++]->insert(columns_size.data_uncompressed);
if (columns_mask[src_index++])
columns[res_index++]->insert(columns_size.marks);
if (columns_mask[src_index++])
columns[res_index++]->insert(UInt64(part->modification_time));
if (columns_mask[src_index++])
columns[res_index++]->insert(UInt64(part->remove_time.load(std::memory_order_relaxed)));
columns_[j++]->insert(UInt64(use_count));
if (columns_mask[src_index++])
columns[res_index++]->insert(UInt64(use_count));
columns_[j++]->insert(min_date);
columns_[j++]->insert(max_date);
columns_[j++]->insert(part->info.partition_id);
columns_[j++]->insert(part->info.min_block);
columns_[j++]->insert(part->info.max_block);
columns_[j++]->insert(part->info.level);
columns_[j++]->insert(UInt64(part->info.getDataVersion()));
columns_[j++]->insert(index_size_in_bytes);
columns_[j++]->insert(index_size_in_allocated_bytes);
if (columns_mask[src_index++])
columns[res_index++]->insert(min_date);
if (columns_mask[src_index++])
columns[res_index++]->insert(max_date);
if (columns_mask[src_index++])
columns[res_index++]->insert(part->info.partition_id);
if (columns_mask[src_index++])
columns[res_index++]->insert(part->info.min_block);
if (columns_mask[src_index++])
columns[res_index++]->insert(part->info.max_block);
if (columns_mask[src_index++])
columns[res_index++]->insert(part->info.level);
if (columns_mask[src_index++])
columns[res_index++]->insert(UInt64(part->info.getDataVersion()));
if (columns_mask[src_index++])
columns[res_index++]->insert(index_size_in_bytes);
if (columns_mask[src_index++])
columns[res_index++]->insert(index_size_in_allocated_bytes);
columns_[j++]->insert(info.database);
columns_[j++]->insert(info.table);
columns_[j++]->insert(info.engine);
columns_[j++]->insert(part->volume->getDisk()->getName());
columns_[j++]->insert(part->getFullPath());
if (columns_mask[src_index++])
columns[res_index++]->insert(info.database);
if (columns_mask[src_index++])
columns[res_index++]->insert(info.table);
if (columns_mask[src_index++])
columns[res_index++]->insert(info.engine);
if (columns_mask[src_index++])
columns[res_index++]->insert(part->volume->getDisk()->getName());
if (columns_mask[src_index++])
columns[res_index++]->insert(part->getFullPath());
columns_[j++]->insert(column.name);
columns_[j++]->insert(column.type->getName());
columns_[j++]->insert(column_position);
if (columns_mask[src_index++])
columns[res_index++]->insert(column.name);
if (columns_mask[src_index++])
columns[res_index++]->insert(column.type->getName());
if (columns_mask[src_index++])
columns[res_index++]->insert(column_position);
auto column_info_it = columns_info.find(column.name);
if (column_info_it != columns_info.end())
{
columns_[j++]->insert(column_info_it->second.default_kind);
columns_[j++]->insert(column_info_it->second.default_expression);
if (columns_mask[src_index++])
columns[res_index++]->insert(column_info_it->second.default_kind);
if (columns_mask[src_index++])
columns[res_index++]->insert(column_info_it->second.default_expression);
}
else
{
columns_[j++]->insertDefault();
columns_[j++]->insertDefault();
if (columns_mask[src_index++])
columns[res_index++]->insertDefault();
if (columns_mask[src_index++])
columns[res_index++]->insertDefault();
}
ColumnSize column_size = part->getColumnSize(column.name, *column.type);
columns_[j++]->insert(column_size.data_compressed + column_size.marks);
columns_[j++]->insert(column_size.data_compressed);
columns_[j++]->insert(column_size.data_uncompressed);
columns_[j++]->insert(column_size.marks);
if (columns_mask[src_index++])
columns[res_index++]->insert(column_size.data_compressed + column_size.marks);
if (columns_mask[src_index++])
columns[res_index++]->insert(column_size.data_compressed);
if (columns_mask[src_index++])
columns[res_index++]->insert(column_size.data_uncompressed);
if (columns_mask[src_index++])
columns[res_index++]->insert(column_size.marks);
if (has_state_column)
columns_[j++]->insert(part->stateString());
columns[res_index++]->insert(part->stateString());
}
}
}

View File

@ -22,7 +22,8 @@ public:
protected:
StorageSystemPartsColumns(const StorageID & table_id_);
void processNextStorage(MutableColumns & columns, const StoragesInfo & info, bool has_state_column) override;
void processNextStorage(
MutableColumns & columns, std::vector<UInt8> & columns_mask, const StoragesInfo & info, bool has_state_column) override;
};
}