Merge branch 'master' into keeper-with-disks

This commit is contained in:
Antonio Andelic 2023-05-26 10:48:41 +00:00
commit 8a2a63a7bd
131 changed files with 1940 additions and 584 deletions

View File

@ -23,7 +23,6 @@ curl https://clickhouse.com/ | sh
## Upcoming Events
* [**v23.5 Release Webinar**](https://clickhouse.com/company/events/v23-5-release-webinar?utm_source=github&utm_medium=social&utm_campaign=release-webinar-2023-05) - May 31 - 23.5 is rapidly approaching. Original creator, co-founder, and CTO of ClickHouse Alexey Milovidov will walk us through the highlights of the release.
* [**ClickHouse Meetup in Berlin**](https://www.meetup.com/clickhouse-berlin-user-group/events/292892466) - May 16
* [**ClickHouse Meetup in Barcelona**](https://www.meetup.com/clickhouse-barcelona-user-group/events/292892669) - May 25
* [**ClickHouse Meetup in London**](https://www.meetup.com/clickhouse-london-user-group/events/292892824) - May 25
* [**ClickHouse Meetup in San Francisco**](https://www.meetup.com/clickhouse-silicon-valley-meetup-group/events/293426725/) - Jun 7

2
contrib/aws vendored

@ -1 +1 @@
Subproject commit ecccfc026a42b30023289410a67024d561f4bf3e
Subproject commit ca02358dcc7ce3ab733dd4cbcc32734eecfa4ee3

2
contrib/aws-c-auth vendored

@ -1 +1 @@
Subproject commit 30df6c407e2df43bd244e2c34c9b4a4b87372bfb
Subproject commit 97133a2b5dbca1ccdf88cd6f44f39d0531d27d12

@ -1 +1 @@
Subproject commit 324fd1d973ccb25c813aa747bf1759cfde5121c5
Subproject commit 45dcb2849c891dba2100b270b4676765c92949ff

@ -1 +1 @@
Subproject commit 39bfa94a14b7126bf0c1330286ef8db452d87e66
Subproject commit 2f9b60c42f90840ec11822acda3d8cdfa97a773d

2
contrib/aws-c-http vendored

@ -1 +1 @@
Subproject commit 2c5a2a7d5556600b9782ffa6c9d7e09964df1abc
Subproject commit dd34461987947672444d0bc872c5a733dfdb9711

2
contrib/aws-c-io vendored

@ -1 +1 @@
Subproject commit 5d32c453560d0823df521a686bf7fbacde7f9be3
Subproject commit d58ed4f272b1cb4f89ac9196526ceebe5f2b0d89

2
contrib/aws-c-mqtt vendored

@ -1 +1 @@
Subproject commit 882c689561a3db1466330ccfe3b63637e0a575d3
Subproject commit 33c3455cec82b16feb940e12006cefd7b3ef4194

2
contrib/aws-c-s3 vendored

@ -1 +1 @@
Subproject commit a41255ece72a7c887bba7f9d998ca3e14f4c8a1b
Subproject commit d7bfe602d6925948f1fff95784e3613cca6a3900

@ -1 +1 @@
Subproject commit 25bf5cf225f977c3accc6a05a0a7a181ef2a4a30
Subproject commit 208a701fa01e99c7c8cc3dcebc8317da71362972

@ -1 +1 @@
Subproject commit 48e7c0e01479232f225c8044d76c84e74192889d
Subproject commit ad53be196a25bbefa3700a01187fdce573a7d2d0

View File

@ -52,8 +52,8 @@ endif()
# Directories.
SET(AWS_SDK_DIR "${ClickHouse_SOURCE_DIR}/contrib/aws")
SET(AWS_SDK_CORE_DIR "${AWS_SDK_DIR}/aws-cpp-sdk-core")
SET(AWS_SDK_S3_DIR "${AWS_SDK_DIR}/aws-cpp-sdk-s3")
SET(AWS_SDK_CORE_DIR "${AWS_SDK_DIR}/src/aws-cpp-sdk-core")
SET(AWS_SDK_S3_DIR "${AWS_SDK_DIR}/generated/src/aws-cpp-sdk-s3")
SET(AWS_AUTH_DIR "${ClickHouse_SOURCE_DIR}/contrib/aws-c-auth")
SET(AWS_CAL_DIR "${ClickHouse_SOURCE_DIR}/contrib/aws-c-cal")
@ -118,7 +118,7 @@ configure_file("${AWS_SDK_CORE_DIR}/include/aws/core/SDKConfig.h.in"
list(APPEND AWS_PUBLIC_COMPILE_DEFS "-DAWS_SDK_VERSION_MAJOR=1")
list(APPEND AWS_PUBLIC_COMPILE_DEFS "-DAWS_SDK_VERSION_MINOR=10")
list(APPEND AWS_PUBLIC_COMPILE_DEFS "-DAWS_SDK_VERSION_PATCH=36")
list(APPEND AWS_SOURCES ${AWS_SDK_CORE_SRC} ${AWS_SDK_CORE_NET_SRC} ${AWS_SDK_CORE_PLATFORM_SRC})
list(APPEND AWS_PUBLIC_INCLUDES

2
contrib/aws-crt-cpp vendored

@ -1 +1 @@
Subproject commit ec0bea288f451d884c0d80d534bc5c66241c39a4
Subproject commit 8a301b7e842f1daed478090c869207300972379f

2
contrib/aws-s2n-tls vendored

@ -1 +1 @@
Subproject commit 0f1ba9e5c4a67cb3898de0c0b4f911d4194dc8de
Subproject commit 71f4794b7580cf780eb4aca77d69eded5d3c7bb4

View File

@ -5,8 +5,8 @@ echo "Using sparse checkout for aws"
FILES_TO_CHECKOUT=$(git rev-parse --git-dir)/info/sparse-checkout
echo '/*' > $FILES_TO_CHECKOUT
echo '!/*/*' >> $FILES_TO_CHECKOUT
echo '/aws-cpp-sdk-core/*' >> $FILES_TO_CHECKOUT
echo '/aws-cpp-sdk-s3/*' >> $FILES_TO_CHECKOUT
echo '/src/aws-cpp-sdk-core/*' >> $FILES_TO_CHECKOUT
echo '/generated/src/aws-cpp-sdk-s3/*' >> $FILES_TO_CHECKOUT
git config core.sparsecheckout true
git checkout $1

View File

@ -132,6 +132,9 @@ function run_tests()
ADDITIONAL_OPTIONS+=('--report-logs-stats')
clickhouse-test "00001_select_1" > /dev/null ||:
clickhouse-client -q "insert into system.zookeeper (name, path, value) values ('auxiliary_zookeeper2', '/test/chroot/', '')" ||:
set +e
clickhouse-test --testname --shard --zookeeper --check-zookeeper-session --hung-check --print-time \
--test-runs "$NUM_TRIES" "${ADDITIONAL_OPTIONS[@]}" 2>&1 \

View File

@ -65,6 +65,9 @@ sudo cat /etc/clickhouse-server/config.d/storage_conf.xml \
> /etc/clickhouse-server/config.d/storage_conf.xml.tmp
sudo mv /etc/clickhouse-server/config.d/storage_conf.xml.tmp /etc/clickhouse-server/config.d/storage_conf.xml
# it contains some new settings, but we can safely remove it
rm /etc/clickhouse-server/config.d/merge_tree.xml
start
stop
mv /var/log/clickhouse-server/clickhouse-server.log /var/log/clickhouse-server/clickhouse-server.initial.log
@ -94,6 +97,9 @@ sudo cat /etc/clickhouse-server/config.d/storage_conf.xml \
> /etc/clickhouse-server/config.d/storage_conf.xml.tmp
sudo mv /etc/clickhouse-server/config.d/storage_conf.xml.tmp /etc/clickhouse-server/config.d/storage_conf.xml
# it contains some new settings, but we can safely remove it
rm /etc/clickhouse-server/config.d/merge_tree.xml
start
clickhouse-client --query="SELECT 'Server version: ', version()"

View File

@ -177,11 +177,11 @@ You can pass parameters to `clickhouse-client` (all parameters have a default va
- `--user, -u` The username. Default value: default.
- `--password` The password. Default value: empty string.
- `--ask-password` - Prompt the user to enter a password.
- `--query, -q` The query to process when using non-interactive mode. You must specify either `query` or `queries-file` option.
- `--queries-file` file path with queries to execute. You must specify either `query` or `queries-file` option.
- `--database, -d` Select the current default database. Default value: the current database from the server settings (default by default).
- `--multiline, -m` If specified, allow multiline queries (do not send the query on Enter).
- `--query, -q` The query to process when using non-interactive mode. Cannot be used simultaneously with `--queries-file`.
- `--queries-file` file path with queries to execute. Cannot be used simultaneously with `--query`. See the example after this list.
- `--multiquery, -n` If specified, multiple queries separated by semicolons can be listed after the `--query` option. For convenience, it is also possible to omit `--query` and pass the queries directly after `--multiquery`.
- `--multiline, -m` If specified, allow multiline queries (do not send the query on Enter).
- `--database, -d` Select the current default database. Default value: the current database from the server settings (default by default).
- `--format, -f` Use the specified default format to output the result.
- `--vertical, -E` If specified, use the [Vertical format](../interfaces/formats.md#vertical) by default to output the result. This is the same as `format=Vertical`. In this format, each value is printed on a separate line, which is helpful when displaying wide tables.
- `--time, -t` If specified, print the query execution time to stderr in non-interactive mode.
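A minimal sketch of how the mutually exclusive query options combine on the command line (`queries.sql` is a placeholder file; the server is assumed to be local with default credentials):

``` bash
# Single query in non-interactive mode
clickhouse-client --query "SELECT version()"

# Several semicolon-separated queries after --query require --multiquery (-n)
clickhouse-client --multiquery --query "CREATE TABLE t (x UInt8) ENGINE = Memory; INSERT INTO t VALUES (1); SELECT * FROM t"

# Queries read from a file; --queries-file cannot be combined with --query
clickhouse-client --queries-file queries.sql
```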

View File

@ -0,0 +1,27 @@
---
slug: /en/operations/system-tables/build_options
---
# build_options
Contains information about the ClickHouse server's build options.
Columns:
- `name` (String) — Name of the build option, e.g. `USE_ODBC`
- `value` (String) — Value of the build option, e.g. `1`
**Example**
``` sql
SELECT * FROM system.build_options LIMIT 5
```
``` text
┌─name─────────────┬─value─┐
│ USE_BROTLI │ 1 │
│ USE_BZIP2 │ 1 │
│ USE_CAPNP │ 1 │
│ USE_CASSANDRA │ 1 │
│ USE_DATASKETCHES │ 1 │
└──────────────────┴───────┘
```

View File

@ -5,16 +5,18 @@ This table contains profiling on processors level (that you can find in [`EXPLAI
Columns:
- `event_date` ([Date](../../sql-reference/data-types/date.md)) — The date when the event happened.
- `event_time` ([DateTime64](../../sql-reference/data-types/datetime64.md)) — The date and time when the event happened.
- `event_time` ([DateTime](../../sql-reference/data-types/datetime.md)) — The date and time when the event happened.
- `event_time_microseconds` ([DateTime64](../../sql-reference/data-types/datetime64.md)) — The date and time with microseconds precision when the event happened.
- `id` ([UInt64](../../sql-reference/data-types/int-uint.md)) — ID of processor
- `parent_ids` ([Array(UInt64)](../../sql-reference/data-types/array.md)) — Parent processors IDs
- `plan_step` ([UInt64](../../sql-reference/data-types/int-uint.md)) — ID of the query plan step which created this processor. The value is zero if the processor was not added from any step.
- `plan_group` ([UInt64](../../sql-reference/data-types/int-uint.md)) — Group of the processor if it was created by query plan step. A group is a logical partitioning of processors added from the same query plan step. The group is used only for beautifying the result of `EXPLAIN PIPELINE`.
- `initial_query_id` ([String](../../sql-reference/data-types/string.md)) — ID of the initial query (for distributed query execution).
- `query_id` ([String](../../sql-reference/data-types/string.md)) — ID of the query
- `name` ([LowCardinality(String)](../../sql-reference/data-types/lowcardinality.md)) — Name of the processor.
- `elapsed_us` ([UInt64](../../sql-reference/data-types/int-uint.md)) — Number of microseconds this processor was executed.
- `input_wait_elapsed_us` ([UInt64](../../sql-reference/data-types/int-uint.md)) — Number of microseconds this processor was waiting for data (from other processor).
- `output_wait_elapsed_us` ([UInt64](../../sql-reference/data-types/int-uint.md)) — Number of microseconds this processor was waiting because output port was full.
- `plan_step` ([UInt64](../../sql-reference/data-types/int-uint.md)) — ID of the query plan step which created this processor. The value is zero if the processor was not added from any step.
- `plan_group` ([UInt64](../../sql-reference/data-types/int-uint.md)) — Group of the processor if it was created by query plan step. A group is a logical partitioning of processors added from the same query plan step. The group is used only for beautifying the result of `EXPLAIN PIPELINE`.
- `input_rows` ([UInt64](../../sql-reference/data-types/int-uint.md)) — The number of rows consumed by processor.
- `input_bytes` ([UInt64](../../sql-reference/data-types/int-uint.md)) — The number of bytes consumed by processor.
- `output_rows` ([UInt64](../../sql-reference/data-types/int-uint.md)) — The number of rows generated by processor.

View File

@ -59,9 +59,10 @@ Columns:
- `query_kind` ([LowCardinality(String)](../../sql-reference/data-types/lowcardinality.md)) — Type of the query.
- `databases` ([Array](../../sql-reference/data-types/array.md)([LowCardinality(String)](../../sql-reference/data-types/lowcardinality.md))) — Names of the databases present in the query.
- `tables` ([Array](../../sql-reference/data-types/array.md)([LowCardinality(String)](../../sql-reference/data-types/lowcardinality.md))) — Names of the tables present in the query.
- `views` ([Array](../../sql-reference/data-types/array.md)([LowCardinality(String)](../../sql-reference/data-types/lowcardinality.md))) — Names of the (materialized or live) views present in the query.
- `columns` ([Array](../../sql-reference/data-types/array.md)([LowCardinality(String)](../../sql-reference/data-types/lowcardinality.md))) — Names of the columns present in the query.
- `partitions` ([Array](../../sql-reference/data-types/array.md)([LowCardinality(String)](../../sql-reference/data-types/lowcardinality.md))) — Names of the partitions present in the query.
- `projections` ([String](../../sql-reference/data-types/string.md)) — Names of the projections used during the query execution.
- `views` ([Array](../../sql-reference/data-types/array.md)([LowCardinality(String)](../../sql-reference/data-types/lowcardinality.md))) — Names of the (materialized or live) views present in the query.
- `exception_code` ([Int32](../../sql-reference/data-types/int-uint.md)) — Code of an exception.
- `exception` ([String](../../sql-reference/data-types/string.md)) — Exception message.
- `stack_trace` ([String](../../sql-reference/data-types/string.md)) — [Stack trace](https://en.wikipedia.org/wiki/Stack_trace). An empty string, if the query was completed successfully.

View File

@ -183,12 +183,12 @@ Arguments:
- `-S`, `--structure` — table structure for input data.
- `--input-format` — input format, `TSV` by default.
- `-f`, `--file` — path to data, `stdin` by default.
- `-q`, `--query` — queries to execute with `;` as delimiter. You must specify either `query` or `queries-file` option.
- `--queries-file` - file path with queries to execute. You must specify either `query` or `queries-file` option.
- `-q`, `--query` — queries to execute with `;` as delimiter. Cannot be used simultaneously with `--queries-file`.
- `--queries-file` - file path with queries to execute. Cannot be used simultaneously with `--query`. See the sketch after this list.
- `--multiquery, -n` If specified, multiple queries separated by semicolons can be listed after the `--query` option. For convenience, it is also possible to omit `--query` and pass the queries directly after `--multiquery`.
- `-N`, `--table` — table name where to put output data, `table` by default.
- `--format`, `--output-format` — output format, `TSV` by default.
- `-d`, `--database` — default database, `_local` by default.
- `--multiquery, -n` If specified, multiple queries separated by semicolons can be listed after the `--query` option. For convenience, it is also possible to omit `--query` and pass the queries directly after `--multiquery`.
- `--stacktrace` — whether to dump debug output in case of exception.
- `--echo` — print query before execution.
- `--verbose` — more details on query execution.
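A minimal sketch of invoking `clickhouse-local` with inline queries versus a query file (`data.tsv` and `queries.sql` are placeholder paths; the input data is available under the default table name `table`):

``` bash
# Inline semicolon-separated queries; cannot be combined with --queries-file
clickhouse-local --structure "a UInt32, b String" --input-format TSV --file data.tsv \
    --query "SELECT count() FROM table; SELECT max(a) FROM table"

# The same queries read from a file instead of being passed via --query
clickhouse-local --structure "a UInt32, b String" --input-format TSV --file data.tsv \
    --queries-file queries.sql
```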

View File

@ -544,6 +544,54 @@ Result:
└─────┴──────────┴───────┘
```
## Filling grouped by sorting prefix
It can be useful to fill rows which have the same values in particular columns independently; a good example is filling missing values in time series.
Assume there is the following time series table:
``` sql
CREATE TABLE timeseries
(
`sensor_id` UInt64,
`timestamp` DateTime64(3, 'UTC'),
`value` Float64
)
ENGINE = Memory;
SELECT * FROM timeseries;
┌─sensor_id─┬───────────────timestamp─┬─value─┐
│ 234 │ 2021-12-01 00:00:03.000 │ 3 │
│ 432 │ 2021-12-01 00:00:01.000 │ 1 │
│ 234 │ 2021-12-01 00:00:07.000 │ 7 │
│ 432 │ 2021-12-01 00:00:05.000 │ 5 │
└───────────┴─────────────────────────┴───────┘
```
We'd like to fill the missing values for each sensor independently with a 1 second interval.
The way to achieve this is to use the `sensor_id` column as a sorting prefix for the filling column `timestamp`:
``` sql
SELECT *
FROM timeseries
ORDER BY
sensor_id,
timestamp WITH FILL
INTERPOLATE ( value AS 9999 )
┌─sensor_id─┬───────────────timestamp─┬─value─┐
│ 234 │ 2021-12-01 00:00:03.000 │ 3 │
│ 234 │ 2021-12-01 00:00:04.000 │ 9999 │
│ 234 │ 2021-12-01 00:00:05.000 │ 9999 │
│ 234 │ 2021-12-01 00:00:06.000 │ 9999 │
│ 234 │ 2021-12-01 00:00:07.000 │ 7 │
│ 432 │ 2021-12-01 00:00:01.000 │ 1 │
│ 432 │ 2021-12-01 00:00:02.000 │ 9999 │
│ 432 │ 2021-12-01 00:00:03.000 │ 9999 │
│ 432 │ 2021-12-01 00:00:04.000 │ 9999 │
│ 432 │ 2021-12-01 00:00:05.000 │ 5 │
└───────────┴─────────────────────────┴───────┘
```
Here, the `value` column was interpolated with `9999` just to make the filled rows more noticeable.
This behavior is controlled by the setting `use_with_fill_by_sorting_prefix` (enabled by default).
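For comparison, a minimal sketch of the same query with the setting disabled; with it turned off, the sorting prefix is ignored and rows are no longer filled per sensor (the exact output depends on the data):
``` sql
SELECT *
FROM timeseries
ORDER BY
    sensor_id,
    timestamp WITH FILL
INTERPOLATE ( value AS 9999 )
SETTINGS use_with_fill_by_sorting_prefix = 0
```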
## Related content
- Blog: [Working with time series data in ClickHouse](https://clickhouse.com/blog/working-with-time-series-data-and-functions-ClickHouse)

View File

@ -1,6 +1,7 @@
#include <Backups/BackupEntryFromAppendOnlyFile.h>
#include <Disks/IDisk.h>
#include <IO/LimitSeekableReadBuffer.h>
#include <IO/ReadBufferFromFileBase.h>
namespace DB

View File

@ -1,5 +1,7 @@
#include <Backups/BackupEntryFromImmutableFile.h>
#include <IO/ReadBufferFromFileBase.h>
#include <Disks/IDisk.h>
#include <city.h>
namespace DB

View File

@ -3,7 +3,7 @@
#include <Disks/IDisk.h>
#include <IO/copyData.h>
#include <IO/WriteBufferFromFileBase.h>
#include <IO/SeekableReadBuffer.h>
#include <IO/ReadBufferFromFileBase.h>
#include <Interpreters/Context.h>
#include <Common/logger_useful.h>

View File

@ -15,7 +15,7 @@
#include <IO/Archives/createArchiveWriter.h>
#include <IO/ConcatSeekableReadBuffer.h>
#include <IO/ReadHelpers.h>
#include <IO/SeekableReadBuffer.h>
#include <IO/ReadBufferFromFileBase.h>
#include <IO/WriteBufferFromFileBase.h>
#include <IO/WriteHelpers.h>
#include <IO/Operators.h>

View File

@ -174,7 +174,7 @@ void HedgedConnections::sendQuery(
modified_settings.group_by_two_level_threshold_bytes = 0;
}
const bool enable_sample_offset_parallel_processing = settings.max_parallel_replicas > 1 && !settings.allow_experimental_parallel_reading_from_replicas;
const bool enable_sample_offset_parallel_processing = settings.max_parallel_replicas > 1 && settings.allow_experimental_parallel_reading_from_replicas == 0;
if (offset_states.size() > 1 && enable_sample_offset_parallel_processing)
{

View File

@ -142,7 +142,7 @@ void MultiplexedConnections::sendQuery(
}
}
const bool enable_sample_offset_parallel_processing = settings.max_parallel_replicas > 1 && !settings.allow_experimental_parallel_reading_from_replicas;
const bool enable_sample_offset_parallel_processing = settings.max_parallel_replicas > 1 && settings.allow_experimental_parallel_reading_from_replicas == 0;
size_t num_replicas = replica_states.size();
if (num_replicas > 1)

View File

@ -3,6 +3,7 @@
#include <base/types.h>
#include <Common/Exception.h>
#include <Coordination/KeeperConstants.h>
#include <Poco/Net/SocketAddress.h>
#include <vector>
#include <memory>
@ -466,7 +467,7 @@ public:
/// Useful to check owner of ephemeral node.
virtual int64_t getSessionID() const = 0;
virtual String getConnectedAddress() const = 0;
virtual Poco::Net::SocketAddress getConnectedAddress() const = 0;
/// If the method will throw an exception, callbacks won't be called.
///

View File

@ -39,7 +39,7 @@ public:
bool isExpired() const override { return expired; }
int64_t getSessionID() const override { return 0; }
String getConnectedAddress() const override { return connected_zk_address; }
Poco::Net::SocketAddress getConnectedAddress() const override { return connected_zk_address; }
void create(
@ -127,7 +127,7 @@ private:
zkutil::ZooKeeperArgs args;
String connected_zk_address;
Poco::Net::SocketAddress connected_zk_address;
std::mutex push_request_mutex;
std::atomic<bool> expired{false};

View File

@ -112,11 +112,10 @@ void ZooKeeper::init(ZooKeeperArgs args_)
else
LOG_TRACE(log, "Initialized, hosts: {}, chroot: {}", fmt::join(args.hosts, ","), args.chroot);
String address = impl->getConnectedAddress();
Poco::Net::SocketAddress address = impl->getConnectedAddress();
size_t colon_pos = address.find(':');
connected_zk_host = address.substr(0, colon_pos);
connected_zk_port = address.substr(colon_pos + 1);
connected_zk_host = address.host().toString();
connected_zk_port = address.port();
connected_zk_index = 0;
@ -124,7 +123,7 @@ void ZooKeeper::init(ZooKeeperArgs args_)
{
for (size_t i = 0; i < args.hosts.size(); i++)
{
if (args.hosts[i] == address)
if (args.hosts[i] == address.toString())
{
connected_zk_index = i;
break;

View File

@ -524,7 +524,7 @@ public:
void setServerCompletelyStarted();
String getConnectedZooKeeperHost() const { return connected_zk_host; }
String getConnectedZooKeeperPort() const { return connected_zk_port; }
UInt16 getConnectedZooKeeperPort() const { return connected_zk_port; }
size_t getConnectedZooKeeperIndex() const { return connected_zk_index; }
private:
@ -591,7 +591,7 @@ private:
ZooKeeperArgs args;
String connected_zk_host;
String connected_zk_port;
UInt16 connected_zk_port;
size_t connected_zk_index;
std::mutex mutex;

View File

@ -433,7 +433,7 @@ void ZooKeeper::connect(
}
connected = true;
connected_zk_address = node.address.toString();
connected_zk_address = node.address;
break;
}
@ -450,7 +450,7 @@ void ZooKeeper::connect(
if (!connected)
{
WriteBufferFromOwnString message;
connected_zk_address = "";
connected_zk_address = Poco::Net::SocketAddress();
message << "All connection tries failed while connecting to ZooKeeper. nodes: ";
bool first = true;

View File

@ -125,7 +125,7 @@ public:
/// Useful to check owner of ephemeral node.
int64_t getSessionID() const override { return session_id; }
String getConnectedAddress() const override { return connected_zk_address; }
Poco::Net::SocketAddress getConnectedAddress() const override { return connected_zk_address; }
void executeGenericRequest(
const ZooKeeperRequestPtr & request,
@ -203,7 +203,7 @@ public:
private:
ACLs default_acls;
String connected_zk_address;
Poco::Net::SocketAddress connected_zk_address;
zkutil::ZooKeeperArgs args;

View File

@ -6,6 +6,7 @@
#include <Common/Exception.h>
#include <Common/isLocalAddress.h>
#include <IO/ReadHelpers.h>
#include <IO/ReadBufferFromFile.h>
#include <Common/getMultipleKeysFromConfig.h>
#include <Disks/DiskLocal.h>
#include <Common/logger_useful.h>

View File

@ -154,7 +154,7 @@ class IColumn;
M(ParallelReplicasCustomKeyFilterType, parallel_replicas_custom_key_filter_type, ParallelReplicasCustomKeyFilterType::DEFAULT, "Type of filter to use with custom key for parallel replicas. default - use modulo operation on the custom key, range - use range filter on custom key using all possible values for the value type of custom key.", 0) \
\
M(String, cluster_for_parallel_replicas, "default", "Cluster for a shard in which current server is located", 0) \
M(Bool, allow_experimental_parallel_reading_from_replicas, false, "If true, ClickHouse will send a SELECT query to all replicas of a table. It will work for any kind on MergeTree table.", 0) \
M(UInt64, allow_experimental_parallel_reading_from_replicas, 0, "Use all the replicas from a shard for SELECT query execution. Reading is parallelized and coordinated dynamically. 0 - disabled, 1 - enabled, silently disable them in case of failure, 2 - enabled, throw an exception in case of failure", 0) \
M(Float, parallel_replicas_single_task_marks_count_multiplier, 2, "A multiplier which will be added during calculation for minimal number of marks to retrieve from coordinator. This will be applied only for remote replicas.", 0) \
M(Bool, parallel_replicas_for_non_replicated_merge_tree, false, "If true, ClickHouse will use parallel replicas algorithm also for non-replicated MergeTree tables", 0) \
\
@ -729,6 +729,7 @@ class IColumn;
M(UInt64, http_max_request_param_data_size, 10_MiB, "Limit on size of request data used as a query parameter in predefined HTTP requests.", 0) \
M(Bool, function_json_value_return_type_allow_nullable, false, "Allow function JSON_VALUE to return nullable type.", 0) \
M(Bool, function_json_value_return_type_allow_complex, false, "Allow function JSON_VALUE to return complex type, such as: struct, array, map.", 0) \
M(Bool, use_with_fill_by_sorting_prefix, true, "Columns preceding WITH FILL columns in ORDER BY clause form sorting prefix. Rows with different values in sorting prefix are filled independently", 0) \
\
/** Experimental functions */ \
M(Bool, allow_experimental_funnel_functions, false, "Enable experimental functions for funnel analysis.", 0) \

View File

@ -82,6 +82,7 @@ static std::map<ClickHouseVersion, SettingsChangesHistory::SettingsChanges> sett
{
{"23.5", {{"input_format_parquet_preserve_order", true, false, "Allow Parquet reader to reorder rows for better parallelism."},
{"parallelize_output_from_storages", false, true, "Allow parallelism when executing queries that read from file/url/s3/etc. This may reorder rows."},
{"use_with_fill_by_sorting_prefix", false, true, "Columns preceding WITH FILL columns in ORDER BY clause form sorting prefix. Rows with different values in sorting prefix are filled independently"},
{"output_format_parquet_compliant_nested_types", false, true, "Change an internal field name in output Parquet file schema."}}},
{"23.4", {{"allow_suspicious_indices", true, false, "If true, index can defined with identical expressions"},
{"connect_timeout_with_failover_ms", 50, 1000, "Increase default connect timeout because of async connect"},

View File

@ -2,6 +2,7 @@
#include <Interpreters/Context_fwd.h>
#include <Core/Defines.h>
#include <Core/Names.h>
#include <base/types.h>
#include <Common/CurrentMetrics.h>
#include <Common/Exception.h>
@ -20,6 +21,7 @@
#include <boost/noncopyable.hpp>
#include <Poco/Timestamp.h>
#include <filesystem>
#include <sys/stat.h>
namespace fs = std::filesystem;

View File

@ -4,6 +4,7 @@
#include <vector>
#include <boost/noncopyable.hpp>
#include <Disks/IDisk.h>
#include <sys/types.h>
namespace DB
{

View File

@ -1,4 +1,4 @@
#include "AsynchronousReadIndirectBufferFromRemoteFS.h"
#include "AsynchronousBoundedReadBuffer.h"
#include <Common/Stopwatch.h>
#include <Common/logger_useful.h>
@ -43,105 +43,77 @@ namespace ErrorCodes
}
AsynchronousReadIndirectBufferFromRemoteFS::AsynchronousReadIndirectBufferFromRemoteFS(
AsynchronousBoundedReadBuffer::AsynchronousBoundedReadBuffer(
ImplPtr impl_,
IAsynchronousReader & reader_,
const ReadSettings & settings_,
std::shared_ptr<ReadBufferFromRemoteFSGather> impl_,
std::shared_ptr<AsyncReadCounters> async_read_counters_,
std::shared_ptr<FilesystemReadPrefetchesLog> prefetches_log_)
AsyncReadCountersPtr async_read_counters_,
FilesystemReadPrefetchesLogPtr prefetches_log_)
: ReadBufferFromFileBase(settings_.remote_fs_buffer_size, nullptr, 0)
, impl(std::move(impl_))
, read_settings(settings_)
, reader(reader_)
, base_priority(settings_.priority)
, impl(impl_)
, prefetch_buffer(settings_.prefetch_buffer_size)
, query_id(CurrentThread::isInitialized() && CurrentThread::get().getQueryContext() != nullptr
? CurrentThread::getQueryId() : "")
, query_id(CurrentThread::isInitialized() && CurrentThread::get().getQueryContext() != nullptr ? CurrentThread::getQueryId() : "")
, current_reader_id(getRandomASCIIString(8))
#ifndef NDEBUG
, log(&Poco::Logger::get("AsynchronousBufferFromRemoteFS"))
#else
, log(&Poco::Logger::get("AsyncBuffer(" + impl->getFileName() + ")"))
#endif
, log(&Poco::Logger::get("AsynchronousBoundedReadBuffer"))
, async_read_counters(async_read_counters_)
, prefetches_log(prefetches_log_)
{
ProfileEvents::increment(ProfileEvents::RemoteFSBuffers);
}
String AsynchronousReadIndirectBufferFromRemoteFS::getFileName() const
bool AsynchronousBoundedReadBuffer::hasPendingDataToRead()
{
return impl->getFileName();
}
String AsynchronousReadIndirectBufferFromRemoteFS::getInfoForLog()
{
return impl->getInfoForLog();
}
size_t AsynchronousReadIndirectBufferFromRemoteFS::getFileSize()
{
return impl->getFileSize();
}
bool AsynchronousReadIndirectBufferFromRemoteFS::hasPendingDataToRead()
{
/**
* Note: read_until_position here can be std::nullopt only for non-MergeTree tables.
* For mergeTree tables it must be guaranteed that setReadUntilPosition() or
* setReadUntilEnd() is called before any read or prefetch.
* setReadUntilEnd() always sets read_until_position to file size.
* setReadUntilPosition(pos) always has pos > 0, because if
* right_offset_in_compressed_file is 0, then setReadUntilEnd() is used.
*/
if (read_until_position)
{
/// Everything is already read.
if (file_offset_of_buffer_end == *read_until_position)
if (file_offset_of_buffer_end == *read_until_position) /// Everything is already read.
return false;
if (file_offset_of_buffer_end > *read_until_position)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Read beyond last offset ({} > {}, info: {})",
file_offset_of_buffer_end, *read_until_position, impl->getInfoForLog());
{
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Read beyond last offset ({} > {}, info: {})",
file_offset_of_buffer_end, *read_until_position, impl->getInfoForLog());
}
}
return true;
}
std::future<IAsynchronousReader::Result> AsynchronousReadIndirectBufferFromRemoteFS::asyncReadInto(char * data, size_t size, int64_t priority)
std::future<IAsynchronousReader::Result>
AsynchronousBoundedReadBuffer::asyncReadInto(char * data, size_t size, int64_t priority)
{
IAsynchronousReader::Request request;
request.descriptor = std::make_shared<RemoteFSFileDescriptor>(*impl, async_read_counters);
request.buf = data;
request.size = size;
request.offset = file_offset_of_buffer_end;
request.priority = base_priority + priority;
request.priority = read_settings.priority + priority;
request.ignore = bytes_to_ignore;
return reader.submit(request);
}
void AsynchronousReadIndirectBufferFromRemoteFS::prefetch(int64_t priority)
void AsynchronousBoundedReadBuffer::prefetch(int64_t priority)
{
if (prefetch_future.valid())
return;
/// Check boundary, which was set in readUntilPosition().
if (!hasPendingDataToRead())
return;
last_prefetch_info.submit_time = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch()).count();
last_prefetch_info.submit_time = std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::system_clock::now().time_since_epoch()).count();
last_prefetch_info.priority = priority;
/// Prefetch even in case hasPendingData() == true.
chassert(prefetch_buffer.size() == read_settings.prefetch_buffer_size || prefetch_buffer.size() == read_settings.remote_fs_buffer_size);
chassert(prefetch_buffer.size() == read_settings.prefetch_buffer_size
|| prefetch_buffer.size() == read_settings.remote_fs_buffer_size);
prefetch_future = asyncReadInto(prefetch_buffer.data(), prefetch_buffer.size(), priority);
ProfileEvents::increment(ProfileEvents::RemoteFSPrefetches);
}
void AsynchronousReadIndirectBufferFromRemoteFS::setReadUntilPosition(size_t position)
void AsynchronousBoundedReadBuffer::setReadUntilPosition(size_t position)
{
if (!read_until_position || position != *read_until_position)
{
@ -157,21 +129,16 @@ void AsynchronousReadIndirectBufferFromRemoteFS::setReadUntilPosition(size_t pos
}
}
void AsynchronousReadIndirectBufferFromRemoteFS::setReadUntilEnd()
void AsynchronousBoundedReadBuffer::appendToPrefetchLog(
FilesystemPrefetchState state,
int64_t size,
const std::unique_ptr<Stopwatch> & execution_watch)
{
setReadUntilPosition(impl->getFileSize());
}
void AsynchronousReadIndirectBufferFromRemoteFS::appendToPrefetchLog(FilesystemPrefetchState state, int64_t size, const std::unique_ptr<Stopwatch> & execution_watch)
{
const auto & object = impl->getCurrentObject();
FilesystemReadPrefetchesLogElement elem
{
.event_time = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now()),
.query_id = query_id,
.path = object.local_path,
.path = impl->getFileName(),
.offset = file_offset_of_buffer_end,
.size = size,
.prefetch_submit_time = last_prefetch_info.submit_time,
@ -187,7 +154,7 @@ void AsynchronousReadIndirectBufferFromRemoteFS::appendToPrefetchLog(FilesystemP
}
bool AsynchronousReadIndirectBufferFromRemoteFS::nextImpl()
bool AsynchronousBoundedReadBuffer::nextImpl()
{
if (!hasPendingDataToRead())
return false;
@ -245,14 +212,14 @@ bool AsynchronousReadIndirectBufferFromRemoteFS::nextImpl()
/// In case of multiple files for the same file in clickhouse (i.e. log family)
/// file_offset_of_buffer_end will not match getImplementationBufferOffset()
/// so we use [impl->getImplementationBufferOffset(), impl->getFileSize()]
chassert(file_offset_of_buffer_end >= impl->getImplementationBufferOffset());
chassert(file_offset_of_buffer_end >= impl->getFileOffsetOfBufferEnd());
chassert(file_offset_of_buffer_end <= impl->getFileSize());
return bytes_read;
}
off_t AsynchronousReadIndirectBufferFromRemoteFS::seek(off_t offset, int whence)
off_t AsynchronousBoundedReadBuffer::seek(off_t offset, int whence)
{
ProfileEvents::increment(ProfileEvents::RemoteFSSeeks);
@ -268,7 +235,7 @@ off_t AsynchronousReadIndirectBufferFromRemoteFS::seek(off_t offset, int whence)
}
else
{
throw Exception(ErrorCodes::ARGUMENT_OUT_OF_BOUND, "ReadBufferFromFileDescriptor::seek expects SEEK_SET or SEEK_CUR as whence");
throw Exception(ErrorCodes::ARGUMENT_OUT_OF_BOUND, "Expected SEEK_SET or SEEK_CUR as whence");
}
/// Position is unchanged.
@ -322,9 +289,8 @@ off_t AsynchronousReadIndirectBufferFromRemoteFS::seek(off_t offset, int whence)
if (read_until_position && new_pos > *read_until_position)
{
ProfileEvents::increment(ProfileEvents::RemoteFSSeeksWithReset);
impl->reset();
file_offset_of_buffer_end = new_pos = *read_until_position; /// read_until_position is a non-included boundary.
impl->seek(file_offset_of_buffer_end, SEEK_SET);
return new_pos;
}
@ -332,8 +298,7 @@ off_t AsynchronousReadIndirectBufferFromRemoteFS::seek(off_t offset, int whence)
* Lazy ignore. Save number of bytes to ignore and ignore it either for prefetch buffer or current buffer.
* Note: we read in range [file_offset_of_buffer_end, read_until_position).
*/
if (impl->initialized()
&& read_until_position && new_pos < *read_until_position
if (read_until_position && new_pos < *read_until_position
&& new_pos > file_offset_of_buffer_end
&& new_pos < file_offset_of_buffer_end + read_settings.remote_read_min_bytes_for_seek)
{
@ -342,31 +307,21 @@ off_t AsynchronousReadIndirectBufferFromRemoteFS::seek(off_t offset, int whence)
}
else
{
if (impl->initialized())
{
ProfileEvents::increment(ProfileEvents::RemoteFSSeeksWithReset);
impl->reset();
}
ProfileEvents::increment(ProfileEvents::RemoteFSSeeksWithReset);
file_offset_of_buffer_end = new_pos;
impl->seek(file_offset_of_buffer_end, SEEK_SET);
}
return new_pos;
}
off_t AsynchronousReadIndirectBufferFromRemoteFS::getPosition()
{
return file_offset_of_buffer_end - available() + bytes_to_ignore;
}
void AsynchronousReadIndirectBufferFromRemoteFS::finalize()
void AsynchronousBoundedReadBuffer::finalize()
{
resetPrefetch(FilesystemPrefetchState::UNNEEDED);
}
AsynchronousReadIndirectBufferFromRemoteFS::~AsynchronousReadIndirectBufferFromRemoteFS()
AsynchronousBoundedReadBuffer::~AsynchronousBoundedReadBuffer()
{
try
{
@ -378,7 +333,7 @@ AsynchronousReadIndirectBufferFromRemoteFS::~AsynchronousReadIndirectBufferFromR
}
}
void AsynchronousReadIndirectBufferFromRemoteFS::resetPrefetch(FilesystemPrefetchState state)
void AsynchronousBoundedReadBuffer::resetPrefetch(FilesystemPrefetchState state)
{
if (!prefetch_future.valid())
return;

View File

@ -0,0 +1,96 @@
#pragma once
#include "config.h"
#include <IO/ReadBufferFromFile.h>
#include <IO/AsynchronousReader.h>
#include <IO/ReadSettings.h>
#include <Interpreters/FilesystemReadPrefetchesLog.h>
#include <utility>
namespace Poco { class Logger; }
namespace DB
{
struct AsyncReadCounters;
using AsyncReadCountersPtr = std::shared_ptr<AsyncReadCounters>;
class ReadBufferFromRemoteFSGather;
class AsynchronousBoundedReadBuffer : public ReadBufferFromFileBase
{
public:
using Impl = ReadBufferFromFileBase;
using ImplPtr = std::unique_ptr<Impl>;
explicit AsynchronousBoundedReadBuffer(
ImplPtr impl_,
IAsynchronousReader & reader_,
const ReadSettings & settings_,
AsyncReadCountersPtr async_read_counters_ = nullptr,
FilesystemReadPrefetchesLogPtr prefetches_log_ = nullptr);
~AsynchronousBoundedReadBuffer() override;
String getFileName() const override { return impl->getFileName(); }
size_t getFileSize() override { return impl->getFileSize(); }
String getInfoForLog() override { return impl->getInfoForLog(); }
off_t seek(off_t offset_, int whence) override;
void prefetch(int64_t priority) override;
void setReadUntilPosition(size_t position) override; /// [..., position).
void setReadUntilEnd() override { return setReadUntilPosition(getFileSize()); }
off_t getPosition() override { return file_offset_of_buffer_end - available() + bytes_to_ignore; }
private:
const ImplPtr impl;
const ReadSettings read_settings;
IAsynchronousReader & reader;
size_t file_offset_of_buffer_end = 0;
std::optional<size_t> read_until_position;
/// If nonzero then working_buffer is empty.
/// If a prefetch is in flight, the prefetch task has been instructed to ignore this many bytes.
size_t bytes_to_ignore = 0;
Memory<> prefetch_buffer;
std::future<IAsynchronousReader::Result> prefetch_future;
const std::string query_id;
const std::string current_reader_id;
Poco::Logger * log;
AsyncReadCountersPtr async_read_counters;
FilesystemReadPrefetchesLogPtr prefetches_log;
struct LastPrefetchInfo
{
UInt64 submit_time = 0;
size_t priority = 0;
};
LastPrefetchInfo last_prefetch_info;
bool nextImpl() override;
void finalize();
bool hasPendingDataToRead();
void appendToPrefetchLog(
FilesystemPrefetchState state,
int64_t size,
const std::unique_ptr<Stopwatch> & execution_watch);
std::future<IAsynchronousReader::Result> asyncReadInto(char * data, size_t size, int64_t priority);
void resetPrefetch(FilesystemPrefetchState state);
};
}

View File

@ -1,111 +0,0 @@
#pragma once
#include "config.h"
#include <IO/ReadBufferFromFile.h>
#include <IO/AsynchronousReader.h>
#include <IO/ReadSettings.h>
#include <Interpreters/FilesystemReadPrefetchesLog.h>
#include <utility>
namespace Poco { class Logger; }
namespace DB
{
struct AsyncReadCounters;
class ReadBufferFromRemoteFSGather;
/**
* Reads data from S3/HDFS/Web using stored paths in metadata.
* This class is an asynchronous version of ReadIndirectBufferFromRemoteFS.
*
* Buffers chain for diskS3:
* AsynchronousIndirectReadBufferFromRemoteFS -> ReadBufferFromRemoteFS ->
* -> ReadBufferFromS3 -> ReadBufferFromIStream.
*
* Buffers chain for diskWeb:
* AsynchronousIndirectReadBufferFromRemoteFS -> ReadBufferFromRemoteFS ->
* -> ReadIndirectBufferFromWebServer -> ReadBufferFromHTTP -> ReadBufferFromIStream.
*
* We pass either `memory` or `prefetch_buffer` through all this chain and return it back.
*/
class AsynchronousReadIndirectBufferFromRemoteFS : public ReadBufferFromFileBase
{
public:
explicit AsynchronousReadIndirectBufferFromRemoteFS(
IAsynchronousReader & reader_, const ReadSettings & settings_,
std::shared_ptr<ReadBufferFromRemoteFSGather> impl_,
std::shared_ptr<AsyncReadCounters> async_read_counters_,
std::shared_ptr<FilesystemReadPrefetchesLog> prefetches_log_);
~AsynchronousReadIndirectBufferFromRemoteFS() override;
off_t seek(off_t offset_, int whence) override;
off_t getPosition() override;
String getFileName() const override;
void prefetch(int64_t priority) override;
void setReadUntilPosition(size_t position) override; /// [..., position).
void setReadUntilEnd() override;
String getInfoForLog() override;
size_t getFileSize() override;
bool isIntegratedWithFilesystemCache() const override { return true; }
private:
bool nextImpl() override;
void finalize();
bool hasPendingDataToRead();
void appendToPrefetchLog(FilesystemPrefetchState state, int64_t size, const std::unique_ptr<Stopwatch> & execution_watch);
std::future<IAsynchronousReader::Result> asyncReadInto(char * data, size_t size, int64_t priority);
void resetPrefetch(FilesystemPrefetchState state);
ReadSettings read_settings;
IAsynchronousReader & reader;
int64_t base_priority;
std::shared_ptr<ReadBufferFromRemoteFSGather> impl;
std::future<IAsynchronousReader::Result> prefetch_future;
size_t file_offset_of_buffer_end = 0;
Memory<> prefetch_buffer;
std::string query_id;
std::string current_reader_id;
/// If nonzero then working_buffer is empty.
/// If a prefetch is in flight, the prefetch task has been instructed to ignore this many bytes.
size_t bytes_to_ignore = 0;
std::optional<size_t> read_until_position;
Poco::Logger * log;
std::shared_ptr<AsyncReadCounters> async_read_counters;
std::shared_ptr<FilesystemReadPrefetchesLog> prefetches_log;
struct LastPrefetchInfo
{
UInt64 submit_time = 0;
size_t priority = 0;
};
LastPrefetchInfo last_prefetch_info;
};
}

View File

@ -5,6 +5,7 @@
#include <Disks/IO/CachedOnDiskReadBufferFromFile.h>
#include <Disks/ObjectStorages/Cached/CachedObjectStorage.h>
#include <Common/logger_useful.h>
#include <IO/SwapHelper.h>
#include <iostream>
#include <base/hex.h>
#include <Interpreters/FilesystemCacheLog.h>
@ -12,22 +13,24 @@
namespace DB
{
namespace ErrorCodes
{
extern const int CANNOT_SEEK_THROUGH_FILE;
}
ReadBufferFromRemoteFSGather::ReadBufferFromRemoteFSGather(
ReadBufferCreator && read_buffer_creator_,
const StoredObjects & blobs_to_read_,
const ReadSettings & settings_,
std::shared_ptr<FilesystemCacheLog> cache_log_)
: ReadBuffer(nullptr, 0)
, read_buffer_creator(std::move(read_buffer_creator_))
, blobs_to_read(blobs_to_read_)
: ReadBufferFromFileBase(0, nullptr, 0)
, settings(settings_)
, blobs_to_read(blobs_to_read_)
, read_buffer_creator(std::move(read_buffer_creator_))
, cache_log(settings.enable_filesystem_cache_log ? cache_log_ : nullptr)
, query_id(CurrentThread::isInitialized() && CurrentThread::get().getQueryContext() != nullptr ? CurrentThread::getQueryId() : "")
, log(&Poco::Logger::get("ReadBufferFromRemoteFSGather"))
{
if (cache_log_ && settings.enable_filesystem_cache_log)
cache_log = cache_log_;
if (!blobs_to_read.empty())
current_object = blobs_to_read.front();
@ -38,13 +41,12 @@ ReadBufferFromRemoteFSGather::ReadBufferFromRemoteFSGather(
SeekableReadBufferPtr ReadBufferFromRemoteFSGather::createImplementationBuffer(const StoredObject & object)
{
if (current_buf != nullptr && !with_cache)
if (current_buf && !with_cache)
{
appendFilesystemCacheLog();
appendUncachedReadInfo();
}
current_object = object;
total_bytes_read_from_current_file = 0;
const auto & object_path = object.remote_path;
size_t current_read_until_position = read_until_position ? read_until_position : object.bytes_size;
@ -72,7 +74,7 @@ SeekableReadBufferPtr ReadBufferFromRemoteFSGather::createImplementationBuffer(c
return current_read_buffer_creator();
}
void ReadBufferFromRemoteFSGather::appendFilesystemCacheLog()
void ReadBufferFromRemoteFSGather::appendUncachedReadInfo()
{
if (!cache_log || current_object.remote_path.empty())
return;
@ -84,7 +86,7 @@ void ReadBufferFromRemoteFSGather::appendFilesystemCacheLog()
.source_file_path = current_object.remote_path,
.file_segment_range = { 0, current_object.bytes_size },
.cache_type = FilesystemCacheLogElement::CacheType::READ_FROM_FS_BYPASSING_CACHE,
.file_segment_size = total_bytes_read_from_current_file,
.file_segment_size = current_object.bytes_size,
.read_from_cache_attempted = false,
};
cache_log->add(elem);
@ -176,7 +178,7 @@ bool ReadBufferFromRemoteFSGather::moveToNextBuffer()
bool ReadBufferFromRemoteFSGather::readImpl()
{
swap(*current_buf);
SwapHelper swap(*this, *current_buf);
bool result = false;
@ -187,7 +189,6 @@ bool ReadBufferFromRemoteFSGather::readImpl()
*/
if (bytes_to_ignore)
{
total_bytes_read_from_current_file += bytes_to_ignore;
current_buf->ignore(bytes_to_ignore);
result = current_buf->hasPendingData();
file_offset_of_buffer_end += bytes_to_ignore;
@ -207,57 +208,41 @@ bool ReadBufferFromRemoteFSGather::readImpl()
file_offset_of_buffer_end += current_buf->available();
}
swap(*current_buf);
/// Required for non-async reads.
if (result)
{
assert(available());
nextimpl_working_buffer_offset = offset();
total_bytes_read_from_current_file += available();
assert(current_buf->available());
nextimpl_working_buffer_offset = current_buf->offset();
}
return result;
}
size_t ReadBufferFromRemoteFSGather::getFileOffsetOfBufferEnd() const
{
return file_offset_of_buffer_end;
}
void ReadBufferFromRemoteFSGather::setReadUntilPosition(size_t position)
{
if (position != read_until_position)
{
read_until_position = position;
reset();
}
if (position == read_until_position)
return;
reset();
read_until_position = position;
}
void ReadBufferFromRemoteFSGather::reset()
{
current_object = {};
current_buf_idx = {};
current_buf.reset();
bytes_to_ignore = 0;
}
String ReadBufferFromRemoteFSGather::getFileName() const
off_t ReadBufferFromRemoteFSGather::seek(off_t offset, int whence)
{
return current_object.remote_path;
}
if (whence != SEEK_SET)
throw Exception(ErrorCodes::CANNOT_SEEK_THROUGH_FILE, "Only seeking with SEEK_SET is allowed");
size_t ReadBufferFromRemoteFSGather::getFileSize() const
{
size_t size = 0;
for (const auto & object : blobs_to_read)
size += object.bytes_size;
return size;
}
String ReadBufferFromRemoteFSGather::getInfoForLog()
{
if (!current_buf)
return "";
return current_buf->getInfoForLog();
reset();
file_offset_of_buffer_end = offset;
return file_offset_of_buffer_end;
}
size_t ReadBufferFromRemoteFSGather::getImplementationBufferOffset() const
@ -271,7 +256,7 @@ size_t ReadBufferFromRemoteFSGather::getImplementationBufferOffset() const
ReadBufferFromRemoteFSGather::~ReadBufferFromRemoteFSGather()
{
if (!with_cache)
appendFilesystemCacheLog();
appendUncachedReadInfo();
}
}

View File

@ -10,12 +10,13 @@ namespace Poco { class Logger; }
namespace DB
{
class FilesystemCacheLog;
/**
* Remote disk might need to split one clickhouse file into multiple files in remote fs.
* This class works like a proxy to allow transition from one file into multiple.
*/
class ReadBufferFromRemoteFSGather final : public ReadBuffer
class ReadBufferFromRemoteFSGather final : public ReadBufferFromFileBase
{
friend class ReadIndirectBufferFromRemoteFS;
@ -30,25 +31,25 @@ public:
~ReadBufferFromRemoteFSGather() override;
String getFileName() const;
String getFileName() const override { return current_object.remote_path; }
void reset();
String getInfoForLog() override { return current_buf ? current_buf->getInfoForLog() : ""; }
void setReadUntilPosition(size_t position) override;
IAsynchronousReader::Result readInto(char * data, size_t size, size_t offset, size_t ignore) override;
size_t getFileSize() const;
size_t getFileSize() override { return getTotalSize(blobs_to_read); }
size_t getFileOffsetOfBufferEnd() const;
size_t getFileOffsetOfBufferEnd() const override { return file_offset_of_buffer_end; }
bool initialized() const { return current_buf != nullptr; }
String getInfoForLog();
size_t getImplementationBufferOffset() const;
const StoredObject & getCurrentObject() const { return current_object; }
off_t seek(off_t offset, int whence) override;
off_t getPosition() override { return file_offset_of_buffer_end - available() + bytes_to_ignore; }
private:
SeekableReadBufferPtr createImplementationBuffer(const StoredObject & object);
@ -61,40 +62,26 @@ private:
bool moveToNextBuffer();
void appendFilesystemCacheLog();
void appendUncachedReadInfo();
ReadBufferCreator read_buffer_creator;
StoredObjects blobs_to_read;
ReadSettings settings;
size_t read_until_position = 0;
StoredObject current_object;
void reset();
const ReadSettings settings;
const StoredObjects blobs_to_read;
const ReadBufferCreator read_buffer_creator;
const std::shared_ptr<FilesystemCacheLog> cache_log;
const String query_id;
bool with_cache;
String query_id;
Poco::Logger * log;
SeekableReadBufferPtr current_buf;
size_t current_buf_idx = 0;
size_t read_until_position = 0;
size_t file_offset_of_buffer_end = 0;
/**
* File: |___________________|
* Buffer: |~~~~~~~|
* file_offset_of_buffer_end: ^
*/
size_t bytes_to_ignore = 0;
size_t total_bytes_read_from_current_file = 0;
StoredObject current_object;
size_t current_buf_idx = 0;
SeekableReadBufferPtr current_buf;
std::shared_ptr<FilesystemCacheLog> cache_log;
Poco::Logger * log;
};
}

View File

@ -82,7 +82,7 @@ off_t ReadIndirectBufferFromRemoteFS::seek(off_t offset_, int whence)
else
throw Exception(ErrorCodes::CANNOT_SEEK_THROUGH_FILE, "Only SEEK_SET or SEEK_CUR modes are allowed.");
impl->reset();
impl->seek(impl->file_offset_of_buffer_end, SEEK_SET);
resetWorkingBuffer();
file_offset_of_buffer_end = impl->file_offset_of_buffer_end;

View File

@ -31,8 +31,6 @@ public:
void setReadUntilEnd() override;
bool isIntegratedWithFilesystemCache() const override { return true; }
size_t getFileSize() override;
private:

View File

@ -8,6 +8,7 @@
#include <Disks/IO/WriteBufferFromAzureBlobStorage.h>
#include <IO/SeekAvoidingReadBuffer.h>
#include <Disks/IO/ReadBufferFromRemoteFSGather.h>
#include <Disks/IO/AsynchronousBoundedReadBuffer.h>
#include <Disks/ObjectStorages/AzureBlobStorage/AzureBlobStorageAuth.h>
#include <Interpreters/Context.h>
@ -112,8 +113,8 @@ std::unique_ptr<ReadBufferFromFileBase> AzureObjectStorage::readObjects( /// NOL
if (disk_read_settings.remote_fs_method == RemoteFSReadMethod::threadpool)
{
auto & reader = global_context->getThreadPoolReader(FilesystemReaderType::ASYNCHRONOUS_REMOTE_FS_READER);
return std::make_unique<AsynchronousReadIndirectBufferFromRemoteFS>(
reader, disk_read_settings, std::move(reader_impl),
return std::make_unique<AsynchronousBoundedReadBuffer>(
std::move(reader_impl), reader, disk_read_settings,
global_context->getAsyncReadCounters(),
global_context->getFilesystemReadPrefetchesLog());
}

View File

@ -5,7 +5,6 @@
#include <Disks/ObjectStorages/DiskObjectStorageCommon.h>
#include <Disks/IO/ReadBufferFromRemoteFSGather.h>
#include <Disks/IO/AsynchronousReadIndirectBufferFromRemoteFS.h>
#include <Disks/IO/ReadIndirectBufferFromRemoteFS.h>
#include <Disks/ObjectStorages/IObjectStorage.h>
#include <Common/MultiVersion.h>

View File

@ -2,6 +2,7 @@
#include <Disks/ObjectStorages/IObjectStorage.h>
#include <base/getFQDNOrHostName.h>
#include <future>
namespace DB
{

View File

@ -7,7 +7,6 @@
#include <Storages/HDFS/HDFSCommon.h>
#include <Storages/HDFS/ReadBufferFromHDFS.h>
#include <Disks/IO/AsynchronousReadIndirectBufferFromRemoteFS.h>
#include <Disks/IO/ReadIndirectBufferFromRemoteFS.h>
#include <Disks/IO/ReadBufferFromRemoteFSGather.h>
#include <Common/getRandomASCIIString.h>

View File

@ -3,6 +3,7 @@
#include <Common/getRandomASCIIString.h>
#include <IO/WriteBufferFromFileBase.h>
#include <IO/copyData.h>
#include <IO/ReadBufferFromFileBase.h>
#include <Interpreters/Context.h>

View File

@ -12,12 +12,14 @@
#include <Common/Exception.h>
#include <IO/ReadSettings.h>
#include <IO/WriteSettings.h>
#include <IO/copyData.h>
#include <Disks/IO/AsynchronousReadIndirectBufferFromRemoteFS.h>
#include <Disks/ObjectStorages/StoredObject.h>
#include <Disks/DiskType.h>
#include <Common/ThreadPool_fwd.h>
#include <Disks/WriteMode.h>
#include <Interpreters/Context_fwd.h>
#include <Core/Types.h>
namespace DB

View File

@ -7,6 +7,7 @@
#include <Disks/IO/ReadIndirectBufferFromRemoteFS.h>
#include <Disks/IO/ReadBufferFromRemoteFSGather.h>
#include <Disks/IO/createReadBufferFromFileBase.h>
#include <Disks/IO/AsynchronousBoundedReadBuffer.h>
#include <IO/SeekAvoidingReadBuffer.h>
#include <IO/WriteBufferFromFile.h>
#include <IO/copyData.h>
@ -63,12 +64,12 @@ std::unique_ptr<ReadBufferFromFileBase> LocalObjectStorage::readObjects( /// NOL
global_context->getFilesystemCacheLog());
/// We use `remove_fs_method` (not `local_fs_method`) because we are about to use
/// AsynchronousReadIndirectBufferFromRemoteFS which works by the remote_fs_* settings.
/// AsynchronousBoundedReadBuffer which works by the remote_fs_* settings.
if (modified_settings.remote_fs_method == RemoteFSReadMethod::threadpool)
{
auto & reader = global_context->getThreadPoolReader(FilesystemReaderType::ASYNCHRONOUS_REMOTE_FS_READER);
return std::make_unique<AsynchronousReadIndirectBufferFromRemoteFS>(
reader, modified_settings, std::move(impl),
return std::make_unique<AsynchronousBoundedReadBuffer>(
std::move(impl), reader, modified_settings,
global_context->getAsyncReadCounters(),
global_context->getFilesystemReadPrefetchesLog());
}

View File

@ -6,7 +6,7 @@
#include <Disks/IO/ReadBufferFromRemoteFSGather.h>
#include <Disks/ObjectStorages/DiskObjectStorageCommon.h>
#include <Disks/IO/AsynchronousReadIndirectBufferFromRemoteFS.h>
#include <Disks/IO/AsynchronousBoundedReadBuffer.h>
#include <Disks/IO/ReadIndirectBufferFromRemoteFS.h>
#include <Disks/IO/ThreadPoolRemoteFSReader.h>
#include <IO/WriteBufferFromS3.h>
@ -127,8 +127,8 @@ std::unique_ptr<ReadBufferFromFileBase> S3ObjectStorage::readObjects( /// NOLINT
if (read_settings.remote_fs_method == RemoteFSReadMethod::threadpool)
{
auto & reader = global_context->getThreadPoolReader(FilesystemReaderType::ASYNCHRONOUS_REMOTE_FS_READER);
return std::make_unique<AsynchronousReadIndirectBufferFromRemoteFS>(
reader, disk_read_settings, std::move(s3_impl),
return std::make_unique<AsynchronousBoundedReadBuffer>(
std::move(s3_impl), reader, disk_read_settings,
global_context->getAsyncReadCounters(),
global_context->getFilesystemReadPrefetchesLog());
}

View File

@ -0,0 +1,14 @@
#include <Disks/ObjectStorages/StoredObject.h>
namespace DB
{
size_t getTotalSize(const StoredObjects & objects)
{
size_t size = 0;
for (const auto & object : objects)
size += object.bytes_size;
return size;
}
}

View File

@ -29,4 +29,6 @@ struct StoredObject
using StoredObjects = std::vector<StoredObject>;
size_t getTotalSize(const StoredObjects & objects);
}

View File

@ -9,6 +9,7 @@
#include <IO/WriteHelpers.h>
#include <Disks/IO/ReadIndirectBufferFromRemoteFS.h>
#include <Disks/IO/AsynchronousBoundedReadBuffer.h>
#include <Disks/IO/ReadBufferFromRemoteFSGather.h>
#include <Disks/IO/ReadBufferFromWebServer.h>
#include <Disks/IO/ThreadPoolRemoteFSReader.h>
@ -189,8 +190,8 @@ std::unique_ptr<ReadBufferFromFileBase> WebObjectStorage::readObject( /// NOLINT
if (read_settings.remote_fs_method == RemoteFSReadMethod::threadpool)
{
auto & reader = global_context->getThreadPoolReader(FilesystemReaderType::ASYNCHRONOUS_REMOTE_FS_READER);
return std::make_unique<AsynchronousReadIndirectBufferFromRemoteFS>(
reader, read_settings, std::move(web_impl),
return std::make_unique<AsynchronousBoundedReadBuffer>(
std::move(web_impl), reader, read_settings,
global_context->getAsyncReadCounters(),
global_context->getFilesystemReadPrefetchesLog());
}

View File

@ -27,5 +27,6 @@ struct AsyncReadCounters
void dumpToMapColumn(IColumn * column) const;
};
using AsyncReadCountersPtr = std::shared_ptr<AsyncReadCounters>;
}

View File

@ -27,8 +27,6 @@ public:
ReadBuffer & getWrappedReadBuffer() { return *impl; }
bool isIntegratedWithFilesystemCache() const override { return impl->isIntegratedWithFilesystemCache(); }
size_t getFileSize() override;
protected:

View File

@ -49,8 +49,6 @@ public:
/// If true, setReadUntilPosition() guarantees that eof will be reported at the given position.
virtual bool supportsRightBoundedReads() const { return false; }
virtual bool isIntegratedWithFilesystemCache() const { return false; }
/// Returns true if seek() actually works, false if seek() will always throw (or make subsequent
/// nextImpl() calls throw).
///

View File

@ -1386,6 +1386,20 @@ void Context::addQueryAccessInfo(
query_access_info.views.emplace(view_name);
}
void Context::addQueryAccessInfo(const Names & partition_names)
{
if (isGlobalContext())
{
throw Exception(ErrorCodes::LOGICAL_ERROR, "Global context cannot have query access info");
}
std::lock_guard<std::mutex> lock(query_access_info.mutex);
for (const auto & partition_name : partition_names)
{
query_access_info.partitions.emplace(partition_name);
}
}
void Context::addQueryFactoriesInfo(QueryLogFactories factory_type, const String & created_object) const
{
if (isGlobalContext())
@ -2796,11 +2810,7 @@ zkutil::ZooKeeperPtr Context::getAuxiliaryZooKeeper(const String & name) const
std::map<String, zkutil::ZooKeeperPtr> Context::getAuxiliaryZooKeepers() const
{
std::lock_guard lock(shared->auxiliary_zookeepers_mutex);
if (!shared->auxiliary_zookeepers.empty())
return shared->auxiliary_zookeepers;
else
return std::map<String, zkutil::ZooKeeperPtr>();
return shared->auxiliary_zookeepers;
}
#if USE_ROCKSDB
@ -4314,7 +4324,7 @@ Context::ParallelReplicasMode Context::getParallelReplicasMode() const
if (!settings_.parallel_replicas_custom_key.value.empty())
return CUSTOM_KEY;
if (settings_.allow_experimental_parallel_reading_from_replicas
if (settings_.allow_experimental_parallel_reading_from_replicas > 0
&& !settings_.use_hedged_requests)
return READ_TASKS;

View File

@ -297,6 +297,7 @@ private:
databases = rhs.databases;
tables = rhs.tables;
columns = rhs.columns;
partitions = rhs.partitions;
projections = rhs.projections;
views = rhs.views;
}
@ -314,6 +315,7 @@ private:
std::swap(databases, rhs.databases);
std::swap(tables, rhs.tables);
std::swap(columns, rhs.columns);
std::swap(partitions, rhs.partitions);
std::swap(projections, rhs.projections);
std::swap(views, rhs.views);
}
@ -323,6 +325,7 @@ private:
std::set<std::string> databases{};
std::set<std::string> tables{};
std::set<std::string> columns{};
std::set<std::string> partitions{};
std::set<std::string> projections{};
std::set<std::string> views{};
};
@ -631,6 +634,7 @@ public:
const Names & column_names,
const String & projection_name = {},
const String & view_name = {});
void addQueryAccessInfo(const Names & partition_names);
/// Supported factories for records in query_log

View File

@ -969,6 +969,15 @@ const ASTSelectQuery * ExpressionAnalyzer::getSelectQuery() const
return select_query;
}
bool ExpressionAnalyzer::isRemoteStorage() const
{
const Settings & csettings = getContext()->getSettingsRef();
// Consider any storage used with parallel replicas as remote, so the query is executed on multiple servers
const bool enable_parallel_processing_of_joins
= csettings.max_parallel_replicas > 1 && csettings.allow_experimental_parallel_reading_from_replicas > 0;
return syntax->is_remote_storage || enable_parallel_processing_of_joins;
}
const ASTSelectQuery * SelectQueryExpressionAnalyzer::getAggregatingQuery() const
{
if (!has_aggregation)

View File

@ -201,7 +201,7 @@ protected:
const ASTSelectQuery * getSelectQuery() const;
bool isRemoteStorage() const { return syntax->is_remote_storage; }
bool isRemoteStorage() const;
NamesAndTypesList getColumnsAfterArrayJoin(ActionsDAGPtr & actions, const NamesAndTypesList & src_columns);
NamesAndTypesList analyzeJoin(ActionsDAGPtr & actions, const NamesAndTypesList & src_columns);

View File

@ -45,4 +45,6 @@ public:
using SystemLog<FilesystemReadPrefetchesLogElement>::SystemLog;
};
using FilesystemReadPrefetchesLogPtr = std::shared_ptr<FilesystemReadPrefetchesLog>;
}

View File

@ -50,7 +50,16 @@ bool FillingRow::operator>=(const FillingRow & other) const
return !(*this < other);
}
bool FillingRow::next(const FillingRow & to_row)
bool FillingRow::isNull() const
{
for (const auto & field : row)
if (!field.isNull())
return false;
return true;
}
std::pair<bool, bool> FillingRow::next(const FillingRow & to_row)
{
const size_t row_size = size();
size_t pos = 0;
@ -61,22 +70,24 @@ bool FillingRow::next(const FillingRow & to_row)
break;
if (pos == row_size || less(to_row.row[pos], row[pos], getDirection(pos)))
return false;
return {false, false};
/// If we have any 'fill_to' value at position greater than 'pos',
/// we need to generate rows up to 'fill_to' value.
for (size_t i = row_size - 1; i > pos; --i)
{
if (getFillDescription(i).fill_to.isNull() || row[i].isNull())
auto & fill_column_desc = getFillDescription(i);
if (fill_column_desc.fill_to.isNull() || row[i].isNull())
continue;
auto next_value = row[i];
getFillDescription(i).step_func(next_value);
if (less(next_value, getFillDescription(i).fill_to, getDirection(i)))
Field next_value = row[i];
fill_column_desc.step_func(next_value);
if (less(next_value, fill_column_desc.fill_to, getDirection(i)))
{
row[i] = next_value;
initFromDefaults(i + 1);
return true;
return {true, true};
}
}
@ -84,14 +95,13 @@ bool FillingRow::next(const FillingRow & to_row)
getFillDescription(pos).step_func(next_value);
if (less(to_row.row[pos], next_value, getDirection(pos)) || equals(next_value, getFillDescription(pos).fill_to))
return false;
return {false, false};
row[pos] = next_value;
if (equals(row[pos], to_row.row[pos]))
{
bool is_less = false;
size_t i = pos + 1;
for (; i < row_size; ++i)
for (size_t i = pos + 1; i < row_size; ++i)
{
const auto & fill_from = getFillDescription(i).fill_from;
if (!fill_from.isNull())
@ -101,11 +111,11 @@ bool FillingRow::next(const FillingRow & to_row)
is_less |= less(row[i], to_row.row[i], getDirection(i));
}
return is_less;
return {is_less, true};
}
initFromDefaults(pos + 1);
return true;
return {true, true};
}
void FillingRow::initFromDefaults(size_t from_pos)

View File

@ -19,7 +19,10 @@ public:
explicit FillingRow(const SortDescription & sort_description);
/// Generates next row according to fill 'from', 'to' and 'step' values.
bool next(const FillingRow & to_row);
/// Returns a pair of booleans:
/// apply - true if filling values should be inserted into the result set
/// value_changed - true if the filling row value was changed
std::pair<bool, bool> next(const FillingRow & to_row);
void initFromDefaults(size_t from_pos = 0);
@ -29,9 +32,11 @@ public:
bool operator<(const FillingRow & other) const;
bool operator==(const FillingRow & other) const;
bool operator>=(const FillingRow & other) const;
bool isNull() const;
int getDirection(size_t index) const { return sort_description[index].direction; }
FillColumnDescription & getFillDescription(size_t index) { return sort_description[index].fill_description; }
const FillColumnDescription & getFillDescription(size_t index) const { return sort_description[index].fill_description; }
String dump() const;
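The two flags returned by `next()` are easiest to read next to the loop that consumes them. Below is a minimal, self-contained sketch (illustrative types only, not the real FillingRow API) of driving such a generator until `apply` turns false while tracking whether the last generated value was actually emitted, which is the role `filling_row_inserted` plays in the transform:

```cpp
#include <iostream>
#include <utility>

// Illustrative stand-in for a row generator: steps an integer towards a bound.
struct StepGenerator
{
    int current = 0;
    int step = 3;

    // Returns {apply, value_changed}: apply means the new value is still below
    // the bound and should be emitted; value_changed means current was advanced.
    std::pair<bool, bool> next(int bound)
    {
        int candidate = current + step;
        if (candidate >= bound)
            return {false, false};
        current = candidate;
        return {true, true};
    }
};

int main()
{
    StepGenerator gen;
    bool last_value_emitted = true;

    while (true)
    {
        const auto [apply, changed] = gen.next(10);
        if (changed)
            last_value_emitted = false; // a new value exists but has not been emitted yet
        if (!apply)
            break;
        std::cout << "emit " << gen.current << '\n';
        last_value_emitted = true;
    }
    std::cout << std::boolalpha << last_value_emitted << '\n';
}
```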

View File

@ -205,10 +205,19 @@ public:
}
private:
static bool shouldBeExecutedGlobally(const Data & data)
{
const Settings & settings = data.getContext()->getSettingsRef();
/// For parallel replicas we reinterpret JOIN as GLOBAL JOIN as a way to broadcast data
const bool enable_parallel_processing_of_joins = data.getContext()->canUseParallelReplicasOnInitiator();
return settings.prefer_global_in_and_join || enable_parallel_processing_of_joins;
}
/// GLOBAL IN
static void visit(ASTFunction & func, ASTPtr &, Data & data)
{
if ((data.getContext()->getSettingsRef().prefer_global_in_and_join
if ((shouldBeExecutedGlobally(data)
&& (func.name == "in" || func.name == "notIn" || func.name == "nullIn" || func.name == "notNullIn"))
|| func.name == "globalIn" || func.name == "globalNotIn" || func.name == "globalNullIn" || func.name == "globalNotNullIn")
{
@ -238,8 +247,7 @@ private:
static void visit(ASTTablesInSelectQueryElement & table_elem, ASTPtr &, Data & data)
{
if (table_elem.table_join
&& (table_elem.table_join->as<ASTTableJoin &>().locality == JoinLocality::Global
|| data.getContext()->getSettingsRef().prefer_global_in_and_join))
&& (table_elem.table_join->as<ASTTableJoin &>().locality == JoinLocality::Global || shouldBeExecutedGlobally(data)))
{
data.addExternalStorage(table_elem.table_expression, true);
data.has_global_subqueries = true;

View File

@ -116,6 +116,7 @@ namespace ErrorCodes
extern const int ACCESS_DENIED;
extern const int UNKNOWN_IDENTIFIER;
extern const int BAD_ARGUMENTS;
extern const int SUPPORT_IS_DISABLED;
}
/// Assumes `storage` is set and the table filter (row-level security) is not empty.
@ -385,6 +386,7 @@ InterpreterSelectQuery::InterpreterSelectQuery(
query_info.ignore_projections = options.ignore_projections;
query_info.is_projection_query = options.is_projection_query;
query_info.is_internal = options.is_internal;
initSettings();
const Settings & settings = context->getSettingsRef();
@ -408,6 +410,7 @@ InterpreterSelectQuery::InterpreterSelectQuery(
ApplyWithSubqueryVisitor().visit(query_ptr);
}
query_info.query = query_ptr->clone();
query_info.original_query = query_ptr->clone();
if (settings.count_distinct_optimization)
@ -455,25 +458,42 @@ InterpreterSelectQuery::InterpreterSelectQuery(
}
}
if (joined_tables.tablesCount() > 1 && (!settings.parallel_replicas_custom_key.value.empty() || settings.allow_experimental_parallel_reading_from_replicas))
/// Check support for JOIN for parallel replicas with custom key
if (joined_tables.tablesCount() > 1 && !settings.parallel_replicas_custom_key.value.empty())
{
LOG_WARNING(log, "Joins are not supported with parallel replicas. Query will be executed without using them.");
context->setSetting("allow_experimental_parallel_reading_from_replicas", false);
LOG_WARNING(log, "JOINs are not supported with parallel_replicas_custom_key. Query will be executed without using them.");
context->setSetting("parallel_replicas_custom_key", String{""});
}
/// Try to execute query without parallel replicas if we find that there is a FINAL modifier there.
bool is_query_with_final = false;
if (query_info.table_expression_modifiers)
is_query_with_final = query_info.table_expression_modifiers->hasFinal();
else if (query_info.query)
is_query_with_final = query_info.query->as<ASTSelectQuery &>().final();
if (is_query_with_final && (!settings.parallel_replicas_custom_key.value.empty() || settings.allow_experimental_parallel_reading_from_replicas))
/// Check support for FINAL for parallel replicas
bool is_query_with_final = isQueryWithFinal(query_info);
if (is_query_with_final && (!settings.parallel_replicas_custom_key.value.empty() || settings.allow_experimental_parallel_reading_from_replicas > 0))
{
LOG_WARNING(log, "FINAL modifier is supported with parallel replicas. Will try to execute the query without using them.");
context->setSetting("allow_experimental_parallel_reading_from_replicas", false);
context->setSetting("parallel_replicas_custom_key", String{""});
if (settings.allow_experimental_parallel_reading_from_replicas == 1)
{
LOG_WARNING(log, "FINAL modifier is not supported with parallel replicas. Query will be executed without using them.");
context->setSetting("allow_experimental_parallel_reading_from_replicas", Field(0));
context->setSetting("parallel_replicas_custom_key", String{""});
}
else if (settings.allow_experimental_parallel_reading_from_replicas == 2)
{
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "FINAL modifier is not supported with parallel replicas");
}
}
/// Check support for parallel replicas for non-replicated storage (plain MergeTree)
bool is_plain_merge_tree = storage && storage->isMergeTree() && !storage->supportsReplication();
if (is_plain_merge_tree && settings.allow_experimental_parallel_reading_from_replicas > 0 && !settings.parallel_replicas_for_non_replicated_merge_tree)
{
if (settings.allow_experimental_parallel_reading_from_replicas == 1)
{
LOG_WARNING(log, "To use parallel replicas with plain MergeTree tables please enable setting `parallel_replicas_for_non_replicated_merge_tree`. For now query will be executed without using them.");
context->setSetting("allow_experimental_parallel_reading_from_replicas", Field(0));
}
else if (settings.allow_experimental_parallel_reading_from_replicas == 2)
{
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "To use parallel replicas with plain MergeTree tables please enable setting `parallel_replicas_for_non_replicated_merge_tree`");
}
}
/// Rewrite JOINs
@ -2994,20 +3014,27 @@ void InterpreterSelectQuery::executeWithFill(QueryPlan & query_plan)
auto & query = getSelectQuery();
if (query.orderBy())
{
SortDescription order_descr = getSortDescription(query, context);
SortDescription fill_descr;
for (auto & desc : order_descr)
SortDescription sort_description = getSortDescription(query, context);
SortDescription fill_description;
for (auto & desc : sort_description)
{
if (desc.with_fill)
fill_descr.push_back(desc);
fill_description.push_back(desc);
}
if (fill_descr.empty())
if (fill_description.empty())
return;
InterpolateDescriptionPtr interpolate_descr =
getInterpolateDescription(query, source_header, result_header, syntax_analyzer_result->aliases, context);
auto filling_step = std::make_unique<FillingStep>(query_plan.getCurrentDataStream(), std::move(fill_descr), interpolate_descr);
const Settings & settings = context->getSettingsRef();
auto filling_step = std::make_unique<FillingStep>(
query_plan.getCurrentDataStream(),
std::move(sort_description),
std::move(fill_description),
interpolate_descr,
settings.use_with_fill_by_sorting_prefix);
query_plan.addStep(std::move(filling_step));
}
}
@ -3126,4 +3153,14 @@ void InterpreterSelectQuery::initSettings()
}
}
bool InterpreterSelectQuery::isQueryWithFinal(const SelectQueryInfo & info)
{
bool result = info.query->as<ASTSelectQuery &>().final();
if (info.table_expression_modifiers)
result |= info.table_expression_modifiers->hasFinal();
return result;
}
}
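The hunks above repeat a warn-or-throw pattern: `allow_experimental_parallel_reading_from_replicas` is now an integer, where 1 means best effort (log a warning and disable the feature for this query) and 2 means strict (fail the query). A minimal sketch of that decision, with hypothetical names and a plain `std::runtime_error` standing in for ClickHouse's Exception class:

```cpp
#include <cstdint>
#include <iostream>
#include <stdexcept>
#include <string>

// Hypothetical helper mirroring the warn-or-throw handling of the tri-state setting (0/1/2).
void checkUnsupportedFeature(uint64_t parallel_replicas_mode, const std::string & reason, bool & use_parallel_replicas)
{
    if (parallel_replicas_mode == 0 || !use_parallel_replicas)
        return;

    if (parallel_replicas_mode == 1)
    {
        // Best effort: warn and silently disable parallel replicas for this query.
        std::cerr << reason << ". Query will be executed without parallel replicas.\n";
        use_parallel_replicas = false;
    }
    else // mode == 2
    {
        // Strict: the user explicitly asked for parallel replicas, so fail loudly.
        throw std::runtime_error(reason);
    }
}

int main()
{
    bool use_parallel_replicas = true;
    checkUnsupportedFeature(1, "FINAL modifier is not supported with parallel replicas", use_parallel_replicas);
    std::cout << std::boolalpha << use_parallel_replicas << '\n'; // false
}
```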

View File

@ -131,6 +131,8 @@ public:
static SortDescription getSortDescription(const ASTSelectQuery & query, const ContextPtr & context);
static UInt64 getLimitForSorting(const ASTSelectQuery & query, const ContextPtr & context);
static bool isQueryWithFinal(const SelectQueryInfo & info);
private:
InterpreterSelectQuery(
const ASTPtr & query_ptr_,

View File

@ -0,0 +1,121 @@
#include <Interpreters/OptimizeDateFilterVisitor.h>
#include <Common/DateLUT.h>
#include <Common/DateLUTImpl.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/ASTFunction.h>
namespace DB
{
ASTPtr generateOptimizedDateFilterAST(const String & comparator, const String & converter, const String & column, UInt64 year)
{
const DateLUTImpl & date_lut = DateLUT::instance();
if (converter != "toYear") return {};
String start_date = date_lut.dateToString(date_lut.makeDayNum(year, 1, 1));
String end_date = date_lut.dateToString(date_lut.makeDayNum(year, 12, 31));
if (comparator == "equals")
{
return makeASTFunction("and",
makeASTFunction("greaterOrEquals",
std::make_shared<ASTIdentifier>(column),
std::make_shared<ASTLiteral>(start_date)
),
makeASTFunction("lessOrEquals",
std::make_shared<ASTIdentifier>(column),
std::make_shared<ASTLiteral>(end_date)
)
);
}
else if (comparator == "notEquals")
{
return makeASTFunction("or",
makeASTFunction("less",
std::make_shared<ASTIdentifier>(column),
std::make_shared<ASTLiteral>(start_date)
),
makeASTFunction("greater",
std::make_shared<ASTIdentifier>(column),
std::make_shared<ASTLiteral>(end_date)
)
);
}
else if (comparator == "less" || comparator == "greaterOrEquals")
{
return makeASTFunction(comparator,
std::make_shared<ASTIdentifier>(column),
std::make_shared<ASTLiteral>(start_date)
);
}
else
{
return makeASTFunction(comparator,
std::make_shared<ASTIdentifier>(column),
std::make_shared<ASTLiteral>(end_date)
);
}
}
bool rewritePredicateInPlace(ASTFunction & function, ASTPtr & ast)
{
const static std::unordered_map<String, String> swap_relations = {
{"equals", "equals"},
{"notEquals", "notEquals"},
{"less", "greater"},
{"greater", "less"},
{"lessOrEquals", "greaterOrEquals"},
{"greaterOrEquals", "lessOrEquals"},
};
if (!swap_relations.contains(function.name)) return false;
if (!function.arguments || function.arguments->children.size() != 2) return false;
size_t func_id = function.arguments->children.size();
for (size_t i = 0; i < function.arguments->children.size(); i++)
{
if (const auto * func = function.arguments->children[i]->as<ASTFunction>(); func)
{
if (func->name == "toYear")
{
func_id = i;
}
}
}
if (func_id == function.arguments->children.size()) return false;
size_t literal_id = 1 - func_id;
const auto * literal = function.arguments->children[literal_id]->as<ASTLiteral>();
if (!literal || literal->value.getType() != Field::Types::UInt64) return false;
UInt64 compare_to = literal->value.get<UInt64>();
String comparator = literal_id > func_id ? function.name : swap_relations.at(function.name);
const auto * func = function.arguments->children[func_id]->as<ASTFunction>();
const auto * column_id = func->arguments->children.at(0)->as<ASTIdentifier>();
if (!column_id) return false;
String column = column_id->name();
const auto new_ast = generateOptimizedDateFilterAST(comparator, func->name, column, compare_to);
if (!new_ast) return false;
ast = new_ast;
return true;
}
void OptimizeDateFilterInPlaceData::visit(ASTFunction & function, ASTPtr & ast) const
{
rewritePredicateInPlace(function, ast);
}
}
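`generateOptimizedDateFilterAST` rewrites comparisons on `toYear(column)` into range predicates on the column itself, so indexes on the raw date column stay usable. A standalone sketch of the same comparator mapping that renders the rewritten predicate as text (fixed year boundaries, illustrative only):

```cpp
#include <iostream>
#include <string>

// Illustrative: rewrite `toYear(col) <cmp> year` into a predicate on `col` itself,
// using the first and last day of the year as boundaries.
std::string rewriteToYearPredicate(const std::string & cmp, const std::string & column, unsigned year)
{
    const std::string start = std::to_string(year) + "-01-01";
    const std::string end = std::to_string(year) + "-12-31";

    if (cmp == "equals")
        return column + " >= '" + start + "' AND " + column + " <= '" + end + "'";
    if (cmp == "notEquals")
        return column + " < '" + start + "' OR " + column + " > '" + end + "'";
    if (cmp == "less" || cmp == "greaterOrEquals")
        return column + (cmp == "less" ? " < '" : " >= '") + start + "'";
    // greater / lessOrEquals compare against the end of the year.
    return column + (cmp == "greater" ? " > '" : " <= '") + end + "'";
}

int main()
{
    // toYear(d) = 2023  ->  d >= '2023-01-01' AND d <= '2023-12-31'
    std::cout << rewriteToYearPredicate("equals", "d", 2023) << '\n';
    // toYear(d) > 2023  ->  d > '2023-12-31'
    std::cout << rewriteToYearPredicate("greater", "d", 2023) << '\n';
}
```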

View File

@ -0,0 +1,20 @@
#pragma once
#include <Interpreters/InDepthNodeVisitor.h>
namespace DB
{
class ASTFunction;
/// Rewrite the predicates in place
class OptimizeDateFilterInPlaceData
{
public:
using TypeToVisit = ASTFunction;
void visit(ASTFunction & function, ASTPtr & ast) const;
};
using OptimizeDateFilterInPlaceMatcher = OneTypeMatcher<OptimizeDateFilterInPlaceData>;
using OptimizeDateFilterInPlaceVisitor = InDepthNodeVisitor<OptimizeDateFilterInPlaceMatcher, true>;
}

View File

@ -29,6 +29,7 @@ NamesAndTypesList ProcessorProfileLogElement::getNamesAndTypes()
{"plan_step", std::make_shared<DataTypeUInt64>()},
{"plan_group", std::make_shared<DataTypeUInt64>()},
{"initial_query_id", std::make_shared<DataTypeString>()},
{"query_id", std::make_shared<DataTypeString>()},
{"name", std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>())},
{"elapsed_us", std::make_shared<DataTypeUInt64>()},
@ -60,6 +61,7 @@ void ProcessorProfileLogElement::appendToBlock(MutableColumns & columns) const
columns[i++]->insert(plan_step);
columns[i++]->insert(plan_group);
columns[i++]->insertData(initial_query_id.data(), initial_query_id.size());
columns[i++]->insertData(query_id.data(), query_id.size());
columns[i++]->insertData(processor_name.data(), processor_name.size());
columns[i++]->insert(elapsed_us);

View File

@ -19,6 +19,7 @@ struct ProcessorProfileLogElement
UInt64 plan_step{};
UInt64 plan_group{};
String initial_query_id;
String query_id;
String processor_name;

View File

@ -70,6 +70,7 @@ NamesAndTypesList QueryLogElement::getNamesAndTypes()
{"databases", array_low_cardinality_string},
{"tables", array_low_cardinality_string},
{"columns", array_low_cardinality_string},
{"partitions", array_low_cardinality_string},
{"projections", array_low_cardinality_string},
{"views", array_low_cardinality_string},
{"exception_code", std::make_shared<DataTypeInt32>()},
@ -176,6 +177,7 @@ void QueryLogElement::appendToBlock(MutableColumns & columns) const
auto & column_databases = typeid_cast<ColumnArray &>(*columns[i++]);
auto & column_tables = typeid_cast<ColumnArray &>(*columns[i++]);
auto & column_columns = typeid_cast<ColumnArray &>(*columns[i++]);
auto & column_partitions = typeid_cast<ColumnArray &>(*columns[i++]);
auto & column_projections = typeid_cast<ColumnArray &>(*columns[i++]);
auto & column_views = typeid_cast<ColumnArray &>(*columns[i++]);
@ -194,6 +196,7 @@ void QueryLogElement::appendToBlock(MutableColumns & columns) const
fill_column(query_databases, column_databases);
fill_column(query_tables, column_tables);
fill_column(query_columns, column_columns);
fill_column(query_partitions, column_partitions);
fill_column(query_projections, column_projections);
fill_column(query_views, column_views);
}

View File

@ -65,6 +65,7 @@ struct QueryLogElement
std::set<String> query_databases;
std::set<String> query_tables;
std::set<String> query_columns;
std::set<String> query_partitions;
std::set<String> query_projections;
std::set<String> query_views;

View File

@ -25,6 +25,7 @@
#include <Interpreters/GatherFunctionQuantileVisitor.h>
#include <Interpreters/RewriteSumIfFunctionVisitor.h>
#include <Interpreters/RewriteArrayExistsFunctionVisitor.h>
#include <Interpreters/OptimizeDateFilterVisitor.h>
#include <Parsers/ASTExpressionList.h>
#include <Parsers/ASTFunction.h>
@ -677,6 +678,21 @@ void optimizeInjectiveFunctionsInsideUniq(ASTPtr & query, ContextPtr context)
RemoveInjectiveFunctionsVisitor(data).visit(query);
}
void optimizeDateFilters(ASTSelectQuery * select_query)
{
/// Predicates in the HAVING clause have been moved to the WHERE clause.
if (select_query->where())
{
OptimizeDateFilterInPlaceVisitor::Data data;
OptimizeDateFilterInPlaceVisitor(data).visit(select_query->refWhere());
}
if (select_query->prewhere())
{
OptimizeDateFilterInPlaceVisitor::Data data;
OptimizeDateFilterInPlaceVisitor(data).visit(select_query->refPrewhere());
}
}
void transformIfStringsIntoEnum(ASTPtr & query)
{
std::unordered_set<String> function_names = {"if", "transform"};
@ -780,6 +796,9 @@ void TreeOptimizer::apply(ASTPtr & query, TreeRewriterResult & result,
tables_with_columns, result.storage_snapshot->metadata, result.storage);
}
/// Rewrite date filters to avoid calls to converters such as toYear, toYYYYMM, toISOWeek, etc.
optimizeDateFilters(select_query);
/// GROUP BY injective function elimination.
optimizeGroupBy(select_query, context);

View File

@ -837,6 +837,7 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
elem.query_databases = info.databases;
elem.query_tables = info.tables;
elem.query_columns = info.columns;
elem.query_partitions = info.partitions;
elem.query_projections = info.projections;
elem.query_views = info.views;
}
@ -901,6 +902,7 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
element.query_databases.insert(access_info.databases.begin(), access_info.databases.end());
element.query_tables.insert(access_info.tables.begin(), access_info.tables.end());
element.query_columns.insert(access_info.columns.begin(), access_info.columns.end());
element.query_partitions.insert(access_info.partitions.begin(), access_info.partitions.end());
element.query_projections.insert(access_info.projections.begin(), access_info.projections.end());
element.query_views.insert(access_info.views.begin(), access_info.views.end());
@ -1003,6 +1005,7 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
ProcessorProfileLogElement processor_elem;
processor_elem.event_time = elem.event_time;
processor_elem.event_time_microseconds = elem.event_time_microseconds;
processor_elem.initial_query_id = elem.client_info.initial_query_id;
processor_elem.query_id = elem.client_info.current_query_id;
auto get_proc_id = [](const IProcessor & proc) -> UInt64

View File

@ -112,8 +112,6 @@ std::shared_ptr<InterpreterSelectWithUnionQuery> interpretSubquery(
subquery_options.removeDuplicates();
}
/// We don't want to execute reading for subqueries in parallel
subquery_context->setSetting("allow_experimental_parallel_reading_from_replicas", false);
return std::make_shared<InterpreterSelectWithUnionQuery>(query, subquery_context, subquery_options, required_source_columns);
}

View File

@ -83,6 +83,7 @@ namespace ErrorCodes
extern const int BAD_ARGUMENTS;
extern const int TOO_DEEP_SUBQUERIES;
extern const int NOT_IMPLEMENTED;
extern const int SUPPORT_IS_DISABLED;
}
/** ClickHouse query planner.
@ -622,7 +623,14 @@ void addWithFillStepIfNeeded(QueryPlan & query_plan,
interpolate_description = std::make_shared<InterpolateDescription>(std::move(interpolate_actions_dag), empty_aliases);
}
auto filling_step = std::make_unique<FillingStep>(query_plan.getCurrentDataStream(), std::move(fill_description), interpolate_description);
const auto & query_context = planner_context->getQueryContext();
const Settings & settings = query_context->getSettingsRef();
auto filling_step = std::make_unique<FillingStep>(
query_plan.getCurrentDataStream(),
sort_description,
std::move(fill_description),
interpolate_description,
settings.use_with_fill_by_sorting_prefix);
query_plan.addStep(std::move(filling_step));
}
@ -1185,16 +1193,25 @@ void Planner::buildPlanForQueryNode()
const auto & settings = query_context->getSettingsRef();
if (planner_context->getTableExpressionNodeToData().size() > 1
&& (!settings.parallel_replicas_custom_key.value.empty() || settings.allow_experimental_parallel_reading_from_replicas))
&& (!settings.parallel_replicas_custom_key.value.empty() || settings.allow_experimental_parallel_reading_from_replicas > 0))
{
LOG_WARNING(
&Poco::Logger::get("Planner"), "Joins are not supported with parallel replicas. Query will be executed without using them.");
if (settings.allow_experimental_parallel_reading_from_replicas == 1)
{
LOG_WARNING(
&Poco::Logger::get("Planner"), "JOINs are not supported with parallel replicas. Query will be executed without using them.");
auto & mutable_context = planner_context->getMutableQueryContext();
mutable_context->setSetting("allow_experimental_parallel_reading_from_replicas", false);
mutable_context->setSetting("parallel_replicas_custom_key", String{""});
auto & mutable_context = planner_context->getMutableQueryContext();
mutable_context->setSetting("allow_experimental_parallel_reading_from_replicas", Field(0));
mutable_context->setSetting("parallel_replicas_custom_key", String{""});
}
else if (settings.allow_experimental_parallel_reading_from_replicas == 2)
{
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "JOINs are not supported with parallel replicas");
}
}
/// TODO: Also disable parallel replicas in case of FINAL
auto top_level_identifiers = collectTopLevelColumnIdentifiers(query_tree, planner_context);
auto join_tree_query_plan = buildJoinTreeQueryPlan(query_tree,
select_query_info,
@ -1432,7 +1449,8 @@ void Planner::buildPlanForQueryNode()
addLimitByStep(query_plan, limit_by_analysis_result, query_node);
}
addWithFillStepIfNeeded(query_plan, query_analysis_result, planner_context, query_node);
if (query_node.hasOrderBy())
addWithFillStepIfNeeded(query_plan, query_analysis_result, planner_context, query_node);
bool apply_offset = query_processing_info.getToStage() != QueryProcessingStage::WithMergeableStateAfterAggregationAndLimit;

View File

@ -27,9 +27,17 @@ static ITransformingStep::Traits getTraits()
};
}
FillingStep::FillingStep(const DataStream & input_stream_, SortDescription sort_description_, InterpolateDescriptionPtr interpolate_description_)
FillingStep::FillingStep(
const DataStream & input_stream_,
SortDescription sort_description_,
SortDescription fill_description_,
InterpolateDescriptionPtr interpolate_description_,
bool use_with_fill_by_sorting_prefix_)
: ITransformingStep(input_stream_, FillingTransform::transformHeader(input_stream_.header, sort_description_), getTraits())
, sort_description(std::move(sort_description_)), interpolate_description(interpolate_description_)
, sort_description(std::move(sort_description_))
, fill_description(std::move(fill_description_))
, interpolate_description(interpolate_description_)
, use_with_fill_by_sorting_prefix(use_with_fill_by_sorting_prefix_)
{
if (!input_stream_.has_single_port)
throw Exception(ErrorCodes::LOGICAL_ERROR, "FillingStep expects single input");
@ -40,9 +48,10 @@ void FillingStep::transformPipeline(QueryPipelineBuilder & pipeline, const Build
pipeline.addSimpleTransform([&](const Block & header, QueryPipelineBuilder::StreamType stream_type) -> ProcessorPtr
{
if (stream_type == QueryPipelineBuilder::StreamType::Totals)
return std::make_shared<FillingNoopTransform>(header, sort_description);
return std::make_shared<FillingNoopTransform>(header, fill_description);
return std::make_shared<FillingTransform>(header, sort_description, std::move(interpolate_description));
return std::make_shared<FillingTransform>(
header, sort_description, fill_description, std::move(interpolate_description), use_with_fill_by_sorting_prefix);
});
}

View File

@ -10,7 +10,12 @@ namespace DB
class FillingStep : public ITransformingStep
{
public:
FillingStep(const DataStream & input_stream_, SortDescription sort_description_, InterpolateDescriptionPtr interpolate_description_);
FillingStep(
const DataStream & input_stream_,
SortDescription sort_description_,
SortDescription fill_description_,
InterpolateDescriptionPtr interpolate_description_,
bool use_with_fill_by_sorting_prefix);
String getName() const override { return "Filling"; }
@ -25,7 +30,9 @@ private:
void updateOutputStream() override;
SortDescription sort_description;
SortDescription fill_description;
InterpolateDescriptionPtr interpolate_description;
const bool use_with_fill_by_sorting_prefix;
};
}

View File

@ -3,6 +3,7 @@
#include <IO/Operators.h>
#include <Interpreters/Context.h>
#include <Interpreters/ExpressionAnalyzer.h>
#include <Interpreters/InterpreterSelectQuery.h>
#include <Interpreters/TreeRewriter.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTIdentifier.h>
@ -99,7 +100,6 @@ namespace ErrorCodes
extern const int INDEX_NOT_USED;
extern const int LOGICAL_ERROR;
extern const int TOO_MANY_ROWS;
extern const int SUPPORT_IS_DISABLED;
}
static MergeTreeReaderSettings getMergeTreeReaderSettings(
@ -1314,7 +1314,7 @@ MergeTreeDataSelectAnalysisResultPtr ReadFromMergeTree::selectRangesToReadImpl(
auto reader_settings = getMergeTreeReaderSettings(context, query_info);
bool use_skip_indexes = settings.use_skip_indexes;
bool final = isFinal(query_info);
bool final = InterpreterSelectQuery::isQueryWithFinal(query_info);
if (final && !settings.use_skip_indexes_if_final)
use_skip_indexes = false;
@ -1377,7 +1377,7 @@ bool ReadFromMergeTree::requestReadingInOrder(size_t prefix_size, int direction,
/// Disable read-in-order optimization for reverse order with final.
/// Otherwise, it can lead to incorrect final behavior because the implementation may rely on reading in direct order.
if (direction != 1 && isFinal(query_info))
if (direction != 1 && isQueryWithFinal())
return false;
auto order_info = std::make_shared<InputOrderInfo>(SortDescription{}, prefix_size, direction, limit);
@ -1500,11 +1500,7 @@ ReadFromMergeTree::AnalysisResult ReadFromMergeTree::getAnalysisResult() const
bool ReadFromMergeTree::isQueryWithFinal() const
{
const auto & select = query_info.query->as<ASTSelectQuery &>();
if (query_info.table_expression_modifiers)
return query_info.table_expression_modifiers->hasFinal();
else
return select.final();
return InterpreterSelectQuery::isQueryWithFinal(query_info);
}
bool ReadFromMergeTree::isQueryWithSampling() const
@ -1522,7 +1518,7 @@ bool ReadFromMergeTree::isQueryWithSampling() const
Pipe ReadFromMergeTree::spreadMarkRanges(
RangesInDataParts && parts_with_ranges, size_t num_streams, AnalysisResult & result, ActionsDAGPtr & result_projection)
{
bool final = isQueryWithFinal();
const bool final = isQueryWithFinal();
const auto & input_order_info = query_info.getInputOrderInfo();
Names column_names_to_read = result.column_names_to_read;
@ -1539,8 +1535,7 @@ Pipe ReadFromMergeTree::spreadMarkRanges(
if (final)
{
if (is_parallel_reading_from_replicas)
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "FINAL modifier is not supported with parallel replicas");
chassert(!is_parallel_reading_from_replicas);
if (output_each_partition_through_separate_port)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Optimisation isn't supposed to be used for queries with final");
@ -1618,6 +1613,18 @@ void ReadFromMergeTree::initializePipeline(QueryPipelineBuilder & pipeline, cons
result.selected_marks,
result.selected_ranges);
// Adding partition info to QueryAccessInfo.
if (context->hasQueryContext() && !query_info.is_internal)
{
Names partition_names;
for (const auto & part : result.parts_with_ranges)
{
partition_names.emplace_back(
fmt::format("{}.{}", data.getStorageID().getFullNameNotQuoted(), part.data_part->info.partition_id));
}
context->getQueryContext()->addQueryAccessInfo(partition_names);
}
ProfileEvents::increment(ProfileEvents::SelectedParts, result.selected_parts);
ProfileEvents::increment(ProfileEvents::SelectedRanges, result.selected_ranges);
ProfileEvents::increment(ProfileEvents::SelectedMarks, result.selected_marks);
@ -1948,15 +1955,6 @@ void ReadFromMergeTree::describeIndexes(JSONBuilder::JSONMap & map) const
}
}
bool ReadFromMergeTree::isFinal(const SelectQueryInfo & query_info)
{
if (query_info.table_expression_modifiers)
return query_info.table_expression_modifiers->hasFinal();
const auto & select = query_info.query->as<ASTSelectQuery &>();
return select.final();
}
bool MergeTreeDataSelectAnalysisResult::error() const
{
return std::holds_alternative<std::exception_ptr>(result);
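With the new `partitions` entry in QueryAccessInfo and query_log (see the Context and QueryLog hunks above), each selected partition is recorded as `<database>.<table>.<partition_id>`, as formatted in `initializePipeline` earlier in this file's diff. A tiny sketch of that formatting, using fmt as the diff does, with made-up table and partition values:

```cpp
#include <fmt/format.h>
#include <iostream>
#include <string>
#include <vector>

int main()
{
    // Hypothetical values: a fully qualified table name and the partition ids of the selected parts.
    const std::string table_full_name = "default.hits";
    const std::vector<std::string> partition_ids = {"202305", "202306"};

    std::vector<std::string> partition_names;
    for (const auto & partition_id : partition_ids)
        partition_names.emplace_back(fmt::format("{}.{}", table_full_name, partition_id));

    for (const auto & name : partition_names)
        std::cout << name << '\n'; // e.g. default.hits.202305
}
```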

View File

@ -159,7 +159,6 @@ public:
void updatePrewhereInfo(const PrewhereInfoPtr & prewhere_info_value);
static bool isFinal(const SelectQueryInfo & query_info);
bool isQueryWithFinal() const;
bool isQueryWithSampling() const;

View File

@ -187,25 +187,31 @@ static bool tryConvertFields(FillColumnDescription & descr, const DataTypePtr &
}
FillingTransform::FillingTransform(
const Block & header_, const SortDescription & sort_description_, InterpolateDescriptionPtr interpolate_description_)
: ISimpleTransform(header_, transformHeader(header_, sort_description_), true)
, sort_description(sort_description_)
, interpolate_description(interpolate_description_)
, filling_row(sort_description_)
, next_row(sort_description_)
const Block & header_,
const SortDescription & sort_description_,
const SortDescription & fill_description_,
InterpolateDescriptionPtr interpolate_description_,
const bool use_with_fill_by_sorting_prefix_)
: ISimpleTransform(header_, transformHeader(header_, fill_description_), true)
, sort_description(sort_description_)
, fill_description(fill_description_)
, interpolate_description(interpolate_description_)
, filling_row(fill_description_)
, next_row(fill_description_)
, use_with_fill_by_sorting_prefix(use_with_fill_by_sorting_prefix_)
{
if (interpolate_description)
interpolate_actions = std::make_shared<ExpressionActions>(interpolate_description->actions);
std::vector<bool> is_fill_column(header_.columns());
for (size_t i = 0, size = sort_description.size(); i < size; ++i)
for (size_t i = 0, size = fill_description.size(); i < size; ++i)
{
if (interpolate_description && interpolate_description->result_columns_set.contains(sort_description[i].column_name))
if (interpolate_description && interpolate_description->result_columns_set.contains(fill_description[i].column_name))
throw Exception(ErrorCodes::INVALID_WITH_FILL_EXPRESSION,
"Column '{}' is participating in ORDER BY ... WITH FILL expression and can't be INTERPOLATE output",
sort_description[i].column_name);
fill_description[i].column_name);
size_t block_position = header_.getPositionByName(sort_description[i].column_name);
size_t block_position = header_.getPositionByName(fill_description[i].column_name);
is_fill_column[block_position] = true;
fill_column_positions.push_back(block_position);
@ -226,21 +232,40 @@ FillingTransform::FillingTransform(
"WITH FILL bound values cannot be negative for unsigned type {}", type->getName());
}
}
logDebug("fill description", dumpSortDescription(fill_description));
std::set<size_t> unique_positions;
for (auto pos : fill_column_positions)
if (!unique_positions.insert(pos).second)
throw Exception(ErrorCodes::INVALID_WITH_FILL_EXPRESSION, "Multiple WITH FILL for identical expressions is not supported in ORDER BY");
if (use_with_fill_by_sorting_prefix)
{
/// build sorting prefix for first fill column
for (const auto & desc : sort_description)
{
if (desc.column_name == fill_description[0].column_name)
break;
size_t pos = header_.getPositionByName(desc.column_name);
sort_prefix_positions.push_back(pos);
sort_prefix.push_back(desc);
}
logDebug("sort prefix", dumpSortDescription(sort_prefix));
last_range_sort_prefix.reserve(sort_prefix.size());
}
size_t idx = 0;
for (const ColumnWithTypeAndName & column : header_.getColumnsWithTypeAndName())
{
if (interpolate_description)
if (const auto & p = interpolate_description->required_columns_map.find(column.name);
p != interpolate_description->required_columns_map.end())
input_positions.emplace_back(idx, p->second);
input_positions.emplace_back(idx, p->second);
if (!is_fill_column[idx] && !(interpolate_description && interpolate_description->result_columns_set.contains(column.name)))
if (!is_fill_column[idx] && !(interpolate_description && interpolate_description->result_columns_set.contains(column.name))
&& sort_prefix_positions.end() == std::find(sort_prefix_positions.begin(), sort_prefix_positions.end(), idx))
other_column_positions.push_back(idx);
++idx;
@ -249,6 +274,20 @@ FillingTransform::FillingTransform(
if (interpolate_description)
for (const auto & name : interpolate_description->result_columns_order)
interpolate_column_positions.push_back(header_.getPositionByName(name));
/// check for position conflicts between interpolate and sorting prefix columns
if (!sort_prefix_positions.empty() && !interpolate_column_positions.empty())
{
std::unordered_set<size_t> interpolate_positions(interpolate_column_positions.begin(), interpolate_column_positions.end());
for (auto sort_prefix_pos : sort_prefix_positions)
{
if (interpolate_positions.contains(sort_prefix_pos))
throw Exception(
ErrorCodes::INVALID_WITH_FILL_EXPRESSION,
"The same column in ORDER BY before WITH FILL (sorting prefix) and INTERPOLATE is not allowed. Column: {}",
(header_.begin() + sort_prefix_pos)->name);
}
}
}
/// prepare() is overridden to call transform() after all chunks are processed
@ -313,9 +352,14 @@ void FillingTransform::interpolate(const MutableColumns & result_columns, Block
using MutableColumnRawPtrs = std::vector<IColumn*>;
static void insertFromFillingRow(const MutableColumnRawPtrs & filling_columns, const MutableColumnRawPtrs & interpolate_columns, const MutableColumnRawPtrs & other_columns,
const FillingRow & filling_row, const Block & interpolate_block)
void FillingTransform::insertFromFillingRow(
const MutableColumnRawPtrs & filling_columns,
const MutableColumnRawPtrs & interpolate_columns,
const MutableColumnRawPtrs & other_columns,
const Block & interpolate_block)
{
logDebug("insertFromFillingRow", filling_row);
for (size_t i = 0, size = filling_columns.size(); i < size; ++i)
{
if (filling_row[i].isNull())
@ -338,10 +382,14 @@ static void insertFromFillingRow(const MutableColumnRawPtrs & filling_columns, c
for (auto * other_column : other_columns)
other_column->insertDefault();
filling_row_inserted = true;
}
static void copyRowFromColumns(const MutableColumnRawPtrs & dest, const Columns & source, size_t row_num)
{
chassert(dest.size() == source.size());
for (size_t i = 0, size = source.size(); i < size; ++i)
dest[i]->insertFrom(*source[i], row_num);
}
@ -353,7 +401,7 @@ static void initColumnsByPositions(
MutableColumnRawPtrs & output_columns_by_position,
const std::vector<size_t> & positions)
{
for (size_t pos : positions)
for (const size_t pos : positions)
{
input_columns_by_positions.push_back(input_columns[pos]);
output_columns_by_position.push_back(output_columns[pos].get());
@ -364,10 +412,12 @@ void FillingTransform::initColumns(
const Columns & input_columns,
Columns & input_fill_columns,
Columns & input_interpolate_columns,
Columns & input_sort_prefix_columns,
Columns & input_other_columns,
MutableColumns & output_columns,
MutableColumnRawPtrs & output_fill_columns,
MutableColumnRawPtrs & output_interpolate_columns,
MutableColumnRawPtrs & output_sort_prefix_columns,
MutableColumnRawPtrs & output_other_columns)
{
Columns non_const_columns;
@ -382,65 +432,236 @@ void FillingTransform::initColumns(
initColumnsByPositions(non_const_columns, input_fill_columns, output_columns, output_fill_columns, fill_column_positions);
initColumnsByPositions(
non_const_columns, input_interpolate_columns, output_columns, output_interpolate_columns, interpolate_column_positions);
initColumnsByPositions(non_const_columns, input_sort_prefix_columns, output_columns, output_sort_prefix_columns, sort_prefix_positions);
initColumnsByPositions(non_const_columns, input_other_columns, output_columns, output_other_columns, other_column_positions);
}
bool FillingTransform::generateSuffixIfNeeded(const Columns & input_columns, MutableColumns & result_columns)
{
logDebug("generateSuffixIfNeeded() filling_row", filling_row);
logDebug("generateSuffixIfNeeded() next_row", next_row);
logDebug("generateSuffixIfNeeded() first", first);
/// Determines should we insert filling row before start generating next rows.
bool should_insert_first = next_row < filling_row || first;
for (size_t i = 0, size = filling_row.size(); i < size; ++i)
next_row[i] = filling_row.getFillDescription(i).fill_to;
logDebug("generateSuffixIfNeeded() next_row updated", next_row);
if (!first && filling_row >= next_row)
{
logDebug("generateSuffixIfNeeded()", "no need to generate suffix");
return false;
}
Columns input_fill_columns;
Columns input_interpolate_columns;
Columns input_sort_prefix_columns;
Columns input_other_columns;
MutableColumnRawPtrs res_fill_columns;
MutableColumnRawPtrs res_interpolate_columns;
MutableColumnRawPtrs res_sort_prefix_columns;
MutableColumnRawPtrs res_other_columns;
initColumns(
input_columns,
input_fill_columns,
input_interpolate_columns,
input_sort_prefix_columns,
input_other_columns,
result_columns,
res_fill_columns,
res_interpolate_columns,
res_sort_prefix_columns,
res_other_columns);
if (first)
filling_row.initFromDefaults();
return generateSuffixIfNeeded(result_columns, res_fill_columns, res_interpolate_columns, res_sort_prefix_columns, res_other_columns);
}
bool FillingTransform::generateSuffixIfNeeded(
const MutableColumns & result_columns,
MutableColumnRawPtrs res_fill_columns,
MutableColumnRawPtrs res_interpolate_columns,
MutableColumnRawPtrs res_sort_prefix_columns,
MutableColumnRawPtrs res_other_columns)
{
logDebug("generateSuffixIfNeeded() filling_row", filling_row);
logDebug("generateSuffixIfNeeded() next_row", next_row);
/// Determines whether we should insert the filling row before starting to generate next rows
bool should_insert_first = (next_row < filling_row && !filling_row_inserted) || next_row.isNull();
logDebug("should_insert_first", should_insert_first);
for (size_t i = 0, size = filling_row.size(); i < size; ++i)
next_row[i] = filling_row.getFillDescription(i).fill_to;
logDebug("generateSuffixIfNeeded() next_row updated", next_row);
if (filling_row >= next_row)
{
logDebug("generateSuffixIfNeeded()", "no need to generate suffix");
return false;
}
Block interpolate_block;
if (should_insert_first && filling_row < next_row)
{
interpolate(result_columns, interpolate_block);
insertFromFillingRow(res_fill_columns, res_interpolate_columns, res_other_columns, filling_row, interpolate_block);
insertFromFillingRow(res_fill_columns, res_interpolate_columns, res_other_columns, interpolate_block);
/// fill sort prefix columns with last row values or defaults
if (!last_range_sort_prefix.empty())
copyRowFromColumns(res_sort_prefix_columns, last_range_sort_prefix, 0);
else
for (auto * sort_prefix_column : res_sort_prefix_columns)
sort_prefix_column->insertDefault();
}
while (filling_row.next(next_row))
bool filling_row_changed = false;
while (true)
{
const auto [apply, changed] = filling_row.next(next_row);
filling_row_changed = changed;
if (!apply)
break;
interpolate(result_columns, interpolate_block);
insertFromFillingRow(res_fill_columns, res_interpolate_columns, res_other_columns, filling_row, interpolate_block);
insertFromFillingRow(res_fill_columns, res_interpolate_columns, res_other_columns, interpolate_block);
/// fill sort prefix columns with last row values or defaults
if (!last_range_sort_prefix.empty())
copyRowFromColumns(res_sort_prefix_columns, last_range_sort_prefix, 0);
else
for (auto * sort_prefix_column : res_sort_prefix_columns)
sort_prefix_column->insertDefault();
}
/// new valid filling row was generated but not inserted
if (filling_row_changed)
filling_row_inserted = false;
return true;
}
template <typename Predicate>
size_t getRangeEnd(size_t begin, size_t end, Predicate pred)
{
chassert(begin < end);
const size_t linear_probe_threshold = 16;
size_t linear_probe_end = begin + linear_probe_threshold;
if (linear_probe_end > end)
linear_probe_end = end;
for (size_t pos = begin; pos < linear_probe_end; ++pos)
{
if (!pred(begin, pos))
return pos;
}
size_t low = linear_probe_end;
size_t high = end - 1;
while (low <= high)
{
size_t mid = low + (high - low) / 2;
if (pred(begin, mid))
low = mid + 1;
else
{
high = mid - 1;
end = mid;
}
}
return end;
}
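`getRangeEnd` probes linearly first (cheap for short runs) and falls back to binary search for long runs of rows that share the same sorting prefix. A self-contained sketch of the same hybrid scan over plain integers; the threshold of 16 mirrors the code above, everything else is illustrative:

```cpp
#include <algorithm>
#include <cassert>
#include <iostream>
#include <vector>

// Find the end (one past the last index) of the run of elements equal to data[begin],
// assuming equal elements are stored contiguously in [begin, end).
size_t runEnd(const std::vector<int> & data, size_t begin, size_t end)
{
    assert(begin < end);
    const size_t linear_probe_threshold = 16;

    // Cheap linear probe first: most runs are short.
    size_t linear_probe_end = std::min(begin + linear_probe_threshold, end);
    for (size_t pos = begin; pos < linear_probe_end; ++pos)
        if (data[pos] != data[begin])
            return pos;

    // Long run: binary search for the first element that differs.
    size_t low = linear_probe_end;
    size_t high = end;
    while (low < high)
    {
        size_t mid = low + (high - low) / 2;
        if (data[mid] == data[begin])
            low = mid + 1;
        else
            high = mid;
    }
    return low;
}

int main()
{
    std::vector<int> v(100, 7);
    v.resize(130, 9); // 100 sevens followed by 30 nines
    std::cout << runEnd(v, 0, v.size()) << '\n';   // 100
    std::cout << runEnd(v, 100, v.size()) << '\n'; // 130
}
```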
void FillingTransform::transformRange(
const Columns & input_fill_columns,
const Columns & input_interpolate_columns,
const Columns & input_sort_prefix_columns,
const Columns & input_other_columns,
const MutableColumns & result_columns,
const MutableColumnRawPtrs & res_fill_columns,
const MutableColumnRawPtrs & res_interpolate_columns,
const MutableColumnRawPtrs & res_sort_prefix_columns,
const MutableColumnRawPtrs & res_other_columns,
std::pair<size_t, size_t> range,
const bool new_sorting_prefix)
{
const size_t range_begin = range.first;
const size_t range_end = range.second;
Block interpolate_block;
if (new_sorting_prefix)
{
logDebug("--- new range ---", range_end);
for (size_t i = 0, size = filling_row.size(); i < size; ++i)
{
const auto current_value = (*input_fill_columns[i])[range_begin];
const auto & fill_from = filling_row.getFillDescription(i).fill_from;
if (!fill_from.isNull() && !equals(current_value, fill_from))
{
filling_row.initFromDefaults(i);
filling_row_inserted = false;
if (less(fill_from, current_value, filling_row.getDirection(i)))
{
interpolate(result_columns, interpolate_block);
insertFromFillingRow(res_fill_columns, res_interpolate_columns, res_other_columns, interpolate_block);
copyRowFromColumns(res_sort_prefix_columns, input_sort_prefix_columns, range_begin);
}
break;
}
filling_row[i] = current_value;
}
}
for (size_t row_ind = range_begin; row_ind < range_end; ++row_ind)
{
logDebug("row", row_ind);
logDebug("filling_row", filling_row);
logDebug("next_row", next_row);
bool should_insert_first = next_row < filling_row;
logDebug("should_insert_first", should_insert_first);
for (size_t i = 0, size = filling_row.size(); i < size; ++i)
{
const auto current_value = (*input_fill_columns[i])[row_ind];
const auto & fill_to = filling_row.getFillDescription(i).fill_to;
if (fill_to.isNull() || less(current_value, fill_to, filling_row.getDirection(i)))
next_row[i] = current_value;
else
next_row[i] = fill_to;
}
logDebug("next_row updated", next_row);
/// The condition is true when the filling row is initialized from value(s) in FILL FROM,
/// and there are row(s) in the current range with value(s) less than those in the filling row.
/// It can happen only once per range.
if (should_insert_first && filling_row < next_row)
{
interpolate(result_columns, interpolate_block);
insertFromFillingRow(res_fill_columns, res_interpolate_columns, res_other_columns, interpolate_block);
copyRowFromColumns(res_sort_prefix_columns, input_sort_prefix_columns, row_ind);
}
bool filling_row_changed = false;
while (true)
{
const auto [apply, changed] = filling_row.next(next_row);
filling_row_changed = changed;
if (!apply)
break;
interpolate(result_columns, interpolate_block);
insertFromFillingRow(res_fill_columns, res_interpolate_columns, res_other_columns, interpolate_block);
copyRowFromColumns(res_sort_prefix_columns, input_sort_prefix_columns, row_ind);
}
/// new valid filling row was generated but not inserted, will use it during suffix generation
if (filling_row_changed)
filling_row_inserted = false;
logDebug("filling_row after", filling_row);
copyRowFromColumns(res_fill_columns, input_fill_columns, row_ind);
copyRowFromColumns(res_interpolate_columns, input_interpolate_columns, row_ind);
copyRowFromColumns(res_sort_prefix_columns, input_sort_prefix_columns, row_ind);
copyRowFromColumns(res_other_columns, input_other_columns, row_ind);
}
/// save the sort prefix of the last row in the range; it's used to generate the suffix
last_range_sort_prefix.clear();
for (const auto & sort_prefix_column : input_sort_prefix_columns)
{
auto column = sort_prefix_column->cloneEmpty();
column->insertFrom(*sort_prefix_column, range_end - 1);
last_range_sort_prefix.push_back(std::move(column));
}
}
void FillingTransform::transform(Chunk & chunk)
{
logDebug("new chunk rows", chunk.getNumRows());
@ -453,9 +674,11 @@ void FillingTransform::transform(Chunk & chunk)
Columns input_fill_columns;
Columns input_interpolate_columns;
Columns input_sort_prefix_columns;
Columns input_other_columns;
MutableColumnRawPtrs res_fill_columns;
MutableColumnRawPtrs res_interpolate_columns;
MutableColumnRawPtrs res_sort_prefix_columns;
MutableColumnRawPtrs res_other_columns;
MutableColumns result_columns;
@ -468,6 +691,14 @@ void FillingTransform::transform(Chunk & chunk)
/// if all chunks are processed, then we may need to generate suffix for the following cases:
/// (1) when all data are processed and WITH FILL .. TO is provided
/// (2) for empty result set when WITH FILL FROM .. TO is provided (see PR #30888)
/// if no data was processed, we need to initialize filling_row
if (last_row.empty())
{
filling_row.initFromDefaults();
filling_row_inserted = false;
}
if (generateSuffixIfNeeded(input.getHeader().getColumns(), result_columns))
{
size_t num_output_rows = result_columns[0]->size();
@ -485,72 +716,95 @@ void FillingTransform::transform(Chunk & chunk)
input_columns,
input_fill_columns,
input_interpolate_columns,
input_sort_prefix_columns,
input_other_columns,
result_columns,
res_fill_columns,
res_interpolate_columns,
res_sort_prefix_columns,
res_other_columns);
if (first)
if (sort_prefix.empty() || !use_with_fill_by_sorting_prefix)
{
for (size_t i = 0, size = filling_row.size(); i < size; ++i)
{
auto current_value = (*input_fill_columns[i])[0];
const auto & fill_from = filling_row.getFillDescription(i).fill_from;
transformRange(
input_fill_columns,
input_interpolate_columns,
input_sort_prefix_columns,
input_other_columns,
result_columns,
res_fill_columns,
res_interpolate_columns,
res_sort_prefix_columns,
res_other_columns,
{0, num_rows},
last_row.empty());
if (!fill_from.isNull() && !equals(current_value, fill_from))
{
filling_row.initFromDefaults(i);
if (less(fill_from, current_value, filling_row.getDirection(i)))
{
interpolate(result_columns, interpolate_block);
insertFromFillingRow(res_fill_columns, res_interpolate_columns, res_other_columns, filling_row, interpolate_block);
}
break;
}
filling_row[i] = current_value;
}
first = false;
saveLastRow(result_columns);
size_t num_output_rows = result_columns[0]->size();
chunk.setColumns(std::move(result_columns), num_output_rows);
return;
}
for (size_t row_ind = 0; row_ind < num_rows; ++row_ind)
/// check if the last row in the previous chunk had the same sorting prefix as the first row in the new one
/// if not, we need to reinitialize filling row
bool new_sort_prefix = last_row.empty();
if (!last_row.empty())
{
logDebug("row", row_ind);
logDebug("filling_row", filling_row);
logDebug("next_row", next_row);
ColumnRawPtrs last_sort_prefix_columns;
last_sort_prefix_columns.reserve(sort_prefix.size());
for (size_t pos : sort_prefix_positions)
last_sort_prefix_columns.push_back(last_row[pos].get());
bool should_insert_first = next_row < filling_row;
logDebug("should_insert_first", should_insert_first);
for (size_t i = 0, size = filling_row.size(); i < size; ++i)
new_sort_prefix = false;
for (size_t i = 0; i < input_sort_prefix_columns.size(); ++i)
{
auto current_value = (*input_fill_columns[i])[row_ind];
const auto & fill_to = filling_row.getFillDescription(i).fill_to;
if (fill_to.isNull() || less(current_value, fill_to, filling_row.getDirection(i)))
next_row[i] = current_value;
else
next_row[i] = fill_to;
const int res = input_sort_prefix_columns[i]->compareAt(0, 0, *last_sort_prefix_columns[i], sort_prefix[i].nulls_direction);
if (res != 0)
{
new_sort_prefix = true;
break;
}
}
logDebug("next_row updated", next_row);
}
/// A case, when at previous step row was initialized from defaults 'fill_from' values
/// and probably we need to insert it to block.
if (should_insert_first && filling_row < next_row)
{
interpolate(result_columns, interpolate_block);
insertFromFillingRow(res_fill_columns, res_interpolate_columns, res_other_columns, filling_row, interpolate_block);
}
for (size_t row_ind = 0; row_ind < num_rows;)
{
/// find next range
auto current_sort_prefix_end_pos = getRangeEnd(
row_ind,
num_rows,
[&](size_t pos_with_current_sort_prefix, size_t row_pos)
{
for (size_t i = 0; i < input_sort_prefix_columns.size(); ++i)
{
const int res = input_sort_prefix_columns[i]->compareAt(
pos_with_current_sort_prefix, row_pos, *input_sort_prefix_columns[i], sort_prefix[i].nulls_direction);
if (res != 0)
return false;
}
return true;
});
while (filling_row.next(next_row))
{
interpolate(result_columns, interpolate_block);
insertFromFillingRow(res_fill_columns, res_interpolate_columns, res_other_columns, filling_row, interpolate_block);
}
/// generate suffix for the previous range
if (!last_range_sort_prefix.empty() && new_sort_prefix)
generateSuffixIfNeeded(result_columns, res_fill_columns, res_interpolate_columns, res_sort_prefix_columns, res_other_columns);
copyRowFromColumns(res_fill_columns, input_fill_columns, row_ind);
copyRowFromColumns(res_interpolate_columns, input_interpolate_columns, row_ind);
copyRowFromColumns(res_other_columns, input_other_columns, row_ind);
transformRange(
input_fill_columns,
input_interpolate_columns,
input_sort_prefix_columns,
input_other_columns,
result_columns,
res_fill_columns,
res_interpolate_columns,
res_sort_prefix_columns,
res_other_columns,
{row_ind, current_sort_prefix_end_pos},
new_sort_prefix);
logDebug("range end", current_sort_prefix_end_pos);
row_ind = current_sort_prefix_end_pos;
new_sort_prefix = true;
}
saveLastRow(result_columns);
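With `use_with_fill_by_sorting_prefix` enabled, the transform above splits every chunk into ranges that share a sorting prefix and fills gaps independently inside each range rather than across the whole chunk. A much-reduced sketch of that control flow on plain integers, with one key column standing in for the sorting prefix and consecutive integers as the fill values (names and the simplistic gap filling are illustrative):

```cpp
#include <iostream>
#include <vector>

struct Row { int key; int value; };

// Fill integer gaps between consecutive values inside one range of rows
// that share the same key (the "sorting prefix").
void fillRange(const std::vector<Row> & rows, size_t begin, size_t end, std::vector<Row> & out)
{
    for (size_t i = begin; i < end; ++i)
    {
        if (i > begin)
            for (int v = rows[i - 1].value + 1; v < rows[i].value; ++v)
                out.push_back({rows[i].key, v}); // generated filling row
        out.push_back(rows[i]);                  // original row
    }
}

int main()
{
    // Two ranges: key=1 with a gap between 1 and 4, key=2 with a gap between 10 and 12.
    std::vector<Row> rows = {{1, 1}, {1, 4}, {2, 10}, {2, 12}};
    std::vector<Row> out;

    size_t i = 0;
    while (i < rows.size())
    {
        size_t j = i;
        while (j < rows.size() && rows[j].key == rows[i].key)
            ++j;                    // find the end of the current sorting-prefix range
        fillRange(rows, i, j, out); // fill gaps only within this range
        i = j;
    }

    for (const auto & r : out)
        std::cout << r.key << ' ' << r.value << '\n';
}
```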

View File

@ -16,7 +16,12 @@ namespace DB
class FillingTransform : public ISimpleTransform
{
public:
FillingTransform(const Block & header_, const SortDescription & sort_description_, InterpolateDescriptionPtr interpolate_description_);
FillingTransform(
const Block & header_,
const SortDescription & sort_description_,
const SortDescription & fill_description_,
InterpolateDescriptionPtr interpolate_description_,
bool use_with_fill_by_sorting_prefix_);
String getName() const override { return "FillingTransform"; }
@ -25,42 +30,72 @@ public:
static Block transformHeader(Block header, const SortDescription & sort_description);
protected:
void transform(Chunk & Chunk) override;
void transform(Chunk & chunk) override;
private:
using MutableColumnRawPtrs = std::vector<IColumn *>;
void transformRange(
const Columns & input_fill_columns,
const Columns & input_interpolate_columns,
const Columns & input_sort_prefix_columns,
const Columns & input_other_columns,
const MutableColumns & result_columns,
const MutableColumnRawPtrs & res_fill_columns,
const MutableColumnRawPtrs & res_interpolate_columns,
const MutableColumnRawPtrs & res_sort_prefix_columns,
const MutableColumnRawPtrs & res_other_columns,
std::pair<size_t, size_t> range,
bool new_sorting_prefix);
void saveLastRow(const MutableColumns & result_columns);
void interpolate(const MutableColumns & result_columns, Block & interpolate_block);
using MutableColumnRawPtrs = std::vector<IColumn *>;
void initColumns(
const Columns & input_columns,
Columns & input_fill_columns,
Columns & input_interpolate_columns,
Columns & input_sort_prefix_columns,
Columns & input_other_columns,
MutableColumns & output_columns,
MutableColumnRawPtrs & output_fill_columns,
MutableColumnRawPtrs & output_interpolate_columns,
MutableColumnRawPtrs & output_sort_prefix_columns,
MutableColumnRawPtrs & output_other_columns);
bool generateSuffixIfNeeded(
const Columns & input_columns,
MutableColumns & result_columns);
const MutableColumns & result_columns,
MutableColumnRawPtrs res_fill_columns,
MutableColumnRawPtrs res_interpolate_columns,
MutableColumnRawPtrs res_sort_prefix_columns,
MutableColumnRawPtrs res_other_columns);
bool generateSuffixIfNeeded(const Columns & input_columns, MutableColumns & result_columns);
const SortDescription sort_description; /// Contains only columns with WITH FILL.
void insertFromFillingRow(
const MutableColumnRawPtrs & filling_columns,
const MutableColumnRawPtrs & interpolate_columns,
const MutableColumnRawPtrs & other_columns,
const Block & interpolate_block);
const SortDescription sort_description;
const SortDescription fill_description; /// Contains only columns with WITH FILL.
SortDescription sort_prefix;
const InterpolateDescriptionPtr interpolate_description; /// Contains INTERPOLATE columns
FillingRow filling_row; /// Current row, which is used to fill gaps.
FillingRow next_row; /// Row to which we need to generate filling rows.
bool filling_row_inserted = false;
using Positions = std::vector<size_t>;
Positions fill_column_positions;
Positions interpolate_column_positions;
Positions other_column_positions;
Positions sort_prefix_positions;
std::vector<std::pair<size_t, NameAndTypePair>> input_positions; /// positions in result columns required for actions
ExpressionActionsPtr interpolate_actions;
Columns last_row;
bool first = true; /// flag to determine if transform is/will be called for the first time
Columns last_range_sort_prefix;
bool all_chunks_processed = false; /// flag to determine if we have already processed all chunks
const bool use_with_fill_by_sorting_prefix;
};
class FillingNoopTransform : public ISimpleTransform

View File

@ -369,6 +369,7 @@ MergeTreeData::MutableDataPartPtr Fetcher::fetchSelectedPart(
const StorageMetadataPtr & metadata_snapshot,
ContextPtr context,
const String & part_name,
const String & zookeeper_name,
const String & replica_path,
const String & host,
int port,
@ -401,13 +402,18 @@ MergeTreeData::MutableDataPartPtr Fetcher::fetchSelectedPart(
/// Validation of the input that may come from a malicious replica.
auto part_info = MergeTreePartInfo::fromPartName(part_name, data.format_version);
String endpoint_id = getEndpointId(
data_settings->enable_the_endpoint_id_with_zookeeper_name_prefix ?
zookeeper_name + ":" + replica_path :
replica_path);
Poco::URI uri;
uri.setScheme(interserver_scheme);
uri.setHost(host);
uri.setPort(port);
uri.setQueryParameters(
{
{"endpoint", getEndpointId(replica_path)},
{"endpoint", endpoint_id},
{"part", part_name},
{"client_protocol_version", toString(REPLICATION_PROTOCOL_VERSION_WITH_METADATA_VERSION)},
{"compress", "false"}
@ -630,7 +636,15 @@ MergeTreeData::MutableDataPartPtr Fetcher::fetchSelectedPart(
temporary_directory_lock = {};
/// Try again but without zero-copy
return fetchSelectedPart(metadata_snapshot, context, part_name, replica_path, host, port, timeouts,
return fetchSelectedPart(
metadata_snapshot,
context,
part_name,
zookeeper_name,
replica_path,
host,
port,
timeouts,
user, password, interserver_scheme, throttler, to_detached, tmp_prefix, nullptr, false, disk);
}
}
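The new `enable_the_endpoint_id_with_zookeeper_name_prefix` setting (added in MergeTreeSettings.h below) only changes how the interserver endpoint id passed in the `endpoint` query parameter is composed. A tiny sketch of the two resulting forms, with made-up names:

```cpp
#include <iostream>
#include <string>

// Illustrative: the endpoint id is either the bare replica path (old behaviour)
// or the ZooKeeper name plus ':' plus the replica path (new, prefix-enabled behaviour).
std::string makeEndpointId(bool with_zookeeper_name_prefix, const std::string & zookeeper_name, const std::string & replica_path)
{
    return with_zookeeper_name_prefix ? zookeeper_name + ":" + replica_path : replica_path;
}

int main()
{
    const std::string zk = "default";
    const std::string path = "/clickhouse/tables/0/hits/replicas/r1";
    std::cout << makeEndpointId(false, zk, path) << '\n'; // /clickhouse/tables/0/hits/replicas/r1
    std::cout << makeEndpointId(true, zk, path) << '\n';  // default:/clickhouse/tables/0/hits/replicas/r1
}
```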

View File

@ -70,6 +70,7 @@ public:
const StorageMetadataPtr & metadata_snapshot,
ContextPtr context,
const String & part_name,
const String & zookeeper_name,
const String & replica_path,
const String & host,
int port,

View File

@ -7198,9 +7198,17 @@ QueryProcessingStage::Enum MergeTreeData::getQueryProcessingStage(
if (query_context->getClientInfo().collaborate_with_initiator)
return QueryProcessingStage::Enum::FetchColumns;
if (query_context->canUseParallelReplicasOnInitiator()
&& to_stage >= QueryProcessingStage::WithMergeableState)
return QueryProcessingStage::Enum::WithMergeableState;
/// Parallel replicas
if (query_context->canUseParallelReplicasOnInitiator() && to_stage >= QueryProcessingStage::WithMergeableState)
{
/// ReplicatedMergeTree
if (supportsReplication())
return QueryProcessingStage::Enum::WithMergeableState;
/// For non-replicated MergeTree we allow them only if parallel_replicas_for_non_replicated_merge_tree is enabled
if (query_context->getSettingsRef().parallel_replicas_for_non_replicated_merge_tree)
return QueryProcessingStage::Enum::WithMergeableState;
}
if (to_stage >= QueryProcessingStage::Enum::WithMergeableState)
{
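Condensed into a single pure function (the names below are illustrative, not the actual MergeTreeData API), the stage decision this hunk adds on the initiator amounts to:

#include <optional>

enum class Stage { FetchColumns, WithMergeableState };

/// Illustrative only. supports_replication = the table is ReplicatedMergeTree,
/// non_replicated_allowed = the parallel_replicas_for_non_replicated_merge_tree setting.
std::optional<Stage> parallelReplicasStage(
    bool can_use_parallel_replicas_on_initiator,
    bool to_stage_at_least_mergeable,
    bool supports_replication,
    bool non_replicated_allowed)
{
    if (can_use_parallel_replicas_on_initiator && to_stage_at_least_mergeable)
    {
        if (supports_replication || non_replicated_allowed)
            return Stage::WithMergeableState;
    }
    return std::nullopt; /// fall through to the regular stage selection
}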

View File

@ -5,6 +5,7 @@
#include <boost/algorithm/string/trim.hpp>
#include <IO/WriteHelpers.h>
#include <IO/ReadHelpers.h>
#include <IO/ReadBufferFromFileBase.h>
#include <IO/WriteBufferFromFileBase.h>
#include <Disks/WriteMode.h>
#include <Disks/IDisk.h>

View File

@ -159,6 +159,7 @@ struct Settings;
M(UInt64, min_bytes_to_rebalance_partition_over_jbod, 0, "Minimal amount of bytes to enable part rebalance over JBOD array (0 - disabled).", 0) \
M(Bool, check_sample_column_is_correct, true, "Check columns or columns by hash for sampling are unsigned integer.", 0) \
M(Bool, allow_vertical_merges_from_compact_to_wide_parts, false, "Allows vertical merges from compact to wide parts. This setting must have the same value on all replicas", 0) \
M(Bool, enable_the_endpoint_id_with_zookeeper_name_prefix, false, "Enable the endpoint id with zookeeper name prefix for the replicated merge tree table", 0) \
\
/** Experimental/work in progress feature. Unsafe for production. */ \
M(UInt64, part_moves_between_shards_enable, 0, "Experimental/Incomplete feature to move parts between shards. Does not take into account sharding expressions.", 0) \

View File

@ -251,6 +251,7 @@ struct SelectQueryInfo
bool is_projection_query = false;
bool merge_tree_empty_result = false;
bool settings_limit_offset_done = false;
bool is_internal = false;
Block minmax_count_projection_block;
MergeTreeDataSelectAnalysisResultPtr merge_tree_select_result_ptr;

View File

@ -145,7 +145,6 @@ namespace ErrorCodes
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
extern const int INCORRECT_NUMBER_OF_COLUMNS;
extern const int INFINITE_LOOP;
extern const int ILLEGAL_FINAL;
extern const int TYPE_MISMATCH;
extern const int TOO_MANY_ROWS;
extern const int UNABLE_TO_SKIP_UNUSED_SHARDS;
@ -1045,10 +1044,6 @@ void StorageDistributed::read(
const size_t /*max_block_size*/,
const size_t /*num_streams*/)
{
const auto * select_query = query_info.query->as<ASTSelectQuery>();
if (select_query->final() && local_context->getSettingsRef().allow_experimental_parallel_reading_from_replicas)
throw Exception(ErrorCodes::ILLEGAL_FINAL, "Final modifier is not allowed together with parallel reading from replicas feature");
Block header;
ASTPtr query_ast;

View File

@ -633,10 +633,7 @@ QueryPipelineBuilderPtr ReadFromMerge::createSources(
auto & modified_select = modified_query_info.query->as<ASTSelectQuery &>();
QueryPipelineBuilderPtr builder;
bool final = isFinal(modified_query_info);
if (!final && storage->needRewriteQueryWithFinal(real_column_names))
if (!InterpreterSelectQuery::isQueryWithFinal(modified_query_info) && storage->needRewriteQueryWithFinal(real_column_names))
{
/// NOTE: It may not work correctly in some cases, because query was analyzed without final.
/// However, it's needed for MaterializedMySQL and it's unlikely that someone will use it with Merge tables.
@ -1010,21 +1007,13 @@ bool ReadFromMerge::requestReadingInOrder(InputOrderInfoPtr order_info_)
{
/// Disable read-in-order optimization for reverse order with final.
/// Otherwise, it can lead to incorrect final behavior because the implementation may rely on reading in direct order.
if (order_info_->direction != 1 && isFinal(query_info))
if (order_info_->direction != 1 && InterpreterSelectQuery::isQueryWithFinal(query_info))
return false;
order_info = order_info_;
return true;
}
bool ReadFromMerge::isFinal(const SelectQueryInfo & query_info)
{
if (query_info.table_expression_modifiers)
return query_info.table_expression_modifiers->hasFinal();
const auto & select_query = query_info.query->as<ASTSelectQuery &>();
return select_query.final();
}
IStorage::ColumnSizeByName StorageMerge::getColumnSizes() const
{
ColumnSizeByName column_sizes;

View File

@ -145,7 +145,6 @@ public:
/// Returns `false` if requested reading cannot be performed.
bool requestReadingInOrder(InputOrderInfoPtr order_info_);
static bool isFinal(const SelectQueryInfo & query_info);
private:
const size_t required_max_block_size;

View File

@ -240,6 +240,15 @@ zkutil::ZooKeeperPtr StorageReplicatedMergeTree::getZooKeeperAndAssertNotReadonl
return res;
}
String StorageReplicatedMergeTree::getEndpointName() const
{
const MergeTreeSettings & settings = getContext()->getReplicatedMergeTreeSettings();
if (settings.enable_the_endpoint_id_with_zookeeper_name_prefix)
return zookeeper_name + ":" + replica_path;
return replica_path;
}
static ConnectionTimeouts getHTTPTimeouts(ContextPtr context)
{
return ConnectionTimeouts::getHTTPTimeouts(context->getSettingsRef(), {context->getConfigRef().getUInt("keep_alive_timeout", DEFAULT_HTTP_KEEP_ALIVE_TIMEOUT), 0});
@ -1841,6 +1850,7 @@ bool StorageReplicatedMergeTree::executeFetch(LogEntry & entry, bool need_to_che
String source_replica_path = fs::path(zookeeper_path) / "replicas" / replica;
if (!fetchPart(part_name,
metadata_snapshot,
zookeeper_name,
source_replica_path,
/* to_detached= */ false,
entry.quorum,
@ -2341,7 +2351,7 @@ bool StorageReplicatedMergeTree::executeReplaceRange(const LogEntry & entry)
interserver_scheme, address.scheme, address.host);
part_desc->res_part = fetcher.fetchSelectedPart(
metadata_snapshot, getContext(), part_desc->found_new_part_name, source_replica_path,
metadata_snapshot, getContext(), part_desc->found_new_part_name, zookeeper_name, source_replica_path,
address.host, address.replication_port, timeouts, credentials->getUser(), credentials->getPassword(),
interserver_scheme, replicated_fetches_throttler, false, TMP_PREFIX + "fetch_");
@ -2458,7 +2468,7 @@ void StorageReplicatedMergeTree::executeClonePartFromShard(const LogEntry & entr
interserver_scheme, address.scheme, address.host);
return fetcher.fetchSelectedPart(
metadata_snapshot, getContext(), entry.new_part_name, source_replica_path,
metadata_snapshot, getContext(), entry.new_part_name, zookeeper_name, source_replica_path,
address.host, address.replication_port,
timeouts, credentials->getUser(), credentials->getPassword(), interserver_scheme,
replicated_fetches_throttler, true);
@ -4042,6 +4052,7 @@ bool StorageReplicatedMergeTree::partIsLastQuorumPart(const MergeTreePartInfo &
bool StorageReplicatedMergeTree::fetchPart(
const String & part_name,
const StorageMetadataPtr & metadata_snapshot,
const String & source_zookeeper_name,
const String & source_replica_path,
bool to_detached,
size_t quorum,
@ -4077,7 +4088,7 @@ bool StorageReplicatedMergeTree::fetchPart(
currently_fetching_parts.erase(part_name);
});
LOG_DEBUG(log, "Fetching part {} from {}", part_name, source_replica_path);
LOG_DEBUG(log, "Fetching part {} from {}:{}", part_name, source_zookeeper_name, source_replica_path);
auto settings_ptr = getSettings();
TableLockHolder table_lock_holder;
@ -4134,7 +4145,8 @@ bool StorageReplicatedMergeTree::fetchPart(
}
else
{
LOG_INFO(log, "Not checking checksums of part {} with replica {} because part was removed from ZooKeeper", part_name, source_replica_path);
LOG_INFO(log, "Not checking checksums of part {} with replica {}:{} because part was removed from ZooKeeper",
part_name, source_zookeeper_name, source_replica_path);
}
}
@ -4187,6 +4199,7 @@ bool StorageReplicatedMergeTree::fetchPart(
metadata_snapshot,
getContext(),
part_name,
source_zookeeper_name,
source_replica_path,
address.host,
address.replication_port,
@ -4279,7 +4292,7 @@ bool StorageReplicatedMergeTree::fetchPart(
if (part_to_clone)
LOG_DEBUG(log, "Cloned part {} from {}{}", part_name, part_to_clone->name, to_detached ? " (to 'detached' directory)" : "");
else
LOG_DEBUG(log, "Fetched part {} from {}{}", part_name, source_replica_path, to_detached ? " (to 'detached' directory)" : "");
LOG_DEBUG(log, "Fetched part {} from {}:{}{}", part_name, source_zookeeper_name, source_replica_path, to_detached ? " (to 'detached' directory)" : "");
return true;
}
@ -4318,7 +4331,7 @@ MutableDataPartStoragePtr StorageReplicatedMergeTree::fetchExistsPart(
currently_fetching_parts.erase(part_name);
});
LOG_DEBUG(log, "Fetching already known part {} from {}", part_name, source_replica_path);
LOG_DEBUG(log, "Fetching already known part {} from {}:{}", part_name, zookeeper_name, source_replica_path);
TableLockHolder table_lock_holder = lockForShare(RWLockImpl::NO_QUERY, getSettings()->lock_acquire_timeout_for_background_operations);
@ -4350,7 +4363,7 @@ MutableDataPartStoragePtr StorageReplicatedMergeTree::fetchExistsPart(
"'{}' != '{}', can't fetch part from {}", interserver_scheme, address.scheme, address.host);
return fetcher.fetchSelectedPart(
metadata_snapshot, getContext(), part_name, source_replica_path,
metadata_snapshot, getContext(), part_name, zookeeper_name, source_replica_path,
address.host, address.replication_port,
timeouts, credentials->getUser(), credentials->getPassword(),
interserver_scheme, replicated_fetches_throttler, false, "", nullptr, true,
@ -4387,7 +4400,7 @@ MutableDataPartStoragePtr StorageReplicatedMergeTree::fetchExistsPart(
ProfileEvents::increment(ProfileEvents::ReplicatedPartFetches);
LOG_DEBUG(log, "Fetched part {} from {}", part_name, source_replica_path);
LOG_DEBUG(log, "Fetched part {} from {}:{}", part_name, zookeeper_name, source_replica_path);
return part->getDataPartStoragePtr();
}
@ -4430,7 +4443,16 @@ void StorageReplicatedMergeTree::startupImpl(bool from_attach_thread)
InterserverIOEndpointPtr data_parts_exchange_ptr = std::make_shared<DataPartsExchange::Service>(*this);
[[maybe_unused]] auto prev_ptr = std::atomic_exchange(&data_parts_exchange_endpoint, data_parts_exchange_ptr);
assert(prev_ptr == nullptr);
getContext()->getInterserverIOHandler().addEndpoint(data_parts_exchange_ptr->getId(replica_path), data_parts_exchange_ptr);
/// The endpoint id:
/// old format: DataPartsExchange:/clickhouse/tables/default/t1/{shard}/{replica}
/// new format: DataPartsExchange:{zookeeper_name}:/clickhouse/tables/default/t1/{shard}/{replica}
/// Notice:
/// They are incompatible and the default is the old format.
/// If you want to use the new format, please ensure that 'enable_the_endpoint_id_with_zookeeper_name_prefix' is set to true on all nodes.
///
getContext()->getInterserverIOHandler().addEndpoint(
data_parts_exchange_ptr->getId(getEndpointName()), data_parts_exchange_ptr);
startBeingLeader();
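For illustration, a self-contained sketch of the two endpoint id formats described in the comment above; the replica path and ZooKeeper name are hypothetical values, and makeEndpointId is not the real getEndpointId helper:

#include <iostream>
#include <string>

std::string makeEndpointId(bool with_zookeeper_name_prefix, const std::string & zookeeper_name, const std::string & replica_path)
{
    return "DataPartsExchange:" + (with_zookeeper_name_prefix ? zookeeper_name + ":" + replica_path : replica_path);
}

int main()
{
    const std::string replica_path = "/clickhouse/tables/default/t1/01/replica1"; /// hypothetical
    std::cout << makeEndpointId(false, "default", replica_path) << '\n'; /// old format (setting disabled)
    std::cout << makeEndpointId(true, "default", replica_path) << '\n';  /// new format (setting enabled)
}

Because a fetch looks up the interserver endpoint by this id, replicas with different values of enable_the_endpoint_id_with_zookeeper_name_prefix would register and request mismatched ids, which is why the comment asks for the setting to be enabled on all nodes together.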
@ -4555,7 +4577,7 @@ void StorageReplicatedMergeTree::shutdown()
auto data_parts_exchange_ptr = std::atomic_exchange(&data_parts_exchange_endpoint, InterserverIOEndpointPtr{});
if (data_parts_exchange_ptr)
{
getContext()->getInterserverIOHandler().removeEndpointIfExists(data_parts_exchange_ptr->getId(replica_path));
getContext()->getInterserverIOHandler().removeEndpointIfExists(data_parts_exchange_ptr->getId(getEndpointName()));
/// Ask all parts exchange handlers to finish asap. New ones will fail to start
data_parts_exchange_ptr->blocker.cancelForever();
/// Wait for all of them
@ -6237,14 +6259,14 @@ void StorageReplicatedMergeTree::fetchPartition(
info.table_id = getStorageID();
info.table_id.uuid = UUIDHelpers::Nil;
auto expand_from = query_context->getMacros()->expand(from_, info);
String auxiliary_zookeeper_name = zkutil::extractZooKeeperName(expand_from);
String from_zookeeper_name = zkutil::extractZooKeeperName(expand_from);
String from = zkutil::extractZooKeeperPath(expand_from, /* check_starts_with_slash */ true);
if (from.empty())
throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, "ZooKeeper path should not be empty");
zkutil::ZooKeeperPtr zookeeper;
if (auxiliary_zookeeper_name != default_zookeeper_name)
zookeeper = getContext()->getAuxiliaryZooKeeper(auxiliary_zookeeper_name);
if (from_zookeeper_name != default_zookeeper_name)
zookeeper = getContext()->getAuxiliaryZooKeeper(from_zookeeper_name);
else
zookeeper = getZooKeeper();
@ -6263,12 +6285,12 @@ void StorageReplicatedMergeTree::fetchPartition(
*/
if (checkIfDetachedPartExists(part_name))
throw Exception(ErrorCodes::DUPLICATE_DATA_PART, "Detached part {} already exists.", part_name);
LOG_INFO(log, "Will fetch part {} from shard {} (zookeeper '{}')", part_name, from_, auxiliary_zookeeper_name);
LOG_INFO(log, "Will fetch part {} from shard {}", part_name, from_);
try
{
/// part name, metadata, part_path, true, 0, zookeeper
if (!fetchPart(part_name, metadata_snapshot, part_path, true, 0, zookeeper, /* try_fetch_shared = */ false))
if (!fetchPart(part_name, metadata_snapshot, from_zookeeper_name, part_path, true, 0, zookeeper, /* try_fetch_shared = */ false))
throw Exception(ErrorCodes::UNFINISHED, "Failed to fetch part {} from {}", part_name, from_);
}
catch (const DB::Exception & e)
@ -6283,7 +6305,7 @@ void StorageReplicatedMergeTree::fetchPartition(
}
String partition_id = getPartitionIDFromQuery(partition, query_context);
LOG_INFO(log, "Will fetch partition {} from shard {} (zookeeper '{}')", partition_id, from_, auxiliary_zookeeper_name);
LOG_INFO(log, "Will fetch partition {} from shard {}", partition_id, from_);
/** Let's check that there is no such partition in the `detached` directory (where we will write the downloaded parts).
* Unreliable (there is a race condition) - such a partition may appear a little later.
@ -6307,7 +6329,7 @@ void StorageReplicatedMergeTree::fetchPartition(
active_replicas.push_back(replica);
if (active_replicas.empty())
throw Exception(ErrorCodes::NO_ACTIVE_REPLICAS, "No active replicas for shard {}", from);
throw Exception(ErrorCodes::NO_ACTIVE_REPLICAS, "No active replicas for shard {}", from_);
/** You must select the best (most relevant) replica.
* This is a replica with the maximum `log_pointer`, then with the minimum `queue` size.
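A minimal sketch of that ordering, with illustrative names (ReplicaCandidate and chooseBestReplica are not the actual implementation): prefer the largest log_pointer, break ties by the smallest queue size.

#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <string>
#include <vector>

struct ReplicaCandidate
{
    std::string path;
    uint64_t log_pointer = 0;
    size_t queue_size = 0;
};

/// Illustrative: pick the replica with the maximum log_pointer; ties broken by the minimum queue size.
const ReplicaCandidate * chooseBestReplica(const std::vector<ReplicaCandidate> & candidates)
{
    if (candidates.empty())
        return nullptr;
    return &*std::min_element(candidates.begin(), candidates.end(),
        [](const ReplicaCandidate & a, const ReplicaCandidate & b)
        {
            if (a.log_pointer != b.log_pointer)
                return a.log_pointer > b.log_pointer;
            return a.queue_size < b.queue_size;
        });
}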
@ -6361,7 +6383,8 @@ void StorageReplicatedMergeTree::fetchPartition(
LOG_INFO(log, "Some of parts ({}) are missing. Will try to fetch covering parts.", missing_parts.size());
if (try_no >= query_context->getSettings().max_fetch_partition_retries_count)
throw Exception(ErrorCodes::TOO_MANY_RETRIES_TO_FETCH_PARTS, "Too many retries to fetch parts from {}", best_replica_path);
throw Exception(ErrorCodes::TOO_MANY_RETRIES_TO_FETCH_PARTS,
"Too many retries to fetch parts from {}:{}", from_zookeeper_name, best_replica_path);
Strings parts = zookeeper->getChildren(fs::path(best_replica_path) / "parts");
ActiveDataPartSet active_parts_set(format_version, parts);
@ -6382,7 +6405,8 @@ void StorageReplicatedMergeTree::fetchPartition(
parts_to_fetch = std::move(parts_to_fetch_partition);
if (parts_to_fetch.empty())
throw Exception(ErrorCodes::PARTITION_DOESNT_EXIST, "Partition {} on {} doesn't exist", partition_id, best_replica_path);
throw Exception(ErrorCodes::PARTITION_DOESNT_EXIST,
"Partition {} on {}:{} doesn't exist", partition_id, from_zookeeper_name, best_replica_path);
}
else
{
@ -6392,7 +6416,7 @@ void StorageReplicatedMergeTree::fetchPartition(
if (!containing_part.empty())
parts_to_fetch.push_back(containing_part);
else
LOG_WARNING(log, "Part {} on replica {} has been vanished.", missing_part, best_replica_path);
LOG_WARNING(log, "Part {} on replica {}:{} has been vanished.", missing_part, from_zookeeper_name, best_replica_path);
}
}
@ -6405,7 +6429,7 @@ void StorageReplicatedMergeTree::fetchPartition(
try
{
fetched = fetchPart(part, metadata_snapshot, best_replica_path, true, 0, zookeeper, /* try_fetch_shared = */ false);
fetched = fetchPart(part, metadata_snapshot, from_zookeeper_name, best_replica_path, true, 0, zookeeper, /* try_fetch_shared = */ false);
}
catch (const DB::Exception & e)
{

View File

@ -382,6 +382,7 @@ private:
zkutil::ZooKeeperPtr getZooKeeperIfTableShutDown() const;
zkutil::ZooKeeperPtr getZooKeeperAndAssertNotReadonly() const;
void setZooKeeper();
String getEndpointName() const;
/// If true, the table is offline and can not be written to it.
/// This flag is managed by RestartingThread.
@ -699,6 +700,7 @@ private:
bool fetchPart(
const String & part_name,
const StorageMetadataPtr & metadata_snapshot,
const String & source_zookeeper_name,
const String & source_replica_path,
bool to_detached,
size_t quorum,

View File

@ -31,7 +31,7 @@
#include <Storages/NamedCollectionsHelpers.h>
#include <Storages/ReadFromStorageProgress.h>
#include <Disks/IO/AsynchronousReadIndirectBufferFromRemoteFS.h>
#include <Disks/IO/AsynchronousBoundedReadBuffer.h>
#include <Disks/IO/ReadBufferFromRemoteFSGather.h>
#include <Disks/ObjectStorages/StoredObject.h>
@ -676,8 +676,8 @@ std::unique_ptr<ReadBuffer> StorageS3Source::createAsyncS3ReadBuffer(
modified_settings.remote_read_min_bytes_for_seek = modified_settings.remote_fs_buffer_size;
auto & pool_reader = context->getThreadPoolReader(FilesystemReaderType::ASYNCHRONOUS_REMOTE_FS_READER);
auto async_reader = std::make_unique<AsynchronousReadIndirectBufferFromRemoteFS>(
pool_reader, modified_settings, std::move(s3_impl),
auto async_reader = std::make_unique<AsynchronousBoundedReadBuffer>(
std::move(s3_impl), pool_reader, modified_settings,
context->getAsyncReadCounters(), context->getFilesystemReadPrefetchesLog());
async_reader->setReadUntilEnd();

View File

@ -10,6 +10,7 @@
#include <Common/formatReadable.h>
#include <Common/StringUtils/StringUtils.h>
#include <Interpreters/Context.h>
#include <IO/ReadBufferFromFileBase.h>
#include <Common/logger_useful.h>
#include <Interpreters/Set.h>
#include <Processors/Sinks/SinkToStorage.h>

Some files were not shown because too many files have changed in this diff.