mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-21 15:12:02 +00:00
Merge pull request #17309 from nikitamikhaylov/merging-sequential-consistency
Merging #16309
This commit is contained in:
commit
084c75fa6e
@ -1177,7 +1177,7 @@ void InterpreterSelectQuery::executeFetchColumns(
|
||||
const auto & func = desc.function;
|
||||
std::optional<UInt64> num_rows{};
|
||||
if (!query.prewhere() && !query.where())
|
||||
num_rows = storage->totalRows();
|
||||
num_rows = storage->totalRows(settings);
|
||||
else // It's possible to optimize count() given only partition predicates
|
||||
{
|
||||
SelectQueryInfo temp_query_info;
|
||||
|
@ -463,7 +463,7 @@ public:
|
||||
/// - For total_rows column in system.tables
|
||||
///
|
||||
/// Does takes underlying Storage (if any) into account.
|
||||
virtual std::optional<UInt64> totalRows() const { return {}; }
|
||||
virtual std::optional<UInt64> totalRows(const Settings &) const { return {}; }
|
||||
|
||||
/// Same as above but also take partition predicate into account.
|
||||
virtual std::optional<UInt64> totalRowsByPartitionPredicate(const SelectQueryInfo &, const Context &) const { return {}; }
|
||||
@ -481,7 +481,7 @@ public:
|
||||
/// Memory part should be estimated as a resident memory size.
|
||||
/// In particular, alloctedBytes() is preferable over bytes()
|
||||
/// when considering in-memory blocks.
|
||||
virtual std::optional<UInt64> totalBytes() const { return {}; }
|
||||
virtual std::optional<UInt64> totalBytes(const Settings &) const { return {}; }
|
||||
|
||||
/// Number of rows INSERTed since server start.
|
||||
///
|
||||
|
@ -867,13 +867,13 @@ void StorageBuffer::checkAlterIsPossible(const AlterCommands & commands, const S
|
||||
}
|
||||
}
|
||||
|
||||
std::optional<UInt64> StorageBuffer::totalRows() const
|
||||
std::optional<UInt64> StorageBuffer::totalRows(const Settings & settings) const
|
||||
{
|
||||
std::optional<UInt64> underlying_rows;
|
||||
auto underlying = DatabaseCatalog::instance().tryGetTable(destination_id, global_context);
|
||||
|
||||
if (underlying)
|
||||
underlying_rows = underlying->totalRows();
|
||||
underlying_rows = underlying->totalRows(settings);
|
||||
if (!underlying_rows)
|
||||
return underlying_rows;
|
||||
|
||||
@ -886,7 +886,7 @@ std::optional<UInt64> StorageBuffer::totalRows() const
|
||||
return rows + *underlying_rows;
|
||||
}
|
||||
|
||||
std::optional<UInt64> StorageBuffer::totalBytes() const
|
||||
std::optional<UInt64> StorageBuffer::totalBytes(const Settings & /*settings*/) const
|
||||
{
|
||||
UInt64 bytes = 0;
|
||||
for (const auto & buffer : buffers)
|
||||
|
@ -109,8 +109,8 @@ public:
|
||||
/// The structure of the subordinate table is not checked and does not change.
|
||||
void alter(const AlterCommands & params, const Context & context, TableLockHolder & table_lock_holder) override;
|
||||
|
||||
std::optional<UInt64> totalRows() const override;
|
||||
std::optional<UInt64> totalBytes() const override;
|
||||
std::optional<UInt64> totalRows(const Settings & settings) const override;
|
||||
std::optional<UInt64> totalBytes(const Settings & settings) const override;
|
||||
|
||||
std::optional<UInt64> lifetimeRows() const override { return writes.rows; }
|
||||
std::optional<UInt64> lifetimeBytes() const override { return writes.bytes; }
|
||||
|
@ -102,8 +102,8 @@ HashJoinPtr StorageJoin::getJoin(std::shared_ptr<TableJoin> analyzed_join) const
|
||||
void StorageJoin::insertBlock(const Block & block) { join->addJoinedBlock(block, true); }
|
||||
|
||||
size_t StorageJoin::getSize() const { return join->getTotalRowCount(); }
|
||||
std::optional<UInt64> StorageJoin::totalRows() const { return join->getTotalRowCount(); }
|
||||
std::optional<UInt64> StorageJoin::totalBytes() const { return join->getTotalByteCount(); }
|
||||
std::optional<UInt64> StorageJoin::totalRows(const Settings &) const { return join->getTotalRowCount(); }
|
||||
std::optional<UInt64> StorageJoin::totalBytes(const Settings &) const { return join->getTotalByteCount(); }
|
||||
|
||||
|
||||
void registerStorageJoin(StorageFactory & factory)
|
||||
|
@ -46,8 +46,8 @@ public:
|
||||
size_t max_block_size,
|
||||
unsigned num_streams) override;
|
||||
|
||||
std::optional<UInt64> totalRows() const override;
|
||||
std::optional<UInt64> totalBytes() const override;
|
||||
std::optional<UInt64> totalRows(const Settings & settings) const override;
|
||||
std::optional<UInt64> totalBytes(const Settings & settings) const override;
|
||||
|
||||
private:
|
||||
Block sample_block;
|
||||
|
@ -216,21 +216,18 @@ void StorageMemory::truncate(
|
||||
total_size_rows.store(0, std::memory_order_relaxed);
|
||||
}
|
||||
|
||||
|
||||
std::optional<UInt64> StorageMemory::totalRows() const
|
||||
std::optional<UInt64> StorageMemory::totalRows(const Settings &) const
|
||||
{
|
||||
/// All modifications of these counters are done under mutex which automatically guarantees synchronization/consistency
|
||||
/// When run concurrently we are fine with any value: "before" or "after"
|
||||
return total_size_rows.load(std::memory_order_relaxed);
|
||||
}
|
||||
|
||||
|
||||
std::optional<UInt64> StorageMemory::totalBytes() const
|
||||
std::optional<UInt64> StorageMemory::totalBytes(const Settings &) const
|
||||
{
|
||||
return total_size_bytes.load(std::memory_order_relaxed);
|
||||
}
|
||||
|
||||
|
||||
void registerStorageMemory(StorageFactory & factory)
|
||||
{
|
||||
factory.registerStorage("Memory", [](const StorageFactory::Arguments & args)
|
||||
|
@ -46,8 +46,8 @@ public:
|
||||
|
||||
void truncate(const ASTPtr &, const StorageMetadataPtr &, const Context &, TableExclusiveLockHolder &) override;
|
||||
|
||||
std::optional<UInt64> totalRows() const override;
|
||||
std::optional<UInt64> totalBytes() const override;
|
||||
std::optional<UInt64> totalRows(const Settings &) const override;
|
||||
std::optional<UInt64> totalBytes(const Settings &) const override;
|
||||
|
||||
/** Delays initialization of StorageMemory::read() until the first read is actually happen.
|
||||
* Usually, fore code like this:
|
||||
|
@ -202,7 +202,7 @@ Pipe StorageMergeTree::read(
|
||||
return plan.convertToPipe();
|
||||
}
|
||||
|
||||
std::optional<UInt64> StorageMergeTree::totalRows() const
|
||||
std::optional<UInt64> StorageMergeTree::totalRows(const Settings &) const
|
||||
{
|
||||
return getTotalActiveSizeInRows();
|
||||
}
|
||||
@ -223,7 +223,7 @@ std::optional<UInt64> StorageMergeTree::totalRowsByPartitionPredicate(const Sele
|
||||
return res;
|
||||
}
|
||||
|
||||
std::optional<UInt64> StorageMergeTree::totalBytes() const
|
||||
std::optional<UInt64> StorageMergeTree::totalBytes(const Settings &) const
|
||||
{
|
||||
return getTotalActiveSizeInBytes();
|
||||
}
|
||||
|
@ -56,9 +56,9 @@ public:
|
||||
size_t max_block_size,
|
||||
unsigned num_streams) override;
|
||||
|
||||
std::optional<UInt64> totalRows() const override;
|
||||
std::optional<UInt64> totalRows(const Settings &) const override;
|
||||
std::optional<UInt64> totalRowsByPartitionPredicate(const SelectQueryInfo &, const Context &) const override;
|
||||
std::optional<UInt64> totalBytes() const override;
|
||||
std::optional<UInt64> totalBytes(const Settings &) const override;
|
||||
|
||||
BlockOutputStreamPtr write(const ASTPtr & query, const StorageMetadataPtr & /*metadata_snapshot*/, const Context & context) override;
|
||||
|
||||
|
@ -45,11 +45,11 @@ public:
|
||||
|
||||
void alter(const AlterCommands & params, const Context & context, TableLockHolder & table_lock_holder) override;
|
||||
|
||||
std::optional<UInt64> totalRows() const override
|
||||
std::optional<UInt64> totalRows(const Settings &) const override
|
||||
{
|
||||
return {0};
|
||||
}
|
||||
std::optional<UInt64> totalBytes() const override
|
||||
std::optional<UInt64> totalBytes(const Settings &) const override
|
||||
{
|
||||
return {0};
|
||||
}
|
||||
|
@ -148,8 +148,8 @@ public:
|
||||
bool storesDataOnDisk() const override { return getNested()->storesDataOnDisk(); }
|
||||
Strings getDataPaths() const override { return getNested()->getDataPaths(); }
|
||||
StoragePolicyPtr getStoragePolicy() const override { return getNested()->getStoragePolicy(); }
|
||||
std::optional<UInt64> totalRows() const override { return getNested()->totalRows(); }
|
||||
std::optional<UInt64> totalBytes() const override { return getNested()->totalBytes(); }
|
||||
std::optional<UInt64> totalRows(const Settings & settings) const override { return getNested()->totalRows(settings); }
|
||||
std::optional<UInt64> totalBytes(const Settings & settings) const override { return getNested()->totalBytes(settings); }
|
||||
std::optional<UInt64> lifetimeRows() const override { return getNested()->lifetimeRows(); }
|
||||
std::optional<UInt64> lifetimeBytes() const override { return getNested()->lifetimeBytes(); }
|
||||
|
||||
|
@ -3742,27 +3742,37 @@ Pipe StorageReplicatedMergeTree::read(
|
||||
|
||||
|
||||
template <class Func>
|
||||
void StorageReplicatedMergeTree::foreachCommittedParts(const Func & func) const
|
||||
void StorageReplicatedMergeTree::foreachCommittedParts(Func && func, bool select_sequential_consistency) const
|
||||
{
|
||||
auto max_added_blocks = getMaxAddedBlocks();
|
||||
std::optional<ReplicatedMergeTreeQuorumAddedParts::PartitionIdToMaxBlock> max_added_blocks = {};
|
||||
|
||||
/**
|
||||
* Synchronously go to ZooKeeper when select_sequential_consistency enabled
|
||||
*/
|
||||
if (select_sequential_consistency)
|
||||
max_added_blocks = getMaxAddedBlocks();
|
||||
|
||||
auto lock = lockParts();
|
||||
for (const auto & part : getDataPartsStateRange(DataPartState::Committed))
|
||||
{
|
||||
if (part->isEmpty())
|
||||
continue;
|
||||
|
||||
auto blocks_iterator = max_added_blocks.find(part->info.partition_id);
|
||||
if (blocks_iterator == max_added_blocks.end() || part->info.max_block > blocks_iterator->second)
|
||||
continue;
|
||||
if (max_added_blocks)
|
||||
{
|
||||
auto blocks_iterator = max_added_blocks->find(part->info.partition_id);
|
||||
if (blocks_iterator == max_added_blocks->end() || part->info.max_block > blocks_iterator->second)
|
||||
continue;
|
||||
}
|
||||
|
||||
func(part);
|
||||
}
|
||||
}
|
||||
|
||||
std::optional<UInt64> StorageReplicatedMergeTree::totalRows() const
|
||||
std::optional<UInt64> StorageReplicatedMergeTree::totalRows(const Settings & settings) const
|
||||
{
|
||||
UInt64 res = 0;
|
||||
foreachCommittedParts([&res](auto & part) { res += part->rows_count; });
|
||||
foreachCommittedParts([&res](auto & part) { res += part->rows_count; }, settings.select_sequential_consistency);
|
||||
return res;
|
||||
}
|
||||
|
||||
@ -3777,14 +3787,14 @@ std::optional<UInt64> StorageReplicatedMergeTree::totalRowsByPartitionPredicate(
|
||||
{
|
||||
if (!partition_pruner.canBePruned(part))
|
||||
res += part->rows_count;
|
||||
});
|
||||
}, context.getSettingsRef().select_sequential_consistency);
|
||||
return res;
|
||||
}
|
||||
|
||||
std::optional<UInt64> StorageReplicatedMergeTree::totalBytes() const
|
||||
std::optional<UInt64> StorageReplicatedMergeTree::totalBytes(const Settings & settings) const
|
||||
{
|
||||
UInt64 res = 0;
|
||||
foreachCommittedParts([&res](auto & part) { res += part->getBytesOnDisk(); });
|
||||
foreachCommittedParts([&res](auto & part) { res += part->getBytesOnDisk(); }, settings.select_sequential_consistency);
|
||||
return res;
|
||||
}
|
||||
|
||||
|
@ -107,9 +107,9 @@ public:
|
||||
size_t max_block_size,
|
||||
unsigned num_streams) override;
|
||||
|
||||
std::optional<UInt64> totalRows() const override;
|
||||
std::optional<UInt64> totalRows(const Settings & settings) const override;
|
||||
std::optional<UInt64> totalRowsByPartitionPredicate(const SelectQueryInfo & query_info, const Context & context) const override;
|
||||
std::optional<UInt64> totalBytes() const override;
|
||||
std::optional<UInt64> totalBytes(const Settings & settings) const override;
|
||||
|
||||
BlockOutputStreamPtr write(const ASTPtr & query, const StorageMetadataPtr & /*metadata_snapshot*/, const Context & context) override;
|
||||
|
||||
@ -326,7 +326,7 @@ private:
|
||||
const size_t replicated_fetches_pool_size;
|
||||
|
||||
template <class Func>
|
||||
void foreachCommittedParts(const Func & func) const;
|
||||
void foreachCommittedParts(Func && func, bool select_sequential_consistency) const;
|
||||
|
||||
/** Creates the minimum set of nodes in ZooKeeper and create first replica.
|
||||
* Returns true if was created, false if exists.
|
||||
|
@ -153,8 +153,8 @@ void StorageSet::insertBlock(const Block & block) { set->insertFromBlock(block);
|
||||
void StorageSet::finishInsert() { set->finishInsert(); }
|
||||
|
||||
size_t StorageSet::getSize() const { return set->getTotalRowCount(); }
|
||||
std::optional<UInt64> StorageSet::totalRows() const { return set->getTotalRowCount(); }
|
||||
std::optional<UInt64> StorageSet::totalBytes() const { return set->getTotalByteCount(); }
|
||||
std::optional<UInt64> StorageSet::totalRows(const Settings &) const { return set->getTotalRowCount(); }
|
||||
std::optional<UInt64> StorageSet::totalBytes(const Settings &) const { return set->getTotalByteCount(); }
|
||||
|
||||
void StorageSet::truncate(const ASTPtr &, const StorageMetadataPtr & metadata_snapshot, const Context &, TableExclusiveLockHolder &)
|
||||
{
|
||||
|
@ -73,8 +73,8 @@ public:
|
||||
|
||||
void truncate(const ASTPtr &, const StorageMetadataPtr & metadata_snapshot, const Context &, TableExclusiveLockHolder &) override;
|
||||
|
||||
std::optional<UInt64> totalRows() const override;
|
||||
std::optional<UInt64> totalBytes() const override;
|
||||
std::optional<UInt64> totalRows(const Settings & settings) const override;
|
||||
std::optional<UInt64> totalBytes(const Settings & settings) const override;
|
||||
|
||||
private:
|
||||
SetPtr set;
|
||||
|
@ -429,7 +429,7 @@ protected:
|
||||
if (columns_mask[src_index++])
|
||||
{
|
||||
assert(table != nullptr);
|
||||
auto total_rows = table->totalRows();
|
||||
auto total_rows = table->totalRows(context.getSettingsRef());
|
||||
if (total_rows)
|
||||
res_columns[res_index++]->insert(*total_rows);
|
||||
else
|
||||
@ -439,7 +439,7 @@ protected:
|
||||
if (columns_mask[src_index++])
|
||||
{
|
||||
assert(table != nullptr);
|
||||
auto total_bytes = table->totalBytes();
|
||||
auto total_bytes = table->totalBytes(context.getSettingsRef());
|
||||
if (total_bytes)
|
||||
res_columns[res_index++]->insert(*total_bytes);
|
||||
else
|
||||
|
@ -13,7 +13,7 @@
|
||||
<host>zoo3</host>
|
||||
<port>2181</port>
|
||||
</node>
|
||||
<session_timeout_ms>2000</session_timeout_ms>
|
||||
<session_timeout_ms>20000</session_timeout_ms>
|
||||
</zookeeper>
|
||||
</yandex>
|
||||
|
@ -62,14 +62,16 @@ def test_reload_zookeeper(start_cluster):
|
||||
|
||||
## stop all zookeepers, table will be readonly
|
||||
cluster.stop_zookeeper_nodes(["zoo1", "zoo2", "zoo3"])
|
||||
node.query("SELECT COUNT() FROM test_table")
|
||||
with pytest.raises(QueryRuntimeException):
|
||||
node.query("SELECT COUNT() FROM test_table")
|
||||
node.query("SELECT COUNT() FROM test_table", settings={"select_sequential_consistency" : 1})
|
||||
|
||||
## start zoo2, zoo3, table will be readonly too, because it only connect to zoo1
|
||||
cluster.start_zookeeper_nodes(["zoo2", "zoo3"])
|
||||
wait_zookeeper_node_to_start(["zoo2", "zoo3"])
|
||||
node.query("SELECT COUNT() FROM test_table")
|
||||
with pytest.raises(QueryRuntimeException):
|
||||
node.query("SELECT COUNT() FROM test_table")
|
||||
node.query("SELECT COUNT() FROM test_table", settings={"select_sequential_consistency" : 1})
|
||||
|
||||
## set config to zoo2, server will be normal
|
||||
new_config = """
|
||||
|
@ -0,0 +1,3 @@
|
||||
3
|
||||
4
|
||||
4
|
@ -0,0 +1,36 @@
|
||||
SET send_logs_level = 'fatal';
|
||||
|
||||
DROP TABLE IF EXISTS quorum1 SYNC;
|
||||
DROP TABLE IF EXISTS quorum2 SYNC;
|
||||
DROP TABLE IF EXISTS quorum3 SYNC;
|
||||
|
||||
CREATE TABLE quorum1(x UInt32, y Date) ENGINE ReplicatedMergeTree('/clickhouse/tables/test_01513/sequence_consistency', '1') ORDER BY x PARTITION BY y;
|
||||
CREATE TABLE quorum2(x UInt32, y Date) ENGINE ReplicatedMergeTree('/clickhouse/tables/test_01513/sequence_consistency', '2') ORDER BY x PARTITION BY y;
|
||||
CREATE TABLE quorum3(x UInt32, y Date) ENGINE ReplicatedMergeTree('/clickhouse/tables/test_01513/sequence_consistency', '3') ORDER BY x PARTITION BY y;
|
||||
|
||||
INSERT INTO quorum1 VALUES (1, '1990-11-15');
|
||||
INSERT INTO quorum1 VALUES (2, '1990-11-15');
|
||||
INSERT INTO quorum1 VALUES (3, '2020-12-16');
|
||||
|
||||
SYSTEM SYNC REPLICA quorum2;
|
||||
SYSTEM SYNC REPLICA quorum3;
|
||||
|
||||
SET select_sequential_consistency=0;
|
||||
SET optimize_trivial_count_query=1;
|
||||
SET insert_quorum=2;
|
||||
|
||||
SYSTEM STOP FETCHES quorum1;
|
||||
|
||||
INSERT INTO quorum2 VALUES (4, toDate('2020-12-16'));
|
||||
|
||||
SYSTEM SYNC REPLICA quorum3;
|
||||
|
||||
-- Should read local committed parts instead of throwing error code: 289. DB::Exception: Replica doesn't have part 20201216_1_1_0 which was successfully written to quorum of other replicas.
|
||||
SELECT count() FROM quorum1;
|
||||
|
||||
SELECT count() FROM quorum2;
|
||||
SELECT count() FROM quorum3;
|
||||
|
||||
DROP TABLE quorum1 SYNC;
|
||||
DROP TABLE quorum2 SYNC;
|
||||
DROP TABLE quorum3 SYNC;
|
Loading…
Reference in New Issue
Block a user