In case if one Distributed has multiple shards, and underlying
Distributed has only one, there can be the case when the query will be
tried to process from Complete to WithMergeableStateAfterAggregation,
which is obviously wrong.
Before the following queries was running LimitBy/Distinct step on the
initator:
select distinct sharding_key from dist order by k
While this can be omitted.
Before this patch it wasn't possible to optimize simple SELECT * FROM
dist ORDER BY (w/o GROUP BY and DISTINCT) to more optimal stage
(QueryProcessingStage::WithMergeableStateAfterAggregationAndLimit),
since that code was under
allow_nondeterministic_optimize_skip_unused_shards, rework it and make
it possible.
Also now distributed_push_down_limit is respected for
optimize_distributed_group_by_sharding_key.
Next step will be to enable distributed_push_down_limit by default.
v2: fix detection of aggregates
After #25884 parts that was dropped (DROP_RANGE) will not be removed
from old mutations and this will stuck the mutation.
Interesting, that this mutation may continue after some merge.
This was introduced in https://github.com/ClickHouse/ClickHouse/pull/8602.
The idea was to avoid data re-appearing in ClickHouse after DROP/DETACH
PARTITION. This problem was only present in MergeTree engine and I don't
understand why we need to do the same in ReplicatedMergeTree.
For ReplicatedMergeTree the state of truth is stored in ZK, deleting
things from filesystem just introduces inconsistencies and this is the
main source for errors like "No active replica has part X or covering
part".
The resulting problem is fixed by
https://github.com/ClickHouse/ClickHouse/pull/25820, but in my opinion
we would better avoid introducing the ZK/FS inconsistency in the first
place.
When does this inconsistency appear? Often the sequence is like this:
0. Write 2 parts to ZK [all_0_0_0, all_1_1_0]
1. A merge gets scheduled
2. New part replaces old parts [new: all_0_1_1, old: all_0_0_0, all_1_1_0]
3. Replica gets shutdown and old parts are removed from filesystem
4. Replica comes back online, metadata about all parts is still stored in ZK for this new replica.
5. Other replica after cleanup thread runs will have only [all_0_1_1] in
ZK
5. User triggers a DROP_RANGE after a while (drop range is for all_0_1_9999*)
6. Each replica deletes from ZK only [all_0_1_1]. The replica that got
restarted uses its in-memory state to choose nodes to delete from ZK.
7. Restart the replica again. It will now think that there are 2 parts
that it lost and needs to fetch them [all_0_0_0, all_1_1_0].
`clearOldPartsAndRemoveFromZK` which is triggered from cleanup thread
runs cleanup sequence correctly, it first removes things from ZK and
then from filesystem. I don't see much benefit of triggering it on
shutdown and would rather have it called only from a single place.
---
This is a very, very edge case situation but it proves that the current
"fix" (https://github.com/ClickHouse/ClickHouse/pull/25820) isn't
complete.
```
create table test(
v UInt64
)
engine=ReplicatedMergeTree('/clickhouse/test', 'one')
order by v
settings old_parts_lifetime = 30;
create table test2(
v UInt64
)
engine=ReplicatedMergeTree('/clickhouse/test', 'two')
order by v
settings old_parts_lifetime = 30;
create table test3(
v UInt64
)
engine=ReplicatedMergeTree('/clickhouse/test', 'three')
order by v
settings old_parts_lifetime = 30;
insert into table test values (1), (2), (3);
insert into table test values (4);
optimize table test final;
detach table test;
detach table test2;
alter table test3 drop partition tuple();
attach table test;
attach table test2;
```
```
(CONNECTED [localhost:9181]) /> ls /clickhouse/test/replicas/one/parts
all_0_0_0
all_1_1_0
(CONNECTED [localhost:9181]) /> ls /clickhouse/test/replicas/two/parts
all_0_0_0
all_1_1_0
(CONNECTED [localhost:9181]) /> ls /clickhouse/test/replicas/three/parts
```
```
detach table test;
attach table test;
```
`test` will now figure out that parts exist only in ZK and will issue `GET_PART`
after first removing parts from ZK.
`test2` will receive fetch for unknown parts and will trigger part checks itself.
Because `test` doesn't have the parts anymore in ZK `test2` will mark them as LostForever.
It will also not insert empty parts, because the partition is empty.
`test` is left with `GET_PART` in the queue and stuck.
```
SELECT
table,
type,
replica_name,
new_part_name,
last_exception
FROM system.replication_queue
Query id: 74c5aa00-048d-4bc1-a2ea-6f69501c11a0
Row 1:
──────
table: test
type: GET_PART
replica_name: one
new_part_name: all_0_0_0
last_exception: Code: 234. DB::Exception: No active replica has part all_0_0_0 or covering part. (NO_REPLICA_HAS_PART) (version 21.9.1.1)
Row 2:
──────
table: test
type: GET_PART
replica_name: one
new_part_name: all_1_1_0
last_exception: Code: 234. DB::Exception: No active replica has part all_1_1_0 or covering part. (NO_REPLICA_HAS_PART) (version 21.9.1.1)
```