diff --git a/dbms/src/Core/Settings.h b/dbms/src/Core/Settings.h index 9361b909590..fb3021275be 100644 --- a/dbms/src/Core/Settings.h +++ b/dbms/src/Core/Settings.h @@ -380,6 +380,7 @@ struct Settings : public SettingsCollection M(SettingUInt64, min_free_disk_space_for_temporary_data, 0, "The minimum disk space to keep while writing temporary data used in external sorting and aggregation.") \ \ M(SettingBool, enable_scalar_subquery_optimization, true, "If it is set to true, prevent scalar subqueries from (de)serializing large scalar values and possibly avoid running the same subquery more than once.") \ + M(SettingBool, optimize_trivial_count_query, true, "Process trivial 'SELECT count() FROM table' query from metadata.") \ \ /** Obsolete settings that do nothing but left for compatibility reasons. Remove each one after half a year of obsolescence. */ \ \ diff --git a/dbms/src/Interpreters/InterpreterSelectQuery.cpp b/dbms/src/Interpreters/InterpreterSelectQuery.cpp index dc7331f7031..22d17ca8a52 100644 --- a/dbms/src/Interpreters/InterpreterSelectQuery.cpp +++ b/dbms/src/Interpreters/InterpreterSelectQuery.cpp @@ -26,6 +26,7 @@ #include #include #include +#include #include #include @@ -65,6 +66,7 @@ #include #include #include +#include #include #include @@ -90,6 +92,7 @@ #include #include #include +#include namespace DB @@ -1273,6 +1276,65 @@ void InterpreterSelectQuery::executeFetchColumns( auto & query = getSelectQuery(); const Settings & settings = context.getSettingsRef(); + /// Optimization for trivial query like SELECT count() FROM table. + auto check_trivial_count_query = [&]() -> std::optional + { + if (!settings.optimize_trivial_count_query || !syntax_analyzer_result->maybe_optimize_trivial_count || !storage + || query.sample_size() || query.sample_offset() || query.final() || query.prewhere() || query.where() + || !query_analyzer->hasAggregation() || processing_stage != QueryProcessingStage::FetchColumns) + return {}; + + Names key_names; + AggregateDescriptions aggregates; + query_analyzer->getAggregateInfo(key_names, aggregates); + + if (aggregates.size() != 1) + return {}; + + const AggregateDescription & desc = aggregates[0]; + if (typeid_cast(desc.function.get())) + return desc; + + return {}; + }; + + if (auto desc = check_trivial_count_query()) + { + auto func = desc->function; + std::optional num_rows = storage->totalRows(); + if (num_rows) + { + AggregateFunctionCount & agg_count = static_cast(*func); + + /// We will process it up to "WithMergeableState". + std::vector state(agg_count.sizeOfData()); + AggregateDataPtr place = state.data(); + + agg_count.create(place); + SCOPE_EXIT(agg_count.destroy(place)); + + MemoryWriteBuffer out; + writeVarUInt(*num_rows, out); + auto in = out.tryGetReadBuffer(); + agg_count.deserialize(place, *in, nullptr); + + auto column = ColumnAggregateFunction::create(func); + column->insertFrom(place); + + Block block_with_count{ + {std::move(column), std::make_shared(func, DataTypes(), Array()), desc->column_name}}; + + auto istream = std::make_shared(block_with_count); + if constexpr (pipeline_with_processors) + pipeline.init({std::make_shared(istream)}); + else + pipeline.streams.emplace_back(istream); + from_stage = QueryProcessingStage::WithMergeableState; + analysis_result.first_stage = false; + return; + } + } + /// Actions to calculate ALIAS if required. ExpressionActionsPtr alias_actions; diff --git a/dbms/src/Interpreters/SyntaxAnalyzer.cpp b/dbms/src/Interpreters/SyntaxAnalyzer.cpp index 67a1b3ea7db..935ac67808a 100644 --- a/dbms/src/Interpreters/SyntaxAnalyzer.cpp +++ b/dbms/src/Interpreters/SyntaxAnalyzer.cpp @@ -727,6 +727,7 @@ void SyntaxAnalyzerResult::collectUsedColumns(const ASTPtr & query, const NamesA /// You need to read at least one column to find the number of rows. if (select_query && required.empty()) { + maybe_optimize_trivial_count = true; /// We will find a column with minimum . /// Because it is the column that is cheapest to read. struct ColumnSizeTuple diff --git a/dbms/src/Interpreters/SyntaxAnalyzer.h b/dbms/src/Interpreters/SyntaxAnalyzer.h index 96f5678ac6f..0f1709df2fc 100644 --- a/dbms/src/Interpreters/SyntaxAnalyzer.h +++ b/dbms/src/Interpreters/SyntaxAnalyzer.h @@ -48,6 +48,8 @@ struct SyntaxAnalyzerResult /// Results of scalar sub queries Scalars scalars; + bool maybe_optimize_trivial_count = false; + void collectUsedColumns(const ASTPtr & query, const NamesAndTypesList & additional_source_columns); Names requiredSourceColumns() const { return required_source_columns.getNames(); } const Scalars & getScalars() const { return scalars; } diff --git a/dbms/src/Storages/IStorage.h b/dbms/src/Storages/IStorage.h index b4d6b2c3085..74126bc123b 100644 --- a/dbms/src/Storages/IStorage.h +++ b/dbms/src/Storages/IStorage.h @@ -406,6 +406,13 @@ public: /// Returns storage policy if storage supports it virtual DiskSpace::StoragePolicyPtr getStoragePolicy() const { return {}; } + /** If it is possible to quickly determine exact number of rows in the table at this moment of time, then return it. + */ + virtual std::optional totalRows() const + { + return {}; + } + private: /// You always need to take the next three locks in this order. diff --git a/dbms/src/Storages/MergeTree/MergeTreeData.cpp b/dbms/src/Storages/MergeTree/MergeTreeData.cpp index c2475395101..b7408634e3f 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeData.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeData.cpp @@ -2374,6 +2374,20 @@ size_t MergeTreeData::getTotalActiveSizeInBytes() const } +size_t MergeTreeData::getTotalActiveSizeInRows() const +{ + size_t res = 0; + { + auto lock = lockParts(); + + for (auto & part : getDataPartsStateRange(DataPartState::Committed)) + res += part->rows_count; + } + + return res; +} + + size_t MergeTreeData::getPartsCount() const { auto lock = lockParts(); @@ -2486,7 +2500,7 @@ void MergeTreeData::throwInsertIfNeeded() const } MergeTreeData::DataPartPtr MergeTreeData::getActiveContainingPart( - const MergeTreePartInfo & part_info, MergeTreeData::DataPartState state, DataPartsLock & /*lock*/) + const MergeTreePartInfo & part_info, MergeTreeData::DataPartState state, DataPartsLock & /*lock*/) const { auto current_state_parts_range = getDataPartsStateRange(state); @@ -2534,13 +2548,13 @@ void MergeTreeData::swapActivePart(MergeTreeData::DataPartPtr part_copy) } -MergeTreeData::DataPartPtr MergeTreeData::getActiveContainingPart(const MergeTreePartInfo & part_info) +MergeTreeData::DataPartPtr MergeTreeData::getActiveContainingPart(const MergeTreePartInfo & part_info) const { auto lock = lockParts(); return getActiveContainingPart(part_info, DataPartState::Committed, lock); } -MergeTreeData::DataPartPtr MergeTreeData::getActiveContainingPart(const String & part_name) +MergeTreeData::DataPartPtr MergeTreeData::getActiveContainingPart(const String & part_name) const { auto part_info = MergeTreePartInfo::fromPartName(part_name, format_version); return getActiveContainingPart(part_info); diff --git a/dbms/src/Storages/MergeTree/MergeTreeData.h b/dbms/src/Storages/MergeTree/MergeTreeData.h index a8bd661fafa..64ed0199522 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeData.h +++ b/dbms/src/Storages/MergeTree/MergeTreeData.h @@ -435,9 +435,9 @@ public: DataPartsVector getDataPartsVector() const; /// Returns a committed part with the given name or a part containing it. If there is no such part, returns nullptr. - DataPartPtr getActiveContainingPart(const String & part_name); - DataPartPtr getActiveContainingPart(const MergeTreePartInfo & part_info); - DataPartPtr getActiveContainingPart(const MergeTreePartInfo & part_info, DataPartState state, DataPartsLock & lock); + DataPartPtr getActiveContainingPart(const String & part_name) const; + DataPartPtr getActiveContainingPart(const MergeTreePartInfo & part_info) const; + DataPartPtr getActiveContainingPart(const MergeTreePartInfo & part_info, DataPartState state, DataPartsLock & lock) const; /// Swap part with it's identical copy (possible with another path on another disk). /// If original part is not active or doesn't exist exception will be thrown. @@ -453,6 +453,8 @@ public: /// Total size of active parts in bytes. size_t getTotalActiveSizeInBytes() const; + size_t getTotalActiveSizeInRows() const; + size_t getPartsCount() const; size_t getMaxPartsCountForPartition() const; diff --git a/dbms/src/Storages/StorageMergeTree.cpp b/dbms/src/Storages/StorageMergeTree.cpp index c752109e328..fce5f038f8c 100644 --- a/dbms/src/Storages/StorageMergeTree.cpp +++ b/dbms/src/Storages/StorageMergeTree.cpp @@ -134,6 +134,11 @@ BlockInputStreams StorageMergeTree::read( return reader.read(column_names, query_info, context, max_block_size, num_streams); } +std::optional StorageMergeTree::totalRows() const +{ + return getTotalActiveSizeInRows(); +} + BlockOutputStreamPtr StorageMergeTree::write(const ASTPtr & /*query*/, const Context & context) { return std::make_shared(*this, context.getSettingsRef().max_partitions_per_insert_block); diff --git a/dbms/src/Storages/StorageMergeTree.h b/dbms/src/Storages/StorageMergeTree.h index 15080cfcbf8..a6cc2800c9e 100644 --- a/dbms/src/Storages/StorageMergeTree.h +++ b/dbms/src/Storages/StorageMergeTree.h @@ -45,6 +45,8 @@ public: size_t max_block_size, unsigned num_streams) override; + std::optional totalRows() const override; + BlockOutputStreamPtr write(const ASTPtr & query, const Context & context) override; /** Perform the next step in combining the parts. diff --git a/dbms/src/Storages/StorageReplicatedMergeTree.cpp b/dbms/src/Storages/StorageReplicatedMergeTree.cpp index c55378d8526..fc6e8ab2676 100644 --- a/dbms/src/Storages/StorageReplicatedMergeTree.cpp +++ b/dbms/src/Storages/StorageReplicatedMergeTree.cpp @@ -171,13 +171,13 @@ void StorageReplicatedMergeTree::setZooKeeper(zkutil::ZooKeeperPtr zookeeper) current_zookeeper = zookeeper; } -zkutil::ZooKeeperPtr StorageReplicatedMergeTree::tryGetZooKeeper() +zkutil::ZooKeeperPtr StorageReplicatedMergeTree::tryGetZooKeeper() const { std::lock_guard lock(current_zookeeper_mutex); return current_zookeeper; } -zkutil::ZooKeeperPtr StorageReplicatedMergeTree::getZooKeeper() +zkutil::ZooKeeperPtr StorageReplicatedMergeTree::getZooKeeper() const { auto res = tryGetZooKeeper(); if (!res) @@ -2920,6 +2920,58 @@ StorageReplicatedMergeTree::~StorageReplicatedMergeTree() } +ReplicatedMergeTreeQuorumAddedParts::PartitionIdToMaxBlock StorageReplicatedMergeTree::getMaxAddedBlocks() const +{ + ReplicatedMergeTreeQuorumAddedParts::PartitionIdToMaxBlock max_added_blocks; + + for (const auto & data_part : getDataParts()) + { + max_added_blocks[data_part->info.partition_id] + = std::max(max_added_blocks[data_part->info.partition_id], data_part->info.max_block); + } + + auto zookeeper = getZooKeeper(); + + const String quorum_status_path = zookeeper_path + "/quorum/status"; + + String value; + Coordination::Stat stat; + + if (zookeeper->tryGet(quorum_status_path, value, &stat)) + { + ReplicatedMergeTreeQuorumEntry quorum_entry; + quorum_entry.fromString(value); + + auto part_info = MergeTreePartInfo::fromPartName(quorum_entry.part_name, format_version); + + max_added_blocks[part_info.partition_id] = part_info.max_block - 1; + } + + String added_parts_str; + if (zookeeper->tryGet(zookeeper_path + "/quorum/last_part", added_parts_str)) + { + if (!added_parts_str.empty()) + { + ReplicatedMergeTreeQuorumAddedParts part_with_quorum(format_version); + part_with_quorum.fromString(added_parts_str); + + auto added_parts = part_with_quorum.added_parts; + + for (const auto & added_part : added_parts) + if (!getActiveContainingPart(added_part.second)) + throw Exception( + "Replica doesn't have part " + added_part.second + + " which was successfully written to quorum of other replicas." + " Send query to another replica or disable 'select_sequential_consistency' setting.", + ErrorCodes::REPLICA_IS_NOT_IN_QUORUM); + + for (const auto & max_block : part_with_quorum.getMaxInsertedBlocks()) + max_added_blocks[max_block.first] = max_block.second; + } + } + return max_added_blocks; +} + BlockInputStreams StorageReplicatedMergeTree::read( const Names & column_names, const SelectQueryInfo & query_info, @@ -2937,50 +2989,7 @@ BlockInputStreams StorageReplicatedMergeTree::read( */ if (settings_.select_sequential_consistency) { - ReplicatedMergeTreeQuorumAddedParts::PartitionIdToMaxBlock max_added_blocks; - - for (const auto & data_part : getDataParts()) - { - max_added_blocks[data_part->info.partition_id] = std::max(max_added_blocks[data_part->info.partition_id], data_part->info.max_block); - } - - auto zookeeper = getZooKeeper(); - - const String quorum_status_path = zookeeper_path + "/quorum/status"; - - String value; - Coordination::Stat stat; - - if (zookeeper->tryGet(quorum_status_path, value, &stat)) - { - ReplicatedMergeTreeQuorumEntry quorum_entry; - quorum_entry.fromString(value); - - auto part_info = MergeTreePartInfo::fromPartName(quorum_entry.part_name, format_version); - - max_added_blocks[part_info.partition_id] = part_info.max_block - 1; - } - - String added_parts_str; - if (zookeeper->tryGet(zookeeper_path + "/quorum/last_part", added_parts_str)) - { - if (!added_parts_str.empty()) - { - ReplicatedMergeTreeQuorumAddedParts part_with_quorum(format_version); - part_with_quorum.fromString(added_parts_str); - - auto added_parts = part_with_quorum.added_parts; - - for (const auto & added_part : added_parts) - if (!getActiveContainingPart(added_part.second)) - throw Exception("Replica doesn't have part " + added_part.second + " which was successfully written to quorum of other replicas." - " Send query to another replica or disable 'select_sequential_consistency' setting.", ErrorCodes::REPLICA_IS_NOT_IN_QUORUM); - - for (const auto & max_block : part_with_quorum.getMaxInsertedBlocks()) - max_added_blocks[max_block.first] = max_block.second; - } - } - + auto max_added_blocks = getMaxAddedBlocks(); return reader.read(column_names, query_info, context, max_block_size, num_streams, &max_added_blocks); } @@ -2988,6 +2997,26 @@ BlockInputStreams StorageReplicatedMergeTree::read( } +std::optional StorageReplicatedMergeTree::totalRows() const +{ + size_t res = 0; + auto max_added_blocks = getMaxAddedBlocks(); + auto lock = lockParts(); + for (auto & part : getDataPartsStateRange(DataPartState::Committed)) + { + if (part->isEmpty()) + continue; + + auto blocks_iterator = max_added_blocks.find(part->info.partition_id); + if (blocks_iterator == max_added_blocks.end() || part->info.max_block > blocks_iterator->second) + continue; + + res += part->rows_count; + } + return res; +} + + void StorageReplicatedMergeTree::assertNotReadonly() const { if (is_readonly) diff --git a/dbms/src/Storages/StorageReplicatedMergeTree.h b/dbms/src/Storages/StorageReplicatedMergeTree.h index facdb1660f0..5eebd030dc8 100644 --- a/dbms/src/Storages/StorageReplicatedMergeTree.h +++ b/dbms/src/Storages/StorageReplicatedMergeTree.h @@ -96,6 +96,8 @@ public: size_t max_block_size, unsigned num_streams) override; + std::optional totalRows() const override; + BlockOutputStreamPtr write(const ASTPtr & query, const Context & context) override; bool optimize(const ASTPtr & query, const ASTPtr & partition, bool final, bool deduplicate, const Context & query_context) override; @@ -174,6 +176,10 @@ public: bool canUseAdaptiveGranularity() const override; private: + + /// Get a sequential consistent view of current parts. + ReplicatedMergeTreeQuorumAddedParts::PartitionIdToMaxBlock getMaxAddedBlocks() const; + /// Delete old parts from disk and from ZooKeeper. void clearOldPartsAndRemoveFromZK(); @@ -191,10 +197,10 @@ private: using LogEntryPtr = LogEntry::Ptr; zkutil::ZooKeeperPtr current_zookeeper; /// Use only the methods below. - std::mutex current_zookeeper_mutex; /// To recreate the session in the background thread. + mutable std::mutex current_zookeeper_mutex; /// To recreate the session in the background thread. - zkutil::ZooKeeperPtr tryGetZooKeeper(); - zkutil::ZooKeeperPtr getZooKeeper(); + zkutil::ZooKeeperPtr tryGetZooKeeper() const; + zkutil::ZooKeeperPtr getZooKeeper() const; void setZooKeeper(zkutil::ZooKeeperPtr zookeeper); /// If true, the table is offline and can not be written to it. diff --git a/dbms/tests/queries/0_stateless/00971_merge_tree_uniform_read_distribution_and_max_rows_to_read.sql b/dbms/tests/queries/0_stateless/00971_merge_tree_uniform_read_distribution_and_max_rows_to_read.sql index 37d09a3d3dd..b3d9612b39f 100644 --- a/dbms/tests/queries/0_stateless/00971_merge_tree_uniform_read_distribution_and_max_rows_to_read.sql +++ b/dbms/tests/queries/0_stateless/00971_merge_tree_uniform_read_distribution_and_max_rows_to_read.sql @@ -14,9 +14,9 @@ SELECT count() FROM merge_tree; SET max_rows_to_read = 900000; SET merge_tree_uniform_read_distribution = 1; -SELECT count() FROM merge_tree; -- { serverError 158 } +SELECT count() FROM merge_tree WHERE not ignore(); -- { serverError 158 } SET merge_tree_uniform_read_distribution = 0; -SELECT count() FROM merge_tree; -- { serverError 158 } +SELECT count() FROM merge_tree WHERE not ignore(); -- { serverError 158 } DROP TABLE merge_tree;