Merge branch 'master' into filesystemCacheSizeLimitMetric

commit f9d310c77e by Krzysztof Góralski, 2023-07-21 10:29:34 +02:00 (committed by GitHub)
15 changed files with 241 additions and 157 deletions


@@ -473,23 +473,30 @@ void KeeperDispatcher::shutdown()
         session_to_response_callback.clear();
     }
-    // if there is no leader, there is no reason to do CLOSE because it's a write request
-    if (server && hasLeader() && !close_requests.empty())
+    if (server && !close_requests.empty())
     {
-        LOG_INFO(log, "Trying to close {} session(s)", close_requests.size());
-        const auto raft_result = server->putRequestBatch(close_requests);
-        auto sessions_closing_done_promise = std::make_shared<std::promise<void>>();
-        auto sessions_closing_done = sessions_closing_done_promise->get_future();
-        raft_result->when_ready([my_sessions_closing_done_promise = std::move(sessions_closing_done_promise)](
-            nuraft::cmd_result<nuraft::ptr<nuraft::buffer>> & /*result*/,
-            nuraft::ptr<std::exception> & /*exception*/) { my_sessions_closing_done_promise->set_value(); });
+        // if there is no leader, there is no reason to do CLOSE because it's a write request
+        if (hasLeader())
+        {
+            LOG_INFO(log, "Trying to close {} session(s)", close_requests.size());
+            const auto raft_result = server->putRequestBatch(close_requests);
+            auto sessions_closing_done_promise = std::make_shared<std::promise<void>>();
+            auto sessions_closing_done = sessions_closing_done_promise->get_future();
+            raft_result->when_ready([my_sessions_closing_done_promise = std::move(sessions_closing_done_promise)](
+                nuraft::cmd_result<nuraft::ptr<nuraft::buffer>> & /*result*/,
+                nuraft::ptr<std::exception> & /*exception*/) { my_sessions_closing_done_promise->set_value(); });
-        auto session_shutdown_timeout = configuration_and_settings->coordination_settings->session_shutdown_timeout.totalMilliseconds();
-        if (sessions_closing_done.wait_for(std::chrono::milliseconds(session_shutdown_timeout)) != std::future_status::ready)
-            LOG_WARNING(
-                log,
-                "Failed to close sessions in {}ms. If they are not closed, they will be closed after session timeout.",
-                session_shutdown_timeout);
+            auto session_shutdown_timeout = configuration_and_settings->coordination_settings->session_shutdown_timeout.totalMilliseconds();
+            if (sessions_closing_done.wait_for(std::chrono::milliseconds(session_shutdown_timeout)) != std::future_status::ready)
+                LOG_WARNING(
+                    log,
+                    "Failed to close sessions in {}ms. If they are not closed, they will be closed after session timeout.",
+                    session_shutdown_timeout);
+        }
+        else
+        {
+            LOG_INFO(log, "Sessions cannot be closed during shutdown because there is no active leader");
+        }
     }
     if (server)
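For context on the synchronization pattern used here: putRequestBatch() returns an asynchronous result, and shutdown must not block on it indefinitely. Below is a minimal standalone sketch of the same promise/future-with-timeout idiom in plain C++ (illustrative names, not ClickHouse code):

#include <chrono>
#include <future>
#include <iostream>
#include <memory>
#include <thread>

int main()
{
    // Plays the role of sessions_closing_done_promise above.
    auto done_promise = std::make_shared<std::promise<void>>();
    auto done = done_promise->get_future();

    // Stand-in for raft_result->when_ready(): the callback fires asynchronously
    // once the batched CLOSE requests are committed.
    std::thread replication([p = std::move(done_promise)]
    {
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
        p->set_value();
    });

    // Equivalent of session_shutdown_timeout: a bounded wait, so shutdown
    // cannot hang if the quorum never responds.
    if (done.wait_for(std::chrono::milliseconds(500)) != std::future_status::ready)
        std::cerr << "Failed to close sessions in time; they will expire via session timeout\n";

    replication.join();
}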


@@ -127,8 +127,8 @@ class IColumn;
     \
     M(Bool, optimize_move_to_prewhere, true, "Allows disabling WHERE to PREWHERE optimization in SELECT queries from MergeTree.", 0) \
     M(Bool, optimize_move_to_prewhere_if_final, false, "If query has `FINAL`, the optimization `move_to_prewhere` is not always correct and it is enabled only if both settings `optimize_move_to_prewhere` and `optimize_move_to_prewhere_if_final` are turned on", 0) \
-    M(Bool, move_all_conditions_to_prewhere, false, "Move all viable conditions from WHERE to PREWHERE", 0) \
-    M(Bool, enable_multiple_prewhere_read_steps, false, "Move more conditions from WHERE to PREWHERE and do reads from disk and filtering in multiple steps if there are multiple conditions combined with AND", 0) \
+    M(Bool, move_all_conditions_to_prewhere, true, "Move all viable conditions from WHERE to PREWHERE", 0) \
+    M(Bool, enable_multiple_prewhere_read_steps, true, "Move more conditions from WHERE to PREWHERE and do reads from disk and filtering in multiple steps if there are multiple conditions combined with AND", 0) \
     \
     M(UInt64, alter_sync, 1, "Wait for actions to manipulate the partitions. 0 - do not wait, 1 - wait for execution only of itself, 2 - wait for everyone.", 0) ALIAS(replication_alter_partitions_sync) \
     M(Int64, replication_wait_for_inactive_replica_timeout, 120, "Wait for inactive replica to execute ALTER/OPTIMIZE. Time in seconds, 0 - do not wait, negative - wait for unlimited time.", 0) \


@@ -138,8 +138,11 @@ void optimizePrewhere(Stack & stack, QueryPlan::Nodes & nodes)
     if (table_expression_modifiers && table_expression_modifiers->hasSampleSizeRatio())
     {
         const auto & sampling_key = storage_snapshot->getMetadataForQuery()->getSamplingKey();
-        const auto & sampling_columns = sampling_key.sample_block.getColumnsWithTypeAndName();
-        required_columns_after_filter.insert(required_columns_after_filter.end(), sampling_columns.begin(), sampling_columns.end());
+        const auto & sampling_source_columns = sampling_key.expression->getRequiredColumnsWithTypes();
+        for (const auto & column : sampling_source_columns)
+            required_columns_after_filter.push_back(ColumnWithTypeAndName(column.type, column.name));
+        const auto & sampling_result_columns = sampling_key.sample_block.getColumnsWithTypeAndName();
+        required_columns_after_filter.insert(required_columns_after_filter.end(), sampling_result_columns.begin(), sampling_result_columns.end());
     }
     const auto & storage = storage_snapshot->storage;
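The apparent rationale for this change, sketched below with hypothetical names (not the real ClickHouse types): keeping only the sampling expression's result columns is not sufficient, since the expression must still be evaluated from its source columns after filtering, so both sets now survive the PREWHERE split:

#include <iostream>
#include <set>
#include <string>
#include <vector>

struct SamplingKey
{
    std::vector<std::string> source_columns;  // inputs, e.g. user_id
    std::vector<std::string> result_columns;  // outputs, e.g. intHash32(user_id)
};

int main()
{
    SamplingKey key{{"user_id"}, {"intHash32(user_id)"}};

    // Columns the plan must still provide after the PREWHERE filter step.
    std::set<std::string> required_after_filter{"value"};

    // The fix adds the source columns as well, so the sampling expression
    // can actually be computed downstream of the filter.
    required_after_filter.insert(key.source_columns.begin(), key.source_columns.end());
    required_after_filter.insert(key.result_columns.begin(), key.result_columns.end());

    for (const auto & name : required_after_filter)
        std::cout << name << '\n';
}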


@@ -328,11 +328,22 @@ MergeTreeReadTaskColumns getReadTaskColumns(
     NameSet columns_from_previous_steps;
     auto add_step = [&](const PrewhereExprStep & step)
     {
-        Names step_column_names = step.actions->getActionsDAG().getRequiredColumnsNames();
+        Names step_column_names;
+        /// Computation results from previous steps might be used in the current step as well. In such a case these
+        /// computed columns will be present in the current step inputs. They don't need to be read from the disk so
+        /// exclude them from the list of columns to read. This filtering must be done before injecting required
+        /// columns to avoid adding unnecessary columns or failing to find required columns that are computation
+        /// results from previous steps.
+        /// Example: step1: sin(a)>b, step2: sin(a)>c
+        for (const auto & name : step.actions->getActionsDAG().getRequiredColumnsNames())
+            if (!columns_from_previous_steps.contains(name))
+                step_column_names.push_back(name);
         injectRequiredColumns(
             data_part_info_for_reader, storage_snapshot, with_subcolumns, step_column_names);
+        /// More columns could have been added, filter them as well by the list of columns from previous steps.
         Names columns_to_read_in_step;
         for (const auto & name : step_column_names)
        {
@@ -343,6 +354,10 @@ MergeTreeReadTaskColumns getReadTaskColumns(
             columns_from_previous_steps.insert(name);
         }
+        /// Add results of the step to the list of already "known" columns so that we don't read or compute them again.
+        for (const auto & name : step.actions->getActionsDAG().getNames())
+            columns_from_previous_steps.insert(name);
         result.pre_columns.push_back(storage_snapshot->getColumnsByNames(options, columns_to_read_in_step));
     };
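A self-contained toy version of this bookkeeping, with assumed names rather than the real ActionsDAG API: each step reads from disk only the columns that earlier steps have neither read nor computed, then publishes its own outputs so later steps treat them as known:

#include <iostream>
#include <string>
#include <unordered_set>
#include <vector>

struct Step
{
    std::vector<std::string> required;  // inputs of the step's expression
    std::vector<std::string> produced;  // columns the step computes
};

int main()
{
    // Example from the comment above: step1: sin(a) > b, step2: sin(a) > c.
    // Step 2 receives sin(a) as a ready-made input column from step 1.
    std::vector<Step> steps = {
        {{"a", "b"}, {"sin(a)"}},
        {{"sin(a)", "c"}, {}},
    };

    std::unordered_set<std::string> known;  // columns_from_previous_steps
    for (const auto & step : steps)
    {
        std::vector<std::string> to_read;
        for (const auto & name : step.required)
            if (known.insert(name).second)  // neither read nor computed before
                to_read.push_back(name);    // must come from disk

        for (const auto & name : step.produced)
            known.insert(name);             // computed once, reused later

        for (const auto & name : to_read)
            std::cout << name << ' ';
        std::cout << '\n';  // prints "a b" for step 1 and only "c" for step 2
    }
}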


@@ -6,6 +6,7 @@ import socket
 import struct
 from kazoo.client import KazooClient
+from kazoo.exceptions import NoNodeError
 # from kazoo.protocol.serialization import Connect, read_buffer, write_buffer
@@ -162,17 +163,40 @@ def test_session_timeout(started_cluster):
 def test_session_close_shutdown(started_cluster):
     wait_nodes()
-    node1_zk = get_fake_zk(node1.name)
-    node2_zk = get_fake_zk(node2.name)
+    node1_zk = None
+    node2_zk = None
+    for i in range(20):
+        node1_zk = get_fake_zk(node1.name)
+        node2_zk = get_fake_zk(node2.name)
-    eph_node = "/test_node"
-    node2_zk.create(eph_node, ephemeral=True)
-    node1_zk.sync(eph_node)
-    assert node1_zk.exists(eph_node) != None
+        eph_node = "/test_node"
+        node2_zk.create(eph_node, ephemeral=True)
+        node1_zk.sync(eph_node)
-    # shutdown while session is active
-    node2.stop_clickhouse()
+        node1_zk.exists(eph_node) != None
-    assert node1_zk.exists(eph_node) == None
+        # restart while session is active so it's closed during shutdown
+        node2.restart_clickhouse()
-    node2.start_clickhouse()
+        if node1_zk.exists(eph_node) == None:
+            break
+        assert node2.contains_in_log(
+            "Sessions cannot be closed during shutdown because there is no active leader"
+        )
+        try:
+            node1_zk.delete(eph_node)
+        except NoNodeError:
+            pass
+        assert node1_zk.exists(eph_node) == None
+        destroy_zk_client(node1_zk)
+        node1_zk = None
+        destroy_zk_client(node2_zk)
+        node2_zk = None
+        time.sleep(1)
+    else:
+        assert False, "Session wasn't properly cleaned up on shutdown"


@@ -9,5 +9,4 @@ SELECT
     x3,
     x4
 FROM prewhere_move
-PREWHERE x1 > 100
-WHERE (x1 > 100) AND ((x2 > 100) AND (x3 > 100) AND (x4 > 100))
+PREWHERE (x1 > 100) AND (x2 > 100) AND (x3 > 100) AND (x4 > 100)


@@ -1,85 +1,83 @@
ReadFromMergeTree (default.test_index)
Indexes:
MinMax
Keys:
y
Condition: (y in [1, +Inf))
Parts: 4/5
Granules: 11/12
Partition
Keys:
y
bitAnd(z, 3)
Condition: and((y in [1, +Inf)), (bitAnd(z, 3) not in [1, 1]))
Parts: 3/4
Granules: 10/11
PrimaryKey
Keys:
x
y
Condition: and((x in [11, +Inf)), (y in [1, +Inf)))
Parts: 2/3
Granules: 6/10
Skip
Name: t_minmax
Description: minmax GRANULARITY 2
Parts: 1/2
Granules: 4/6
Skip
Name: t_set
Description: set GRANULARITY 2
Parts: 1/1
Granules: 2/4
-----------------
"Node Type": "ReadFromMergeTree",
"Description": "default.test_index",
"Indexes": [
{
"Type": "MinMax",
"Keys": ["y"],
"Condition": "(y in [1, +Inf))",
"Initial Parts": 5,
"Selected Parts": 4,
"Initial Granules": 12,
"Selected Granules": 11
},
{
"Type": "Partition",
"Keys": ["y", "bitAnd(z, 3)"],
"Condition": "and((y in [1, +Inf)), (bitAnd(z, 3) not in [1, 1]))",
"Initial Parts": 4,
"Selected Parts": 3,
"Initial Granules": 11,
"Selected Granules": 10
},
{
"Type": "PrimaryKey",
"Keys": ["x", "y"],
"Condition": "and((x in [11, +Inf)), (y in [1, +Inf)))",
"Initial Parts": 3,
"Selected Parts": 2,
"Initial Granules": 10,
"Selected Granules": 6
},
{
"Type": "Skip",
"Name": "t_minmax",
"Description": "minmax GRANULARITY 2",
"Initial Parts": 2,
"Selected Parts": 1,
"Initial Granules": 6,
"Selected Granules": 4
},
{
"Type": "Skip",
"Name": "t_set",
"Description": "set GRANULARITY 2",
"Initial Parts": 1,
"Selected Parts": 1,
"Initial Granules": 4,
"Selected Granules": 2
}
]
"Node Type": "ReadFromMergeTree",
"Description": "default.test_index",
"Indexes": [
{
"Type": "MinMax",
"Keys": ["y"],
"Condition": "(y in [1, +Inf))",
"Initial Parts": 5,
"Selected Parts": 4,
"Initial Granules": 12,
"Selected Granules": 11
},
{
"Type": "Partition",
"Keys": ["y", "bitAnd(z, 3)"],
"Condition": "and((y in [1, +Inf)), (bitAnd(z, 3) not in [1, 1]))",
"Initial Parts": 4,
"Selected Parts": 3,
"Initial Granules": 11,
"Selected Granules": 10
},
{
"Type": "PrimaryKey",
"Keys": ["x", "y"],
"Condition": "and((x in [11, +Inf)), (y in [1, +Inf)))",
"Initial Parts": 3,
"Selected Parts": 2,
"Initial Granules": 10,
"Selected Granules": 6
},
{
"Type": "Skip",
"Name": "t_minmax",
"Description": "minmax GRANULARITY 2",
"Initial Parts": 2,
"Selected Parts": 1,
"Initial Granules": 6,
"Selected Granules": 4
},
{
"Type": "Skip",
"Name": "t_set",
"Description": "set GRANULARITY 2",
"Initial Parts": 1,
"Selected Parts": 1,
"Initial Granules": 4,
"Selected Granules": 2
}
]
}


@@ -3,12 +3,10 @@
 35
 SELECT count()
 FROM t_move_to_prewhere
-PREWHERE a AND b AND c
-WHERE (a AND b AND c) AND (NOT ignore(fat_string))
+PREWHERE a AND b AND c AND (NOT ignore(fat_string))
 1 Compact
 2 Compact
 35
 SELECT count()
 FROM t_move_to_prewhere
-PREWHERE a
-WHERE a AND (b AND c AND (NOT ignore(fat_string)))
+PREWHERE a AND b AND c AND (NOT ignore(fat_string))


@@ -1 +1,5 @@
 111
+111
+111
+111
+111


@@ -6,7 +6,10 @@ CREATE TABLE t1 ( s String, f Float32, e UInt16 ) ENGINE = MergeTree ORDER BY tu
 INSERT INTO t1 VALUES ('111', 1, 1);
-SELECT s FROM t1 WHERE f AND (e = 1); -- { serverError 59 }
+SELECT s FROM t1 WHERE f AND (e = 1);
+SELECT s FROM t1 WHERE f AND (e = 1) SETTINGS optimize_move_to_prewhere=true;
+SELECT s FROM t1 WHERE f AND (e = 1) SETTINGS optimize_move_to_prewhere=false;
 SELECT s FROM t1 PREWHERE f AND (e = 1);
 SELECT s FROM t1 PREWHERE f; -- { serverError 59 }
 SELECT s FROM t1 PREWHERE f WHERE (e = 1); -- { serverError 59 }
 SELECT s FROM t1 PREWHERE f WHERE f AND (e = 1); -- { serverError 59 }


@@ -1,7 +1,6 @@
 SELECT count()
 FROM t_02156_merge1
-PREWHERE k = 3
-WHERE (k = 3) AND notEmpty(v)
+PREWHERE (k = 3) AND notEmpty(v)
 2
 SELECT count()
 FROM t_02156_merge2


@@ -0,0 +1,14 @@
+-- { echoOn }
+SELECT a FROM t_02559 PREWHERE sin(a) < b AND sin(a) < c;
+1
+2
+SELECT sin(a) > 2 FROM t_02559 PREWHERE sin(a) < b AND sin(a) < c;
+0
+0
+SELECT sin(a) < a FROM t_02559 PREWHERE sin(a) < b AND sin(a) < c AND sin(a) > -a;
+1
+1
+SELECT sin(a) < a FROM t_02559 PREWHERE sin(a) < b AND a <= c AND sin(a) > -a;
+1
+1


@@ -0,0 +1,17 @@
+DROP TABLE IF EXISTS t_02559;
+CREATE TABLE t_02559 (a Int64, b Int64, c Int64) ENGINE = MergeTree ORDER BY a;
+INSERT INTO t_02559 SELECT number, number, number FROM numbers(3);
+SET enable_multiple_prewhere_read_steps = 1;
+-- { echoOn }
+SELECT a FROM t_02559 PREWHERE sin(a) < b AND sin(a) < c;
+SELECT sin(a) > 2 FROM t_02559 PREWHERE sin(a) < b AND sin(a) < c;
+SELECT sin(a) < a FROM t_02559 PREWHERE sin(a) < b AND sin(a) < c AND sin(a) > -a;
+SELECT sin(a) < a FROM t_02559 PREWHERE sin(a) < b AND a <= c AND sin(a) > -a;
+-- {echoOff}
+DROP TABLE t_02559;


@@ -1,40 +1,40 @@
1 2 3
1 2 3
1 2 3
ReadFromMergeTree (default.data_02771)
Indexes:
PrimaryKey
Condition: true
Parts: 1/1
Granules: 1/1
Skip
Name: x_idx
Description: minmax GRANULARITY 1
Parts: 0/1
Granules: 0/1
Skip
Name: y_idx
Description: minmax GRANULARITY 1
Parts: 0/0
Granules: 0/0
Skip
Name: xy_idx
Description: minmax GRANULARITY 1
Parts: 0/0
Granules: 0/0
ReadFromMergeTree (default.data_02771)
Indexes:
PrimaryKey
Condition: true
Parts: 1/1
Granules: 1/1
Skip
Name: x_idx
Description: minmax GRANULARITY 1
Parts: 0/1
Granules: 0/1
Skip
Name: y_idx
Description: minmax GRANULARITY 1
Parts: 0/0
Granules: 0/0


@@ -4,6 +4,7 @@
 SET max_bytes_to_read = 600000000;
 SET optimize_move_to_prewhere = 1;
+SET enable_multiple_prewhere_read_steps = 1;
 SELECT uniq(URL) FROM test.hits WHERE toTimeZone(EventTime, 'Asia/Dubai') >= '2014-03-20 00:00:00' AND toTimeZone(EventTime, 'Asia/Dubai') < '2014-03-21 00:00:00';
 SELECT uniq(URL) FROM test.hits WHERE toTimeZone(EventTime, 'Asia/Dubai') >= '2014-03-20 00:00:00' AND URL != '' AND toTimeZone(EventTime, 'Asia/Dubai') < '2014-03-21 00:00:00';
@@ -11,6 +12,8 @@ SELECT uniq(*) FROM test.hits WHERE toTimeZone(EventTime, 'Asia/Dubai') >= '2014
 WITH toTimeZone(EventTime, 'Asia/Dubai') AS xyz SELECT uniq(*) FROM test.hits WHERE xyz >= '2014-03-20 00:00:00' AND xyz < '2014-03-21 00:00:00' AND EventDate = '2014-03-21';
 SET optimize_move_to_prewhere = 0;
+SET enable_multiple_prewhere_read_steps = 0;
 SELECT uniq(URL) FROM test.hits WHERE toTimeZone(EventTime, 'Asia/Dubai') >= '2014-03-20 00:00:00' AND toTimeZone(EventTime, 'Asia/Dubai') < '2014-03-21 00:00:00'; -- { serverError 307 }
 SELECT uniq(URL) FROM test.hits WHERE toTimeZone(EventTime, 'Asia/Dubai') >= '2014-03-20 00:00:00' AND URL != '' AND toTimeZone(EventTime, 'Asia/Dubai') < '2014-03-21 00:00:00'; -- { serverError 307 }
+SELECT uniq(URL) FROM test.hits PREWHERE toTimeZone(EventTime, 'Asia/Dubai') >= '2014-03-20 00:00:00' AND URL != '' AND toTimeZone(EventTime, 'Asia/Dubai') < '2014-03-21 00:00:00'; -- { serverError 307 }