mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-12-03 13:02:00 +00:00
Instant count() for MergeTree
Use (Replicated)MergeTree's metadata to do trivial count()
This commit is contained in:
parent
a17cefb9c6
commit
2c75a51d4f
@ -380,6 +380,7 @@ struct Settings : public SettingsCollection<Settings>
|
|||||||
M(SettingUInt64, min_free_disk_space_for_temporary_data, 0, "The minimum disk space to keep while writing temporary data used in external sorting and aggregation.") \
|
M(SettingUInt64, min_free_disk_space_for_temporary_data, 0, "The minimum disk space to keep while writing temporary data used in external sorting and aggregation.") \
|
||||||
\
|
\
|
||||||
M(SettingBool, enable_scalar_subquery_optimization, true, "If it is set to true, prevent scalar subqueries from (de)serializing large scalar values and possibly avoid running the same subquery more than once.") \
|
M(SettingBool, enable_scalar_subquery_optimization, true, "If it is set to true, prevent scalar subqueries from (de)serializing large scalar values and possibly avoid running the same subquery more than once.") \
|
||||||
|
M(SettingBool, optimize_trivial_count_query, true, "Process trivial 'SELECT count() FROM table' query from metadata.") \
|
||||||
\
|
\
|
||||||
/** Obsolete settings that do nothing but left for compatibility reasons. Remove each one after half a year of obsolescence. */ \
|
/** Obsolete settings that do nothing but left for compatibility reasons. Remove each one after half a year of obsolescence. */ \
|
||||||
\
|
\
|
||||||
|
@ -26,6 +26,7 @@
|
|||||||
#include <DataStreams/ReverseBlockInputStream.h>
|
#include <DataStreams/ReverseBlockInputStream.h>
|
||||||
#include <DataStreams/FillingBlockInputStream.h>
|
#include <DataStreams/FillingBlockInputStream.h>
|
||||||
#include <DataStreams/SquashingBlockInputStream.h>
|
#include <DataStreams/SquashingBlockInputStream.h>
|
||||||
|
#include <DataStreams/OneBlockInputStream.h>
|
||||||
|
|
||||||
#include <Parsers/ASTFunction.h>
|
#include <Parsers/ASTFunction.h>
|
||||||
#include <Parsers/ASTIdentifier.h>
|
#include <Parsers/ASTIdentifier.h>
|
||||||
@ -65,6 +66,7 @@
|
|||||||
#include <Common/checkStackSize.h>
|
#include <Common/checkStackSize.h>
|
||||||
#include <Parsers/queryToString.h>
|
#include <Parsers/queryToString.h>
|
||||||
#include <ext/map.h>
|
#include <ext/map.h>
|
||||||
|
#include <ext/scope_guard.h>
|
||||||
#include <memory>
|
#include <memory>
|
||||||
|
|
||||||
#include <Processors/Sources/NullSource.h>
|
#include <Processors/Sources/NullSource.h>
|
||||||
@ -90,6 +92,7 @@
|
|||||||
#include <Processors/Transforms/FinishSortingTransform.h>
|
#include <Processors/Transforms/FinishSortingTransform.h>
|
||||||
#include <DataTypes/DataTypeAggregateFunction.h>
|
#include <DataTypes/DataTypeAggregateFunction.h>
|
||||||
#include <DataStreams/materializeBlock.h>
|
#include <DataStreams/materializeBlock.h>
|
||||||
|
#include <IO/MemoryReadWriteBuffer.h>
|
||||||
|
|
||||||
|
|
||||||
namespace DB
|
namespace DB
|
||||||
@ -1273,6 +1276,65 @@ void InterpreterSelectQuery::executeFetchColumns(
|
|||||||
auto & query = getSelectQuery();
|
auto & query = getSelectQuery();
|
||||||
const Settings & settings = context.getSettingsRef();
|
const Settings & settings = context.getSettingsRef();
|
||||||
|
|
||||||
|
/// Optimization for trivial query like SELECT count() FROM table.
|
||||||
|
auto check_trivial_count_query = [&]() -> std::optional<AggregateDescription>
|
||||||
|
{
|
||||||
|
if (!settings.optimize_trivial_count_query || !syntax_analyzer_result->maybe_optimize_trivial_count || !storage
|
||||||
|
|| query.sample_size() || query.sample_offset() || query.final() || query.prewhere() || query.where()
|
||||||
|
|| !query_analyzer->hasAggregation() || processing_stage != QueryProcessingStage::FetchColumns)
|
||||||
|
return {};
|
||||||
|
|
||||||
|
Names key_names;
|
||||||
|
AggregateDescriptions aggregates;
|
||||||
|
query_analyzer->getAggregateInfo(key_names, aggregates);
|
||||||
|
|
||||||
|
if (aggregates.size() != 1)
|
||||||
|
return {};
|
||||||
|
|
||||||
|
const AggregateDescription & desc = aggregates[0];
|
||||||
|
if (typeid_cast<AggregateFunctionCount *>(desc.function.get()))
|
||||||
|
return desc;
|
||||||
|
|
||||||
|
return {};
|
||||||
|
};
|
||||||
|
|
||||||
|
if (auto desc = check_trivial_count_query())
|
||||||
|
{
|
||||||
|
auto func = desc->function;
|
||||||
|
std::optional<UInt64> num_rows = storage->totalRows();
|
||||||
|
if (num_rows)
|
||||||
|
{
|
||||||
|
AggregateFunctionCount & agg_count = static_cast<AggregateFunctionCount &>(*func);
|
||||||
|
|
||||||
|
/// We will process it up to "WithMergeableState".
|
||||||
|
std::vector<char> state(agg_count.sizeOfData());
|
||||||
|
AggregateDataPtr place = state.data();
|
||||||
|
|
||||||
|
agg_count.create(place);
|
||||||
|
SCOPE_EXIT(agg_count.destroy(place));
|
||||||
|
|
||||||
|
MemoryWriteBuffer out;
|
||||||
|
writeVarUInt(*num_rows, out);
|
||||||
|
auto in = out.tryGetReadBuffer();
|
||||||
|
agg_count.deserialize(place, *in, nullptr);
|
||||||
|
|
||||||
|
auto column = ColumnAggregateFunction::create(func);
|
||||||
|
column->insertFrom(place);
|
||||||
|
|
||||||
|
Block block_with_count{
|
||||||
|
{std::move(column), std::make_shared<DataTypeAggregateFunction>(func, DataTypes(), Array()), desc->column_name}};
|
||||||
|
|
||||||
|
auto istream = std::make_shared<OneBlockInputStream>(block_with_count);
|
||||||
|
if constexpr (pipeline_with_processors)
|
||||||
|
pipeline.init({std::make_shared<SourceFromInputStream>(istream)});
|
||||||
|
else
|
||||||
|
pipeline.streams.emplace_back(istream);
|
||||||
|
from_stage = QueryProcessingStage::WithMergeableState;
|
||||||
|
analysis_result.first_stage = false;
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// Actions to calculate ALIAS if required.
|
/// Actions to calculate ALIAS if required.
|
||||||
ExpressionActionsPtr alias_actions;
|
ExpressionActionsPtr alias_actions;
|
||||||
|
|
||||||
|
@ -727,6 +727,7 @@ void SyntaxAnalyzerResult::collectUsedColumns(const ASTPtr & query, const NamesA
|
|||||||
/// You need to read at least one column to find the number of rows.
|
/// You need to read at least one column to find the number of rows.
|
||||||
if (select_query && required.empty())
|
if (select_query && required.empty())
|
||||||
{
|
{
|
||||||
|
maybe_optimize_trivial_count = true;
|
||||||
/// We will find a column with minimum <compressed_size, type_size, uncompressed_size>.
|
/// We will find a column with minimum <compressed_size, type_size, uncompressed_size>.
|
||||||
/// Because it is the column that is cheapest to read.
|
/// Because it is the column that is cheapest to read.
|
||||||
struct ColumnSizeTuple
|
struct ColumnSizeTuple
|
||||||
|
@ -48,6 +48,8 @@ struct SyntaxAnalyzerResult
|
|||||||
/// Results of scalar sub queries
|
/// Results of scalar sub queries
|
||||||
Scalars scalars;
|
Scalars scalars;
|
||||||
|
|
||||||
|
bool maybe_optimize_trivial_count = false;
|
||||||
|
|
||||||
void collectUsedColumns(const ASTPtr & query, const NamesAndTypesList & additional_source_columns);
|
void collectUsedColumns(const ASTPtr & query, const NamesAndTypesList & additional_source_columns);
|
||||||
Names requiredSourceColumns() const { return required_source_columns.getNames(); }
|
Names requiredSourceColumns() const { return required_source_columns.getNames(); }
|
||||||
const Scalars & getScalars() const { return scalars; }
|
const Scalars & getScalars() const { return scalars; }
|
||||||
|
@ -406,6 +406,13 @@ public:
|
|||||||
/// Returns storage policy if storage supports it
|
/// Returns storage policy if storage supports it
|
||||||
virtual DiskSpace::StoragePolicyPtr getStoragePolicy() const { return {}; }
|
virtual DiskSpace::StoragePolicyPtr getStoragePolicy() const { return {}; }
|
||||||
|
|
||||||
|
/** If it is possible to quickly determine exact number of rows in the table at this moment of time, then return it.
|
||||||
|
*/
|
||||||
|
virtual std::optional<UInt64> totalRows() const
|
||||||
|
{
|
||||||
|
return {};
|
||||||
|
}
|
||||||
|
|
||||||
private:
|
private:
|
||||||
/// You always need to take the next three locks in this order.
|
/// You always need to take the next three locks in this order.
|
||||||
|
|
||||||
|
@ -2374,6 +2374,20 @@ size_t MergeTreeData::getTotalActiveSizeInBytes() const
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
size_t MergeTreeData::getTotalActiveSizeInRows() const
|
||||||
|
{
|
||||||
|
size_t res = 0;
|
||||||
|
{
|
||||||
|
auto lock = lockParts();
|
||||||
|
|
||||||
|
for (auto & part : getDataPartsStateRange(DataPartState::Committed))
|
||||||
|
res += part->rows_count;
|
||||||
|
}
|
||||||
|
|
||||||
|
return res;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
size_t MergeTreeData::getPartsCount() const
|
size_t MergeTreeData::getPartsCount() const
|
||||||
{
|
{
|
||||||
auto lock = lockParts();
|
auto lock = lockParts();
|
||||||
@ -2486,7 +2500,7 @@ void MergeTreeData::throwInsertIfNeeded() const
|
|||||||
}
|
}
|
||||||
|
|
||||||
MergeTreeData::DataPartPtr MergeTreeData::getActiveContainingPart(
|
MergeTreeData::DataPartPtr MergeTreeData::getActiveContainingPart(
|
||||||
const MergeTreePartInfo & part_info, MergeTreeData::DataPartState state, DataPartsLock & /*lock*/)
|
const MergeTreePartInfo & part_info, MergeTreeData::DataPartState state, DataPartsLock & /*lock*/) const
|
||||||
{
|
{
|
||||||
auto current_state_parts_range = getDataPartsStateRange(state);
|
auto current_state_parts_range = getDataPartsStateRange(state);
|
||||||
|
|
||||||
@ -2534,13 +2548,13 @@ void MergeTreeData::swapActivePart(MergeTreeData::DataPartPtr part_copy)
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
MergeTreeData::DataPartPtr MergeTreeData::getActiveContainingPart(const MergeTreePartInfo & part_info)
|
MergeTreeData::DataPartPtr MergeTreeData::getActiveContainingPart(const MergeTreePartInfo & part_info) const
|
||||||
{
|
{
|
||||||
auto lock = lockParts();
|
auto lock = lockParts();
|
||||||
return getActiveContainingPart(part_info, DataPartState::Committed, lock);
|
return getActiveContainingPart(part_info, DataPartState::Committed, lock);
|
||||||
}
|
}
|
||||||
|
|
||||||
MergeTreeData::DataPartPtr MergeTreeData::getActiveContainingPart(const String & part_name)
|
MergeTreeData::DataPartPtr MergeTreeData::getActiveContainingPart(const String & part_name) const
|
||||||
{
|
{
|
||||||
auto part_info = MergeTreePartInfo::fromPartName(part_name, format_version);
|
auto part_info = MergeTreePartInfo::fromPartName(part_name, format_version);
|
||||||
return getActiveContainingPart(part_info);
|
return getActiveContainingPart(part_info);
|
||||||
|
@ -435,9 +435,9 @@ public:
|
|||||||
DataPartsVector getDataPartsVector() const;
|
DataPartsVector getDataPartsVector() const;
|
||||||
|
|
||||||
/// Returns a committed part with the given name or a part containing it. If there is no such part, returns nullptr.
|
/// Returns a committed part with the given name or a part containing it. If there is no such part, returns nullptr.
|
||||||
DataPartPtr getActiveContainingPart(const String & part_name);
|
DataPartPtr getActiveContainingPart(const String & part_name) const;
|
||||||
DataPartPtr getActiveContainingPart(const MergeTreePartInfo & part_info);
|
DataPartPtr getActiveContainingPart(const MergeTreePartInfo & part_info) const;
|
||||||
DataPartPtr getActiveContainingPart(const MergeTreePartInfo & part_info, DataPartState state, DataPartsLock & lock);
|
DataPartPtr getActiveContainingPart(const MergeTreePartInfo & part_info, DataPartState state, DataPartsLock & lock) const;
|
||||||
|
|
||||||
/// Swap part with it's identical copy (possible with another path on another disk).
|
/// Swap part with it's identical copy (possible with another path on another disk).
|
||||||
/// If original part is not active or doesn't exist exception will be thrown.
|
/// If original part is not active or doesn't exist exception will be thrown.
|
||||||
@ -453,6 +453,8 @@ public:
|
|||||||
/// Total size of active parts in bytes.
|
/// Total size of active parts in bytes.
|
||||||
size_t getTotalActiveSizeInBytes() const;
|
size_t getTotalActiveSizeInBytes() const;
|
||||||
|
|
||||||
|
size_t getTotalActiveSizeInRows() const;
|
||||||
|
|
||||||
size_t getPartsCount() const;
|
size_t getPartsCount() const;
|
||||||
size_t getMaxPartsCountForPartition() const;
|
size_t getMaxPartsCountForPartition() const;
|
||||||
|
|
||||||
|
@ -134,6 +134,11 @@ BlockInputStreams StorageMergeTree::read(
|
|||||||
return reader.read(column_names, query_info, context, max_block_size, num_streams);
|
return reader.read(column_names, query_info, context, max_block_size, num_streams);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
std::optional<UInt64> StorageMergeTree::totalRows() const
|
||||||
|
{
|
||||||
|
return getTotalActiveSizeInRows();
|
||||||
|
}
|
||||||
|
|
||||||
BlockOutputStreamPtr StorageMergeTree::write(const ASTPtr & /*query*/, const Context & context)
|
BlockOutputStreamPtr StorageMergeTree::write(const ASTPtr & /*query*/, const Context & context)
|
||||||
{
|
{
|
||||||
return std::make_shared<MergeTreeBlockOutputStream>(*this, context.getSettingsRef().max_partitions_per_insert_block);
|
return std::make_shared<MergeTreeBlockOutputStream>(*this, context.getSettingsRef().max_partitions_per_insert_block);
|
||||||
|
@ -45,6 +45,8 @@ public:
|
|||||||
size_t max_block_size,
|
size_t max_block_size,
|
||||||
unsigned num_streams) override;
|
unsigned num_streams) override;
|
||||||
|
|
||||||
|
std::optional<UInt64> totalRows() const override;
|
||||||
|
|
||||||
BlockOutputStreamPtr write(const ASTPtr & query, const Context & context) override;
|
BlockOutputStreamPtr write(const ASTPtr & query, const Context & context) override;
|
||||||
|
|
||||||
/** Perform the next step in combining the parts.
|
/** Perform the next step in combining the parts.
|
||||||
|
@ -171,13 +171,13 @@ void StorageReplicatedMergeTree::setZooKeeper(zkutil::ZooKeeperPtr zookeeper)
|
|||||||
current_zookeeper = zookeeper;
|
current_zookeeper = zookeeper;
|
||||||
}
|
}
|
||||||
|
|
||||||
zkutil::ZooKeeperPtr StorageReplicatedMergeTree::tryGetZooKeeper()
|
zkutil::ZooKeeperPtr StorageReplicatedMergeTree::tryGetZooKeeper() const
|
||||||
{
|
{
|
||||||
std::lock_guard lock(current_zookeeper_mutex);
|
std::lock_guard lock(current_zookeeper_mutex);
|
||||||
return current_zookeeper;
|
return current_zookeeper;
|
||||||
}
|
}
|
||||||
|
|
||||||
zkutil::ZooKeeperPtr StorageReplicatedMergeTree::getZooKeeper()
|
zkutil::ZooKeeperPtr StorageReplicatedMergeTree::getZooKeeper() const
|
||||||
{
|
{
|
||||||
auto res = tryGetZooKeeper();
|
auto res = tryGetZooKeeper();
|
||||||
if (!res)
|
if (!res)
|
||||||
@ -2920,6 +2920,58 @@ StorageReplicatedMergeTree::~StorageReplicatedMergeTree()
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
ReplicatedMergeTreeQuorumAddedParts::PartitionIdToMaxBlock StorageReplicatedMergeTree::getMaxAddedBlocks() const
|
||||||
|
{
|
||||||
|
ReplicatedMergeTreeQuorumAddedParts::PartitionIdToMaxBlock max_added_blocks;
|
||||||
|
|
||||||
|
for (const auto & data_part : getDataParts())
|
||||||
|
{
|
||||||
|
max_added_blocks[data_part->info.partition_id]
|
||||||
|
= std::max(max_added_blocks[data_part->info.partition_id], data_part->info.max_block);
|
||||||
|
}
|
||||||
|
|
||||||
|
auto zookeeper = getZooKeeper();
|
||||||
|
|
||||||
|
const String quorum_status_path = zookeeper_path + "/quorum/status";
|
||||||
|
|
||||||
|
String value;
|
||||||
|
Coordination::Stat stat;
|
||||||
|
|
||||||
|
if (zookeeper->tryGet(quorum_status_path, value, &stat))
|
||||||
|
{
|
||||||
|
ReplicatedMergeTreeQuorumEntry quorum_entry;
|
||||||
|
quorum_entry.fromString(value);
|
||||||
|
|
||||||
|
auto part_info = MergeTreePartInfo::fromPartName(quorum_entry.part_name, format_version);
|
||||||
|
|
||||||
|
max_added_blocks[part_info.partition_id] = part_info.max_block - 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
String added_parts_str;
|
||||||
|
if (zookeeper->tryGet(zookeeper_path + "/quorum/last_part", added_parts_str))
|
||||||
|
{
|
||||||
|
if (!added_parts_str.empty())
|
||||||
|
{
|
||||||
|
ReplicatedMergeTreeQuorumAddedParts part_with_quorum(format_version);
|
||||||
|
part_with_quorum.fromString(added_parts_str);
|
||||||
|
|
||||||
|
auto added_parts = part_with_quorum.added_parts;
|
||||||
|
|
||||||
|
for (const auto & added_part : added_parts)
|
||||||
|
if (!getActiveContainingPart(added_part.second))
|
||||||
|
throw Exception(
|
||||||
|
"Replica doesn't have part " + added_part.second
|
||||||
|
+ " which was successfully written to quorum of other replicas."
|
||||||
|
" Send query to another replica or disable 'select_sequential_consistency' setting.",
|
||||||
|
ErrorCodes::REPLICA_IS_NOT_IN_QUORUM);
|
||||||
|
|
||||||
|
for (const auto & max_block : part_with_quorum.getMaxInsertedBlocks())
|
||||||
|
max_added_blocks[max_block.first] = max_block.second;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return max_added_blocks;
|
||||||
|
}
|
||||||
|
|
||||||
BlockInputStreams StorageReplicatedMergeTree::read(
|
BlockInputStreams StorageReplicatedMergeTree::read(
|
||||||
const Names & column_names,
|
const Names & column_names,
|
||||||
const SelectQueryInfo & query_info,
|
const SelectQueryInfo & query_info,
|
||||||
@ -2937,50 +2989,7 @@ BlockInputStreams StorageReplicatedMergeTree::read(
|
|||||||
*/
|
*/
|
||||||
if (settings_.select_sequential_consistency)
|
if (settings_.select_sequential_consistency)
|
||||||
{
|
{
|
||||||
ReplicatedMergeTreeQuorumAddedParts::PartitionIdToMaxBlock max_added_blocks;
|
auto max_added_blocks = getMaxAddedBlocks();
|
||||||
|
|
||||||
for (const auto & data_part : getDataParts())
|
|
||||||
{
|
|
||||||
max_added_blocks[data_part->info.partition_id] = std::max(max_added_blocks[data_part->info.partition_id], data_part->info.max_block);
|
|
||||||
}
|
|
||||||
|
|
||||||
auto zookeeper = getZooKeeper();
|
|
||||||
|
|
||||||
const String quorum_status_path = zookeeper_path + "/quorum/status";
|
|
||||||
|
|
||||||
String value;
|
|
||||||
Coordination::Stat stat;
|
|
||||||
|
|
||||||
if (zookeeper->tryGet(quorum_status_path, value, &stat))
|
|
||||||
{
|
|
||||||
ReplicatedMergeTreeQuorumEntry quorum_entry;
|
|
||||||
quorum_entry.fromString(value);
|
|
||||||
|
|
||||||
auto part_info = MergeTreePartInfo::fromPartName(quorum_entry.part_name, format_version);
|
|
||||||
|
|
||||||
max_added_blocks[part_info.partition_id] = part_info.max_block - 1;
|
|
||||||
}
|
|
||||||
|
|
||||||
String added_parts_str;
|
|
||||||
if (zookeeper->tryGet(zookeeper_path + "/quorum/last_part", added_parts_str))
|
|
||||||
{
|
|
||||||
if (!added_parts_str.empty())
|
|
||||||
{
|
|
||||||
ReplicatedMergeTreeQuorumAddedParts part_with_quorum(format_version);
|
|
||||||
part_with_quorum.fromString(added_parts_str);
|
|
||||||
|
|
||||||
auto added_parts = part_with_quorum.added_parts;
|
|
||||||
|
|
||||||
for (const auto & added_part : added_parts)
|
|
||||||
if (!getActiveContainingPart(added_part.second))
|
|
||||||
throw Exception("Replica doesn't have part " + added_part.second + " which was successfully written to quorum of other replicas."
|
|
||||||
" Send query to another replica or disable 'select_sequential_consistency' setting.", ErrorCodes::REPLICA_IS_NOT_IN_QUORUM);
|
|
||||||
|
|
||||||
for (const auto & max_block : part_with_quorum.getMaxInsertedBlocks())
|
|
||||||
max_added_blocks[max_block.first] = max_block.second;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return reader.read(column_names, query_info, context, max_block_size, num_streams, &max_added_blocks);
|
return reader.read(column_names, query_info, context, max_block_size, num_streams, &max_added_blocks);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -2988,6 +2997,26 @@ BlockInputStreams StorageReplicatedMergeTree::read(
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
std::optional<UInt64> StorageReplicatedMergeTree::totalRows() const
|
||||||
|
{
|
||||||
|
size_t res = 0;
|
||||||
|
auto max_added_blocks = getMaxAddedBlocks();
|
||||||
|
auto lock = lockParts();
|
||||||
|
for (auto & part : getDataPartsStateRange(DataPartState::Committed))
|
||||||
|
{
|
||||||
|
if (part->isEmpty())
|
||||||
|
continue;
|
||||||
|
|
||||||
|
auto blocks_iterator = max_added_blocks.find(part->info.partition_id);
|
||||||
|
if (blocks_iterator == max_added_blocks.end() || part->info.max_block > blocks_iterator->second)
|
||||||
|
continue;
|
||||||
|
|
||||||
|
res += part->rows_count;
|
||||||
|
}
|
||||||
|
return res;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
void StorageReplicatedMergeTree::assertNotReadonly() const
|
void StorageReplicatedMergeTree::assertNotReadonly() const
|
||||||
{
|
{
|
||||||
if (is_readonly)
|
if (is_readonly)
|
||||||
|
@ -96,6 +96,8 @@ public:
|
|||||||
size_t max_block_size,
|
size_t max_block_size,
|
||||||
unsigned num_streams) override;
|
unsigned num_streams) override;
|
||||||
|
|
||||||
|
std::optional<UInt64> totalRows() const override;
|
||||||
|
|
||||||
BlockOutputStreamPtr write(const ASTPtr & query, const Context & context) override;
|
BlockOutputStreamPtr write(const ASTPtr & query, const Context & context) override;
|
||||||
|
|
||||||
bool optimize(const ASTPtr & query, const ASTPtr & partition, bool final, bool deduplicate, const Context & query_context) override;
|
bool optimize(const ASTPtr & query, const ASTPtr & partition, bool final, bool deduplicate, const Context & query_context) override;
|
||||||
@ -174,6 +176,10 @@ public:
|
|||||||
bool canUseAdaptiveGranularity() const override;
|
bool canUseAdaptiveGranularity() const override;
|
||||||
|
|
||||||
private:
|
private:
|
||||||
|
|
||||||
|
/// Get a sequential consistent view of current parts.
|
||||||
|
ReplicatedMergeTreeQuorumAddedParts::PartitionIdToMaxBlock getMaxAddedBlocks() const;
|
||||||
|
|
||||||
/// Delete old parts from disk and from ZooKeeper.
|
/// Delete old parts from disk and from ZooKeeper.
|
||||||
void clearOldPartsAndRemoveFromZK();
|
void clearOldPartsAndRemoveFromZK();
|
||||||
|
|
||||||
@ -191,10 +197,10 @@ private:
|
|||||||
using LogEntryPtr = LogEntry::Ptr;
|
using LogEntryPtr = LogEntry::Ptr;
|
||||||
|
|
||||||
zkutil::ZooKeeperPtr current_zookeeper; /// Use only the methods below.
|
zkutil::ZooKeeperPtr current_zookeeper; /// Use only the methods below.
|
||||||
std::mutex current_zookeeper_mutex; /// To recreate the session in the background thread.
|
mutable std::mutex current_zookeeper_mutex; /// To recreate the session in the background thread.
|
||||||
|
|
||||||
zkutil::ZooKeeperPtr tryGetZooKeeper();
|
zkutil::ZooKeeperPtr tryGetZooKeeper() const;
|
||||||
zkutil::ZooKeeperPtr getZooKeeper();
|
zkutil::ZooKeeperPtr getZooKeeper() const;
|
||||||
void setZooKeeper(zkutil::ZooKeeperPtr zookeeper);
|
void setZooKeeper(zkutil::ZooKeeperPtr zookeeper);
|
||||||
|
|
||||||
/// If true, the table is offline and can not be written to it.
|
/// If true, the table is offline and can not be written to it.
|
||||||
|
@ -14,9 +14,9 @@ SELECT count() FROM merge_tree;
|
|||||||
SET max_rows_to_read = 900000;
|
SET max_rows_to_read = 900000;
|
||||||
|
|
||||||
SET merge_tree_uniform_read_distribution = 1;
|
SET merge_tree_uniform_read_distribution = 1;
|
||||||
SELECT count() FROM merge_tree; -- { serverError 158 }
|
SELECT count() FROM merge_tree WHERE not ignore(); -- { serverError 158 }
|
||||||
|
|
||||||
SET merge_tree_uniform_read_distribution = 0;
|
SET merge_tree_uniform_read_distribution = 0;
|
||||||
SELECT count() FROM merge_tree; -- { serverError 158 }
|
SELECT count() FROM merge_tree WHERE not ignore(); -- { serverError 158 }
|
||||||
|
|
||||||
DROP TABLE merge_tree;
|
DROP TABLE merge_tree;
|
||||||
|
Loading…
Reference in New Issue
Block a user