Merge pull request #7510 from amosbird/smartcount

Instant count() for MergeTree
This commit is contained in:
alexey-milovidov 2019-10-30 20:42:00 +03:00 committed by GitHub
commit ddb58eeb8b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 231 additions and 57 deletions

View File

@ -380,6 +380,7 @@ struct Settings : public SettingsCollection<Settings>
M(SettingUInt64, min_free_disk_space_for_temporary_data, 0, "The minimum disk space to keep while writing temporary data used in external sorting and aggregation.") \
\
M(SettingBool, enable_scalar_subquery_optimization, true, "If it is set to true, prevent scalar subqueries from (de)serializing large scalar values and possibly avoid running the same subquery more than once.") \
M(SettingBool, optimize_trivial_count_query, true, "Process trivial 'SELECT count() FROM table' query from metadata.") \
\
/** Obsolete settings that do nothing but left for compatibility reasons. Remove each one after half a year of obsolescence. */ \
\

View File

@ -26,6 +26,7 @@
#include <DataStreams/ReverseBlockInputStream.h>
#include <DataStreams/FillingBlockInputStream.h>
#include <DataStreams/SquashingBlockInputStream.h>
#include <DataStreams/OneBlockInputStream.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTIdentifier.h>
@ -65,6 +66,7 @@
#include <Common/checkStackSize.h>
#include <Parsers/queryToString.h>
#include <ext/map.h>
#include <ext/scope_guard.h>
#include <memory>
#include <Processors/Sources/NullSource.h>
@ -90,6 +92,7 @@
#include <Processors/Transforms/FinishSortingTransform.h>
#include <DataTypes/DataTypeAggregateFunction.h>
#include <DataStreams/materializeBlock.h>
#include <IO/MemoryReadWriteBuffer.h>
namespace DB
@ -1273,6 +1276,65 @@ void InterpreterSelectQuery::executeFetchColumns(
auto & query = getSelectQuery();
const Settings & settings = context.getSettingsRef();
/// Optimization for trivial query like SELECT count() FROM table.
auto check_trivial_count_query = [&]() -> std::optional<AggregateDescription>
{
if (!settings.optimize_trivial_count_query || !syntax_analyzer_result->maybe_optimize_trivial_count || !storage
|| query.sample_size() || query.sample_offset() || query.final() || query.prewhere() || query.where()
|| !query_analyzer->hasAggregation() || processing_stage != QueryProcessingStage::FetchColumns)
return {};
Names key_names;
AggregateDescriptions aggregates;
query_analyzer->getAggregateInfo(key_names, aggregates);
if (aggregates.size() != 1)
return {};
const AggregateDescription & desc = aggregates[0];
if (typeid_cast<AggregateFunctionCount *>(desc.function.get()))
return desc;
return {};
};
if (auto desc = check_trivial_count_query())
{
auto func = desc->function;
std::optional<UInt64> num_rows = storage->totalRows();
if (num_rows)
{
AggregateFunctionCount & agg_count = static_cast<AggregateFunctionCount &>(*func);
/// We will process it up to "WithMergeableState".
std::vector<char> state(agg_count.sizeOfData());
AggregateDataPtr place = state.data();
agg_count.create(place);
SCOPE_EXIT(agg_count.destroy(place));
MemoryWriteBuffer out;
writeVarUInt(*num_rows, out);
auto in = out.tryGetReadBuffer();
agg_count.deserialize(place, *in, nullptr);
auto column = ColumnAggregateFunction::create(func);
column->insertFrom(place);
Block block_with_count{
{std::move(column), std::make_shared<DataTypeAggregateFunction>(func, DataTypes(), Array()), desc->column_name}};
auto istream = std::make_shared<OneBlockInputStream>(block_with_count);
if constexpr (pipeline_with_processors)
pipeline.init({std::make_shared<SourceFromInputStream>(istream)});
else
pipeline.streams.emplace_back(istream);
from_stage = QueryProcessingStage::WithMergeableState;
analysis_result.first_stage = false;
return;
}
}
/// Actions to calculate ALIAS if required.
ExpressionActionsPtr alias_actions;

View File

@ -727,6 +727,7 @@ void SyntaxAnalyzerResult::collectUsedColumns(const ASTPtr & query, const NamesA
/// You need to read at least one column to find the number of rows.
if (select_query && required.empty())
{
maybe_optimize_trivial_count = true;
/// We will find a column with minimum <compressed_size, type_size, uncompressed_size>.
/// Because it is the column that is cheapest to read.
struct ColumnSizeTuple

View File

@ -48,6 +48,8 @@ struct SyntaxAnalyzerResult
/// Results of scalar sub queries
Scalars scalars;
bool maybe_optimize_trivial_count = false;
void collectUsedColumns(const ASTPtr & query, const NamesAndTypesList & additional_source_columns);
Names requiredSourceColumns() const { return required_source_columns.getNames(); }
const Scalars & getScalars() const { return scalars; }

View File

@ -406,6 +406,13 @@ public:
/// Returns storage policy if storage supports it
virtual DiskSpace::StoragePolicyPtr getStoragePolicy() const { return {}; }
/** If it is possible to quickly determine exact number of rows in the table at this moment of time, then return it.
*/
virtual std::optional<UInt64> totalRows() const
{
return {};
}
private:
/// You always need to take the next three locks in this order.

View File

@ -2374,6 +2374,20 @@ size_t MergeTreeData::getTotalActiveSizeInBytes() const
}
size_t MergeTreeData::getTotalActiveSizeInRows() const
{
size_t res = 0;
{
auto lock = lockParts();
for (auto & part : getDataPartsStateRange(DataPartState::Committed))
res += part->rows_count;
}
return res;
}
size_t MergeTreeData::getPartsCount() const
{
auto lock = lockParts();
@ -2486,7 +2500,7 @@ void MergeTreeData::throwInsertIfNeeded() const
}
MergeTreeData::DataPartPtr MergeTreeData::getActiveContainingPart(
const MergeTreePartInfo & part_info, MergeTreeData::DataPartState state, DataPartsLock & /*lock*/)
const MergeTreePartInfo & part_info, MergeTreeData::DataPartState state, DataPartsLock & /*lock*/) const
{
auto current_state_parts_range = getDataPartsStateRange(state);
@ -2534,13 +2548,13 @@ void MergeTreeData::swapActivePart(MergeTreeData::DataPartPtr part_copy)
}
MergeTreeData::DataPartPtr MergeTreeData::getActiveContainingPart(const MergeTreePartInfo & part_info)
MergeTreeData::DataPartPtr MergeTreeData::getActiveContainingPart(const MergeTreePartInfo & part_info) const
{
auto lock = lockParts();
return getActiveContainingPart(part_info, DataPartState::Committed, lock);
}
MergeTreeData::DataPartPtr MergeTreeData::getActiveContainingPart(const String & part_name)
MergeTreeData::DataPartPtr MergeTreeData::getActiveContainingPart(const String & part_name) const
{
auto part_info = MergeTreePartInfo::fromPartName(part_name, format_version);
return getActiveContainingPart(part_info);

View File

@ -435,9 +435,9 @@ public:
DataPartsVector getDataPartsVector() const;
/// Returns a committed part with the given name or a part containing it. If there is no such part, returns nullptr.
DataPartPtr getActiveContainingPart(const String & part_name);
DataPartPtr getActiveContainingPart(const MergeTreePartInfo & part_info);
DataPartPtr getActiveContainingPart(const MergeTreePartInfo & part_info, DataPartState state, DataPartsLock & lock);
DataPartPtr getActiveContainingPart(const String & part_name) const;
DataPartPtr getActiveContainingPart(const MergeTreePartInfo & part_info) const;
DataPartPtr getActiveContainingPart(const MergeTreePartInfo & part_info, DataPartState state, DataPartsLock & lock) const;
/// Swap part with it's identical copy (possible with another path on another disk).
/// If original part is not active or doesn't exist exception will be thrown.
@ -453,6 +453,8 @@ public:
/// Total size of active parts in bytes.
size_t getTotalActiveSizeInBytes() const;
size_t getTotalActiveSizeInRows() const;
size_t getPartsCount() const;
size_t getMaxPartsCountForPartition() const;

View File

@ -134,6 +134,11 @@ BlockInputStreams StorageMergeTree::read(
return reader.read(column_names, query_info, context, max_block_size, num_streams);
}
std::optional<UInt64> StorageMergeTree::totalRows() const
{
return getTotalActiveSizeInRows();
}
BlockOutputStreamPtr StorageMergeTree::write(const ASTPtr & /*query*/, const Context & context)
{
return std::make_shared<MergeTreeBlockOutputStream>(*this, context.getSettingsRef().max_partitions_per_insert_block);

View File

@ -45,6 +45,8 @@ public:
size_t max_block_size,
unsigned num_streams) override;
std::optional<UInt64> totalRows() const override;
BlockOutputStreamPtr write(const ASTPtr & query, const Context & context) override;
/** Perform the next step in combining the parts.

View File

@ -171,13 +171,13 @@ void StorageReplicatedMergeTree::setZooKeeper(zkutil::ZooKeeperPtr zookeeper)
current_zookeeper = zookeeper;
}
zkutil::ZooKeeperPtr StorageReplicatedMergeTree::tryGetZooKeeper()
zkutil::ZooKeeperPtr StorageReplicatedMergeTree::tryGetZooKeeper() const
{
std::lock_guard lock(current_zookeeper_mutex);
return current_zookeeper;
}
zkutil::ZooKeeperPtr StorageReplicatedMergeTree::getZooKeeper()
zkutil::ZooKeeperPtr StorageReplicatedMergeTree::getZooKeeper() const
{
auto res = tryGetZooKeeper();
if (!res)
@ -2920,6 +2920,58 @@ StorageReplicatedMergeTree::~StorageReplicatedMergeTree()
}
ReplicatedMergeTreeQuorumAddedParts::PartitionIdToMaxBlock StorageReplicatedMergeTree::getMaxAddedBlocks() const
{
ReplicatedMergeTreeQuorumAddedParts::PartitionIdToMaxBlock max_added_blocks;
for (const auto & data_part : getDataParts())
{
max_added_blocks[data_part->info.partition_id]
= std::max(max_added_blocks[data_part->info.partition_id], data_part->info.max_block);
}
auto zookeeper = getZooKeeper();
const String quorum_status_path = zookeeper_path + "/quorum/status";
String value;
Coordination::Stat stat;
if (zookeeper->tryGet(quorum_status_path, value, &stat))
{
ReplicatedMergeTreeQuorumEntry quorum_entry;
quorum_entry.fromString(value);
auto part_info = MergeTreePartInfo::fromPartName(quorum_entry.part_name, format_version);
max_added_blocks[part_info.partition_id] = part_info.max_block - 1;
}
String added_parts_str;
if (zookeeper->tryGet(zookeeper_path + "/quorum/last_part", added_parts_str))
{
if (!added_parts_str.empty())
{
ReplicatedMergeTreeQuorumAddedParts part_with_quorum(format_version);
part_with_quorum.fromString(added_parts_str);
auto added_parts = part_with_quorum.added_parts;
for (const auto & added_part : added_parts)
if (!getActiveContainingPart(added_part.second))
throw Exception(
"Replica doesn't have part " + added_part.second
+ " which was successfully written to quorum of other replicas."
" Send query to another replica or disable 'select_sequential_consistency' setting.",
ErrorCodes::REPLICA_IS_NOT_IN_QUORUM);
for (const auto & max_block : part_with_quorum.getMaxInsertedBlocks())
max_added_blocks[max_block.first] = max_block.second;
}
}
return max_added_blocks;
}
BlockInputStreams StorageReplicatedMergeTree::read(
const Names & column_names,
const SelectQueryInfo & query_info,
@ -2937,50 +2989,7 @@ BlockInputStreams StorageReplicatedMergeTree::read(
*/
if (settings_.select_sequential_consistency)
{
ReplicatedMergeTreeQuorumAddedParts::PartitionIdToMaxBlock max_added_blocks;
for (const auto & data_part : getDataParts())
{
max_added_blocks[data_part->info.partition_id] = std::max(max_added_blocks[data_part->info.partition_id], data_part->info.max_block);
}
auto zookeeper = getZooKeeper();
const String quorum_status_path = zookeeper_path + "/quorum/status";
String value;
Coordination::Stat stat;
if (zookeeper->tryGet(quorum_status_path, value, &stat))
{
ReplicatedMergeTreeQuorumEntry quorum_entry;
quorum_entry.fromString(value);
auto part_info = MergeTreePartInfo::fromPartName(quorum_entry.part_name, format_version);
max_added_blocks[part_info.partition_id] = part_info.max_block - 1;
}
String added_parts_str;
if (zookeeper->tryGet(zookeeper_path + "/quorum/last_part", added_parts_str))
{
if (!added_parts_str.empty())
{
ReplicatedMergeTreeQuorumAddedParts part_with_quorum(format_version);
part_with_quorum.fromString(added_parts_str);
auto added_parts = part_with_quorum.added_parts;
for (const auto & added_part : added_parts)
if (!getActiveContainingPart(added_part.second))
throw Exception("Replica doesn't have part " + added_part.second + " which was successfully written to quorum of other replicas."
" Send query to another replica or disable 'select_sequential_consistency' setting.", ErrorCodes::REPLICA_IS_NOT_IN_QUORUM);
for (const auto & max_block : part_with_quorum.getMaxInsertedBlocks())
max_added_blocks[max_block.first] = max_block.second;
}
}
auto max_added_blocks = getMaxAddedBlocks();
return reader.read(column_names, query_info, context, max_block_size, num_streams, &max_added_blocks);
}
@ -2988,6 +2997,26 @@ BlockInputStreams StorageReplicatedMergeTree::read(
}
std::optional<UInt64> StorageReplicatedMergeTree::totalRows() const
{
size_t res = 0;
auto max_added_blocks = getMaxAddedBlocks();
auto lock = lockParts();
for (auto & part : getDataPartsStateRange(DataPartState::Committed))
{
if (part->isEmpty())
continue;
auto blocks_iterator = max_added_blocks.find(part->info.partition_id);
if (blocks_iterator == max_added_blocks.end() || part->info.max_block > blocks_iterator->second)
continue;
res += part->rows_count;
}
return res;
}
void StorageReplicatedMergeTree::assertNotReadonly() const
{
if (is_readonly)

View File

@ -96,6 +96,8 @@ public:
size_t max_block_size,
unsigned num_streams) override;
std::optional<UInt64> totalRows() const override;
BlockOutputStreamPtr write(const ASTPtr & query, const Context & context) override;
bool optimize(const ASTPtr & query, const ASTPtr & partition, bool final, bool deduplicate, const Context & query_context) override;
@ -174,6 +176,10 @@ public:
bool canUseAdaptiveGranularity() const override;
private:
/// Get a sequential consistent view of current parts.
ReplicatedMergeTreeQuorumAddedParts::PartitionIdToMaxBlock getMaxAddedBlocks() const;
/// Delete old parts from disk and from ZooKeeper.
void clearOldPartsAndRemoveFromZK();
@ -191,10 +197,10 @@ private:
using LogEntryPtr = LogEntry::Ptr;
zkutil::ZooKeeperPtr current_zookeeper; /// Use only the methods below.
std::mutex current_zookeeper_mutex; /// To recreate the session in the background thread.
mutable std::mutex current_zookeeper_mutex; /// To recreate the session in the background thread.
zkutil::ZooKeeperPtr tryGetZooKeeper();
zkutil::ZooKeeperPtr getZooKeeper();
zkutil::ZooKeeperPtr tryGetZooKeeper() const;
zkutil::ZooKeeperPtr getZooKeeper() const;
void setZooKeeper(zkutil::ZooKeeperPtr zookeeper);
/// If true, the table is offline and can not be written to it.

View File

@ -0,0 +1,14 @@
1234567
2469134
2469134
1234567
1234567
1234567
1234567
1234567
1234567
1234567
1234567
1234568
1234567

View File

@ -0,0 +1,18 @@
DROP TABLE IF EXISTS test.count;
CREATE TABLE test.count (x UInt64) ENGINE = MergeTree ORDER BY tuple();
INSERT INTO test.count SELECT * FROM numbers(1234567);
SELECT count() FROM test.count;
SELECT count() * 2 FROM test.count;
SELECT count() FROM (SELECT * FROM test.count UNION ALL SELECT * FROM test.count);
SELECT count() FROM test.count WITH TOTALS;
SELECT arrayJoin([count(), count()]) FROM test.count;
SELECT arrayJoin([count(), count()]) FROM test.count LIMIT 1;
SELECT arrayJoin([count(), count()]) FROM test.count LIMIT 1, 1;
SELECT arrayJoin([count(), count()]) AS x FROM test.count LIMIT 1 BY x;
SELECT arrayJoin([count(), count() + 1]) AS x FROM test.count LIMIT 1 BY x;
SELECT count() FROM test.count HAVING count() = 1234567;
SELECT count() FROM test.count HAVING count() != 1234567;
DROP TABLE test.count;

View File

@ -0,0 +1,2 @@
2469134
1234567

View File

@ -0,0 +1,9 @@
DROP TABLE IF EXISTS test.count;
CREATE TABLE test.count (x UInt64) ENGINE = MergeTree ORDER BY tuple();
INSERT INTO test.count SELECT * FROM numbers(1234567);
SELECT count() FROM remote('127.0.0.{1,2}', test.count);
SELECT count() / 2 FROM remote('127.0.0.{1,2}', test.count);
DROP TABLE test.count;

View File

@ -14,9 +14,9 @@ SELECT count() FROM merge_tree;
SET max_rows_to_read = 900000;
SET merge_tree_uniform_read_distribution = 1;
SELECT count() FROM merge_tree; -- { serverError 158 }
SELECT count() FROM merge_tree WHERE not ignore(); -- { serverError 158 }
SET merge_tree_uniform_read_distribution = 0;
SELECT count() FROM merge_tree; -- { serverError 158 }
SELECT count() FROM merge_tree WHERE not ignore(); -- { serverError 158 }
DROP TABLE merge_tree;