Merge pull request #6028 from amosbird/c1

Optimize count()
This commit is contained in:
alexey-milovidov 2019-07-17 13:08:16 +03:00 committed by GitHub
commit 07fcbbcdc8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 79 additions and 32 deletions

View File

@ -1040,7 +1040,28 @@ void ExpressionAnalyzer::collectUsedColumns()
/// You need to read at least one column to find the number of rows.
if (select_query && required.empty())
required.insert(ExpressionActions::getSmallestColumn(source_columns));
{
    /// Prefer the column with the smallest compressed size reported by the storage:
    /// that is the column that is cheapest to read.
    /// `smallest_size == 0` doubles as "no candidate with a known non-zero size yet".
    String cheapest_column;
    size_t smallest_size = 0;

    if (storage)
    {
        const auto sizes_by_name = storage->getColumnSizes();
        for (const auto & entry : sizes_by_name)
        {
            const size_t compressed = entry.second.data_compressed;

            /// Keep the current candidate unless this column is strictly smaller.
            if (smallest_size != 0 && compressed >= smallest_size)
                continue;

            smallest_size = compressed;
            cheapest_column = entry.first;
        }
    }

    if (smallest_size == 0)
    {
        /// No usable size information: fall back to the column whose data type is the smallest.
        required.insert(ExpressionActions::getSmallestColumn(source_columns));
    }
    else
    {
        required.insert(cheapest_column);
    }
}
NameSet unknown_required_source_columns = required;

View File

@ -38,6 +38,19 @@ class AlterCommands;
class MutationCommands;
class PartitionCommands;
/// Size counters of a column: marks, compressed data and uncompressed data.
/// All counters start at zero.
struct ColumnSize
{
    size_t marks{0};
    size_t data_compressed{0};
    size_t data_uncompressed{0};

    /// Adds each counter of `other` to the matching counter of this object.
    void add(const ColumnSize & other)
    {
        data_uncompressed += other.data_uncompressed;
        data_compressed += other.data_compressed;
        marks += other.marks;
    }
};
/** Storage. Describes the table. Responsible for
* - storage of the table data;
@ -82,6 +95,10 @@ public:
/// Returns true if the storage supports deduplication of inserted data blocks.
/// This default implementation always returns false; being virtual, it is presumably
/// overridden by storages that do deduplicate (NOTE(review): confirm against derived classes).
virtual bool supportsDeduplication() const { return false; }
/// Optional size information of each physical column.
/// Currently it's only used by the MergeTree family for query optimizations.
/// Maps a column name to its ColumnSize counters (marks, data_compressed, data_uncompressed).
using ColumnSizeByName = std::unordered_map<std::string, ColumnSize>;
/// Default implementation returns an empty map, i.e. the storage provides no size information.
virtual ColumnSizeByName getColumnSizes() const { return {}; }
public: /// thread-unsafe part. lockStructure must be acquired
const ColumnsDescription & getColumns() const; /// returns combined set of columns

View File

@ -119,7 +119,7 @@ void MergeTreeBlockSizePredictor::initialize(const Block & sample_block, const N
ColumnInfo info;
info.name = column_name;
/// If column isn't fixed and doesn't have checksum, then take first
MergeTreeDataPart::ColumnSize column_size = data_part->getColumnSize(
ColumnSize column_size = data_part->getColumnSize(
column_name, *column_with_type_and_name.type);
info.bytes_per_row_global = column_size.data_uncompressed

View File

@ -2378,8 +2378,8 @@ void MergeTreeData::addPartContributionToColumnSizes(const DataPartPtr & part)
for (const auto & column : part->columns)
{
DataPart::ColumnSize & total_column_size = column_sizes[column.name];
DataPart::ColumnSize part_column_size = part->getColumnSize(column.name, *column.type);
ColumnSize & total_column_size = column_sizes[column.name];
ColumnSize part_column_size = part->getColumnSize(column.name, *column.type);
total_column_size.add(part_column_size);
}
}
@ -2390,8 +2390,8 @@ void MergeTreeData::removePartContributionToColumnSizes(const DataPartPtr & part
for (const auto & column : part->columns)
{
DataPart::ColumnSize & total_column_size = column_sizes[column.name];
DataPart::ColumnSize part_column_size = part->getColumnSize(column.name, *column.type);
ColumnSize & total_column_size = column_sizes[column.name];
ColumnSize part_column_size = part->getColumnSize(column.name, *column.type);
auto log_subtract = [&](size_t & from, size_t value, const char * field)
{

View File

@ -547,8 +547,7 @@ public:
return it == std::end(column_sizes) ? 0 : it->second.data_compressed;
}
using ColumnSizeByName = std::unordered_map<std::string, DataPart::ColumnSize>;
ColumnSizeByName getColumnSizes() const
ColumnSizeByName getColumnSizes() const override
{
auto lock = lockParts();
return column_sizes;

View File

@ -153,7 +153,7 @@ MergeTreeDataPart::MergeTreeDataPart(const MergeTreeData & storage_, const Strin
/// Takes into account the fact that several columns can e.g. share their .size substreams.
/// When calculating totals these should be counted only once.
MergeTreeDataPart::ColumnSize MergeTreeDataPart::getColumnSizeImpl(
ColumnSize MergeTreeDataPart::getColumnSizeImpl(
const String & column_name, const IDataType & type, std::unordered_set<String> * processed_substreams) const
{
ColumnSize size;
@ -182,12 +182,12 @@ MergeTreeDataPart::ColumnSize MergeTreeDataPart::getColumnSizeImpl(
return size;
}
MergeTreeDataPart::ColumnSize MergeTreeDataPart::getColumnSize(const String & column_name, const IDataType & type) const
/// Returns the size of a single column of this part by delegating to getColumnSizeImpl.
ColumnSize MergeTreeDataPart::getColumnSize(const String & column_name, const IDataType & type) const
{
    /// No set of already processed substreams is supplied for a single-column query.
    std::unordered_set<String> * const no_processed_substreams = nullptr;
    return getColumnSizeImpl(column_name, type, no_processed_substreams);
}
MergeTreeDataPart::ColumnSize MergeTreeDataPart::getTotalColumnsSize() const
ColumnSize MergeTreeDataPart::getTotalColumnsSize() const
{
ColumnSize totals;
std::unordered_set<String> processed_substreams;

View File

@ -22,6 +22,7 @@
namespace DB
{
struct ColumnSize;
class MergeTreeData;
@ -39,20 +40,6 @@ struct MergeTreeDataPart
/// If no checksums are present returns the name of the first physically existing column.
String getColumnNameWithMinumumCompressedSize() const;
/// Size counters of a column: marks, compressed data and uncompressed data.
/// All counters start at zero.
struct ColumnSize
{
    size_t marks{0};
    size_t data_compressed{0};
    size_t data_uncompressed{0};

    /// Adds each counter of `other` to the matching counter of this object.
    void add(const ColumnSize & other)
    {
        data_uncompressed += other.data_uncompressed;
        data_compressed += other.data_compressed;
        marks += other.marks;
    }
};
/// NOTE: Returns zeros if column files are not found in checksums.
/// NOTE: You must ensure that no ALTERs are in progress when calculating ColumnSizes.
/// (either by locking columns_lock, or by locking table structure).

View File

@ -121,11 +121,7 @@ protected:
cols_required_for_primary_key = storage->getColumnsRequiredForPrimaryKey();
cols_required_for_sampling = storage->getColumnsRequiredForSampling();
/** Info about sizes of columns for tables of MergeTree family.
* NOTE: It is possible to add getter for this info to IStorage interface.
*/
if (auto storage_concrete = dynamic_cast<const MergeTreeData *>(storage.get()))
column_sizes = storage_concrete->getColumnSizes();
column_sizes = storage->getColumnSizes();
}
for (const auto & column : columns)

View File

@ -68,7 +68,7 @@ void StorageSystemParts::processNextStorage(MutableColumns & columns, const Stor
const auto & part = all_parts[part_number];
auto part_state = all_parts_state[part_number];
MergeTreeDataPart::ColumnSize columns_size = part->getTotalColumnsSize();
ColumnSize columns_size = part->getTotalColumnsSize();
size_t i = 0;
{

View File

@ -151,7 +151,7 @@ void StorageSystemPartsColumns::processNextStorage(MutableColumns & columns, con
columns[j++]->insertDefault();
}
MergeTreeDataPart::ColumnSize column_size = part->getColumnSize(column.name, *column.type);
ColumnSize column_size = part->getColumnSize(column.name, *column.type);
columns[j++]->insert(column_size.data_compressed + column_size.marks);
columns[j++]->insert(column_size.data_compressed);
columns[j++]->insert(column_size.data_uncompressed);

View File

@ -0,0 +1,27 @@
<!-- Performance test "count": times a bare SELECT count() over a 10M-row MergeTree table. -->
<test>
<name>count</name>
<type>loop</type>
<!-- Conditions that end the loop; the exact all_of / any_of semantics are defined by the test runner. -->
<stop_conditions>
<all_of>
<total_time_ms>30000</total_time_ms>
</all_of>
<any_of>
<average_speed_not_changing_for_ms>6000</average_speed_not_changing_for_ms>
<total_time_ms>60000</total_time_ms>
</any_of>
</stop_conditions>
<!-- The reported metric is total query time. -->
<main_metric>
<total_time />
</main_metric>
<!-- Two UInt64 columns: k is the sorting key, v is an ordinary column. -->
<create_query>CREATE TABLE data(k UInt64, v UInt64) ENGINE = MergeTree ORDER BY k</create_query>
<!-- k receives the distinct values of numbers(10000000); v is the constant 1 in every row. -->
<!-- NOTE(review): a constant v presumably compresses far better than k, which is the case the smallest-column choice targets; confirm. -->
<fill_query>INSERT INTO data SELECT number, 1 from numbers(10000000)</fill_query>
<query tag='count_10M'>SELECT count() FROM data</query>
<drop_query>DROP TABLE IF EXISTS data</drop_query>
</test>