Merge master

This commit is contained in:
kssenii 2022-05-10 19:33:34 +02:00
commit ccab49e3cf
745 changed files with 2463 additions and 156311 deletions

View File

@ -1,7 +1,7 @@
### Changelog category (leave one):
- New Feature
- Improvement
- Bug Fix (user-visible misbehaviour in official stable or prestable release)
- Bug Fix (user-visible misbehavior in official stable or prestable release)
- Performance Improvement
- Backward Incompatible Change
- Build/Testing/Packaging Improvement

View File

@ -1,48 +0,0 @@
name: "CodeQL"
"on":
schedule:
- cron: '0 0 * * *'
workflow_dispatch:
env:
CC: clang-14
CXX: clang++-14
jobs:
analyze:
name: Analyze
runs-on: ubuntu-latest
permissions:
actions: read
contents: read
security-events: write
strategy:
fail-fast: false
matrix:
language: ['cpp']
steps:
- name: Checkout repository
uses: actions/checkout@v3
with:
submodules: 'true'
- name: Initialize CodeQL
uses: github/codeql-action/init@v2
with:
languages: ${{ matrix.language }}
- name: Build
run: |
sudo apt-get install -yq ninja-build
sudo bash -c "$(wget -O - https://apt.llvm.org/llvm.sh)"
mkdir build
cd build
cmake -DUSE_STATIC_LIBRARIES=0 -DSPLIT_SHARED_LIBRARIES=1 -DCLICKHOUSE_SPLIT_BINARY=1 ..
ninja
rm -rf ../contrib
- name: Perform CodeQL Analysis
uses: github/codeql-action/analyze@v2

View File

@ -0,0 +1,43 @@
#!/usr/bin/env bash
QUERIES_FILE="queries.sql"
TABLE=$1
TRIES=3
PARAMS="--host ... --secure --password ..."
if [ -x ./clickhouse ]
then
CLICKHOUSE_CLIENT="./clickhouse client"
elif command -v clickhouse-client >/dev/null 2>&1
then
CLICKHOUSE_CLIENT="clickhouse-client"
else
echo "clickhouse-client is not found"
exit 1
fi
QUERY_ID_PREFIX="benchmark_$RANDOM"
QUERY_NUM=1
cat "$QUERIES_FILE" | sed "s/{table}/${TABLE}/g" | while read query
do
for i in $(seq 1 $TRIES)
do
QUERY_ID="${QUERY_ID_PREFIX}_${QUERY_NUM}_${i}"
${CLICKHOUSE_CLIENT} ${PARAMS} --query_id "${QUERY_ID}" --format=Null --max_memory_usage=100G --query="$query"
echo -n '.'
done
QUERY_NUM=$((QUERY_NUM + 1))
echo
done
sleep 10
${CLICKHOUSE_CLIENT} ${PARAMS} --query "
WITH extractGroups(query_id, '(\d+)_(\d+)\$') AS num_run, num_run[1]::UInt8 AS num, num_run[2]::UInt8 AS run
SELECT groupArrayInsertAt(query_duration_ms / 1000, (run - 1)::UInt8)::String || ','
FROM clusterAllReplicas(default, system.query_log)
WHERE event_date >= yesterday() AND type = 2 AND query_id LIKE '${QUERY_ID_PREFIX}%'
GROUP BY num ORDER BY num FORMAT TSV
"

View File

@ -364,10 +364,8 @@ SET(ARROW_SRCS "${LIBRARY_DIR}/util/compression_snappy.cc" ${ARROW_SRCS})
add_definitions(-DARROW_WITH_ZLIB)
SET(ARROW_SRCS "${LIBRARY_DIR}/util/compression_zlib.cc" ${ARROW_SRCS})
if (ARROW_WITH_ZSTD)
add_definitions(-DARROW_WITH_ZSTD)
SET(ARROW_SRCS "${LIBRARY_DIR}/util/compression_zstd.cc" ${ARROW_SRCS})
endif ()
add_library(_arrow ${ARROW_SRCS})
@ -383,7 +381,6 @@ target_link_libraries(_arrow PRIVATE
ch_contrib::snappy
ch_contrib::zlib
ch_contrib::zstd
ch_contrib::zstd
)
target_link_libraries(_arrow PUBLIC _orc)

View File

@ -101,7 +101,12 @@ EOL
function stop()
{
clickhouse stop
clickhouse stop --do-not-kill && return
# We failed to stop the server with SIGTERM. Maybe it hung; let's collect stack traces.
kill -TERM "$(pidof gdb)" ||:
sleep 5
gdb -batch -ex 'thread apply all backtrace' -p "$(cat /var/run/clickhouse-server/clickhouse-server.pid)" ||:
clickhouse stop --force
}
function start()
@ -201,7 +206,7 @@ mv /var/log/clickhouse-server/clickhouse-server.log /var/log/clickhouse-server/c
start
clickhouse-client --query "SELECT 'Server successfully started', 'OK'" >> /test_output/test_results.tsv \
|| (echo -e 'Server failed to start (see application_errors.txt)\tFAIL' >> /test_output/test_results.tsv \
|| (echo -e 'Server failed to start (see application_errors.txt and clickhouse-server.clean.log)\tFAIL' >> /test_output/test_results.tsv \
&& grep -Fa "<Error>.*Application" /var/log/clickhouse-server/clickhouse-server.log > /test_output/application_errors.txt)
[ -f /var/log/clickhouse-server/clickhouse-server.log ] || echo -e "Server log does not exist\tFAIL"
@ -387,7 +392,7 @@ for table in query_log trace_log; do
done
# Write check result into check_status.tsv
clickhouse-local --structure "test String, res String" -q "SELECT 'failure', test FROM table WHERE res != 'OK' order by (lower(test) like '%hung%') LIMIT 1" < /test_output/test_results.tsv > /test_output/check_status.tsv
clickhouse-local --structure "test String, res String" -q "SELECT 'failure', test FROM table WHERE res != 'OK' order by (lower(test) like '%hung%'), rowNumberInAllBlocks() LIMIT 1" < /test_output/test_results.tsv > /test_output/check_status.tsv
[ -s /test_output/check_status.tsv ] || echo -e "success\tNo errors found" > /test_output/check_status.tsv
# Core dumps (see gcore)

View File

@ -12,7 +12,7 @@ UNKNOWN_SIGN = "[ UNKNOWN "
SKIPPED_SIGN = "[ SKIPPED "
HUNG_SIGN = "Found hung queries in processlist"
NO_TASK_TIMEOUT_SIGNS = ["All tests have finished", "No tests were run"]
SUCCESS_FINISH_SIGNS = ["All tests have finished", "No tests were run"]
RETRIES_SIGN = "Some tests were restarted"
@ -25,14 +25,14 @@ def process_test_log(log_path):
success = 0
hung = False
retries = False
task_timeout = True
success_finish = False
test_results = []
with open(log_path, "r") as test_file:
for line in test_file:
original_line = line
line = line.strip()
if any(s in line for s in NO_TASK_TIMEOUT_SIGNS):
task_timeout = False
if any(s in line for s in SUCCESS_FINISH_SIGNS):
success_finish = True
if HUNG_SIGN in line:
hung = True
if RETRIES_SIGN in line:
@ -81,7 +81,7 @@ def process_test_log(log_path):
failed,
success,
hung,
task_timeout,
success_finish,
retries,
test_results,
)
@ -108,7 +108,7 @@ def process_result(result_path):
failed,
success,
hung,
task_timeout,
success_finish,
retries,
test_results,
) = process_test_log(result_path)
@ -123,10 +123,10 @@ def process_result(result_path):
description = "Some queries hung, "
state = "failure"
test_results.append(("Some queries hung", "FAIL", "0", ""))
elif task_timeout:
description = "Timeout, "
elif not success_finish:
description = "Tests are not finished, "
state = "failure"
test_results.append(("Timeout", "FAIL", "0", ""))
test_results.append(("Tests are not finished", "FAIL", "0", ""))
elif retries:
description = "Some tests restarted, "
test_results.append(("Some tests restarted", "SKIPPED", "0", ""))

View File

@ -694,6 +694,49 @@ auto s = std::string{"Hello"};
**2.** Exception specifiers from C++03 are not used.
**3.** Constructs which have convenient syntactic sugar in modern C++, e.g.
```
// Traditional way without syntactic sugar
template <typename G, typename = std::enable_if_t<std::is_same<G, F>::value, void>> // SFINAE via std::enable_if, usage of ::value
std::pair<int, int> func(const E<G> & e) // explicitly specified return type
{
if (elements.count(e)) // .count() membership test
{
// ...
}
elements.erase(
std::remove_if(
elements.begin(), elements.end(),
[&](const auto x){
return x == 1;
}),
elements.end()); // remove-erase idiom
return std::make_pair(1, 2); // create pair via make_pair()
}
// With syntactic sugar (C++14/17/20)
template <typename G>
requires std::same_as<G, F> // SFINAE replaced by a C++20 concept
auto func(const E<G> & e) // auto return type (C++14)
{
if (elements.contains(e)) // C++20 .contains membership test
{
// ...
}
std::erase_if(
elements,
[&](const auto x){
return x == 1;
}); // C++20 std::erase_if
return {1, 2}; // or: return std::pair(1, 2); // create pair via initialization list or value initialization (C++17)
}
```
## Platform {#platform}
**1.** We write code for a specific platform.

View File

@ -45,7 +45,7 @@ clickhouse-client --query "CREATE DATABASE IF NOT EXISTS datasets"
# for hits_v1
clickhouse-client --query "CREATE TABLE datasets.hits_v1 ( WatchID UInt64, JavaEnable UInt8, Title String, GoodEvent Int16, EventTime DateTime, EventDate Date, CounterID UInt32, ClientIP UInt32, ClientIP6 FixedString(16), RegionID UInt32, UserID UInt64, CounterClass Int8, OS UInt8, UserAgent UInt8, URL String, Referer String, URLDomain String, RefererDomain String, Refresh UInt8, IsRobot UInt8, RefererCategories Array(UInt16), URLCategories Array(UInt16), URLRegions Array(UInt32), RefererRegions Array(UInt32), ResolutionWidth UInt16, ResolutionHeight UInt16, ResolutionDepth UInt8, FlashMajor UInt8, FlashMinor UInt8, FlashMinor2 String, NetMajor UInt8, NetMinor UInt8, UserAgentMajor UInt16, UserAgentMinor FixedString(2), CookieEnable UInt8, JavascriptEnable UInt8, IsMobile UInt8, MobilePhone UInt8, MobilePhoneModel String, Params String, IPNetworkID UInt32, TraficSourceID Int8, SearchEngineID UInt16, SearchPhrase String, AdvEngineID UInt8, IsArtifical UInt8, WindowClientWidth UInt16, WindowClientHeight UInt16, ClientTimeZone Int16, ClientEventTime DateTime, SilverlightVersion1 UInt8, SilverlightVersion2 UInt8, SilverlightVersion3 UInt32, SilverlightVersion4 UInt16, PageCharset String, CodeVersion UInt32, IsLink UInt8, IsDownload UInt8, IsNotBounce UInt8, FUniqID UInt64, HID UInt32, IsOldCounter UInt8, IsEvent UInt8, IsParameter UInt8, DontCountHits UInt8, WithHash UInt8, HitColor FixedString(1), UTCEventTime DateTime, Age UInt8, Sex UInt8, Income UInt8, Interests UInt16, Robotness UInt8, GeneralInterests Array(UInt16), RemoteIP UInt32, RemoteIP6 FixedString(16), WindowName Int32, OpenerName Int32, HistoryLength Int16, BrowserLanguage FixedString(2), BrowserCountry FixedString(2), SocialNetwork String, SocialAction String, HTTPError UInt16, SendTiming Int32, DNSTiming Int32, ConnectTiming Int32, ResponseStartTiming Int32, ResponseEndTiming Int32, FetchTiming Int32, RedirectTiming Int32, DOMInteractiveTiming Int32, DOMContentLoadedTiming Int32, DOMCompleteTiming Int32, LoadEventStartTiming Int32, LoadEventEndTiming Int32, NSToDOMContentLoadedTiming Int32, FirstPaintTiming Int32, RedirectCount Int8, SocialSourceNetworkID UInt8, SocialSourcePage String, ParamPrice Int64, ParamOrderID String, ParamCurrency FixedString(3), ParamCurrencyID UInt16, GoalsReached Array(UInt32), OpenstatServiceName String, OpenstatCampaignID String, OpenstatAdID String, OpenstatSourceID String, UTMSource String, UTMMedium String, UTMCampaign String, UTMContent String, UTMTerm String, FromTag String, HasGCLID UInt8, RefererHash UInt64, URLHash UInt64, CLID UInt32, YCLID UInt64, ShareService String, ShareURL String, ShareTitle String, ParsedParams Nested(Key1 String, Key2 String, Key3 String, Key4 String, Key5 String, ValueDouble Float64), IslandID FixedString(16), RequestNum UInt32, RequestTry UInt8) ENGINE = MergeTree() PARTITION BY toYYYYMM(EventDate) ORDER BY (CounterID, EventDate, intHash32(UserID)) SAMPLE BY intHash32(UserID) SETTINGS index_granularity = 8192"
# for hits_100m_obfuscated
clickhouse-client --query="CREATE TABLE hits_100m_obfuscated (WatchID UInt64, JavaEnable UInt8, Title String, GoodEvent Int16, EventTime DateTime, EventDate Date, CounterID UInt32, ClientIP UInt32, RegionID UInt32, UserID UInt64, CounterClass Int8, OS UInt8, UserAgent UInt8, URL String, Referer String, Refresh UInt8, RefererCategoryID UInt16, RefererRegionID UInt32, URLCategoryID UInt16, URLRegionID UInt32, ResolutionWidth UInt16, ResolutionHeight UInt16, ResolutionDepth UInt8, FlashMajor UInt8, FlashMinor UInt8, FlashMinor2 String, NetMajor UInt8, NetMinor UInt8, UserAgentMajor UInt16, UserAgentMinor FixedString(2), CookieEnable UInt8, JavascriptEnable UInt8, IsMobile UInt8, MobilePhone UInt8, MobilePhoneModel String, Params String, IPNetworkID UInt32, TraficSourceID Int8, SearchEngineID UInt16, SearchPhrase String, AdvEngineID UInt8, IsArtifical UInt8, WindowClientWidth UInt16, WindowClientHeight UInt16, ClientTimeZone Int16, ClientEventTime DateTime, SilverlightVersion1 UInt8, SilverlightVersion2 UInt8, SilverlightVersion3 UInt32, SilverlightVersion4 UInt16, PageCharset String, CodeVersion UInt32, IsLink UInt8, IsDownload UInt8, IsNotBounce UInt8, FUniqID UInt64, OriginalURL String, HID UInt32, IsOldCounter UInt8, IsEvent UInt8, IsParameter UInt8, DontCountHits UInt8, WithHash UInt8, HitColor FixedString(1), LocalEventTime DateTime, Age UInt8, Sex UInt8, Income UInt8, Interests UInt16, Robotness UInt8, RemoteIP UInt32, WindowName Int32, OpenerName Int32, HistoryLength Int16, BrowserLanguage FixedString(2), BrowserCountry FixedString(2), SocialNetwork String, SocialAction String, HTTPError UInt16, SendTiming UInt32, DNSTiming UInt32, ConnectTiming UInt32, ResponseStartTiming UInt32, ResponseEndTiming UInt32, FetchTiming UInt32, SocialSourceNetworkID UInt8, SocialSourcePage String, ParamPrice Int64, ParamOrderID String, ParamCurrency FixedString(3), ParamCurrencyID UInt16, OpenstatServiceName String, OpenstatCampaignID String, OpenstatAdID String, OpenstatSourceID String, UTMSource String, UTMMedium String, UTMCampaign String, UTMContent String, UTMTerm String, FromTag String, HasGCLID UInt8, RefererHash UInt64, URLHash UInt64, CLID UInt32) ENGINE = MergeTree() PARTITION BY toYYYYMM(EventDate) ORDER BY (CounterID, EventDate, intHash32(UserID)) SAMPLE BY intHash32(UserID) SETTINGS index_granularity = 8192"
clickhouse-client --query="CREATE TABLE default.hits_100m_obfuscated (WatchID UInt64, JavaEnable UInt8, Title String, GoodEvent Int16, EventTime DateTime, EventDate Date, CounterID UInt32, ClientIP UInt32, RegionID UInt32, UserID UInt64, CounterClass Int8, OS UInt8, UserAgent UInt8, URL String, Referer String, Refresh UInt8, RefererCategoryID UInt16, RefererRegionID UInt32, URLCategoryID UInt16, URLRegionID UInt32, ResolutionWidth UInt16, ResolutionHeight UInt16, ResolutionDepth UInt8, FlashMajor UInt8, FlashMinor UInt8, FlashMinor2 String, NetMajor UInt8, NetMinor UInt8, UserAgentMajor UInt16, UserAgentMinor FixedString(2), CookieEnable UInt8, JavascriptEnable UInt8, IsMobile UInt8, MobilePhone UInt8, MobilePhoneModel String, Params String, IPNetworkID UInt32, TraficSourceID Int8, SearchEngineID UInt16, SearchPhrase String, AdvEngineID UInt8, IsArtifical UInt8, WindowClientWidth UInt16, WindowClientHeight UInt16, ClientTimeZone Int16, ClientEventTime DateTime, SilverlightVersion1 UInt8, SilverlightVersion2 UInt8, SilverlightVersion3 UInt32, SilverlightVersion4 UInt16, PageCharset String, CodeVersion UInt32, IsLink UInt8, IsDownload UInt8, IsNotBounce UInt8, FUniqID UInt64, OriginalURL String, HID UInt32, IsOldCounter UInt8, IsEvent UInt8, IsParameter UInt8, DontCountHits UInt8, WithHash UInt8, HitColor FixedString(1), LocalEventTime DateTime, Age UInt8, Sex UInt8, Income UInt8, Interests UInt16, Robotness UInt8, RemoteIP UInt32, WindowName Int32, OpenerName Int32, HistoryLength Int16, BrowserLanguage FixedString(2), BrowserCountry FixedString(2), SocialNetwork String, SocialAction String, HTTPError UInt16, SendTiming UInt32, DNSTiming UInt32, ConnectTiming UInt32, ResponseStartTiming UInt32, ResponseEndTiming UInt32, FetchTiming UInt32, SocialSourceNetworkID UInt8, SocialSourcePage String, ParamPrice Int64, ParamOrderID String, ParamCurrency FixedString(3), ParamCurrencyID UInt16, OpenstatServiceName String, OpenstatCampaignID String, OpenstatAdID String, OpenstatSourceID String, UTMSource String, UTMMedium String, UTMCampaign String, UTMContent String, UTMTerm String, FromTag String, HasGCLID UInt8, RefererHash UInt64, URLHash UInt64, CLID UInt32) ENGINE = MergeTree() PARTITION BY toYYYYMM(EventDate) ORDER BY (CounterID, EventDate, intHash32(UserID)) SAMPLE BY intHash32(UserID) SETTINGS index_granularity = 8192"
# import data
cat hits_v1.tsv | clickhouse-client --query "INSERT INTO datasets.hits_v1 FORMAT TSV" --max_insert_block_size=100000

View File

@ -426,7 +426,7 @@ Now `rule` can configure `method`, `headers`, `url`, `handler`:
- `status` — use with `static` type, response status code.
- `content_type` — use with `static` type, response [content-type](https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Content-Type).
- `content_type` — use with any type, response [content-type](https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Content-Type).
- `response_content` — use with `static` type, response content sent to the client; when using the prefix `file://` or `config://`, the content is read from the file or configuration and sent to the client.

View File

@ -0,0 +1,57 @@
---
sidebar_position: 20
sidebar_label: PostgreSQL Interface
---
# PostgreSQL Interface
ClickHouse supports the PostgreSQL wire protocol, which allows you to use Postgres clients to connect to ClickHouse. In a sense, ClickHouse can pretend to be a PostgreSQL instance, allowing you to connect PostgreSQL client applications to ClickHouse that are not already directly supported by ClickHouse (for example, Amazon Redshift).
To enable the PostgreSQL wire protocol, add the [postgresql_port](../operations/server-configuration-parameters/settings#server_configuration_parameters-postgresql_port) setting to your server's configuration file. For example, you could define the port in a new XML file in your `config.d` folder:
```xml
<clickhouse>
<postgresql_port>9005</postgresql_port>
</clickhouse>
```
Start up your ClickHouse server and look for a log message similar to the following that mentions **Listening for PostgreSQL compatibility protocol**:
```response
{} <Information> Application: Listening for PostgreSQL compatibility protocol: 127.0.0.1:9005
```
## Connect psql to ClickHouse
The following command demonstrates how to connect the PostgreSQL client `psql` to ClickHouse:
```bash
psql -p [port] -h [hostname] -U [username] [database_name]
```
For example:
```bash
psql -p 9005 -h 127.0.0.1 -U alice default
```
:::note
The `psql` client requires a login with a password, so you will not be able to connect using the `default` user with no password. Either assign a password to the `default` user, or log in as a different user.
:::
The `psql` client prompts for the password:
```response
Password for user alice:
psql (14.2, server 22.3.1.1)
WARNING: psql major version 14, server major version 22.
Some psql features might not work.
Type "help" for help.
default=>
```
And that's it! You now have a PostgreSQL client connected to ClickHouse, and all commands and queries are executed on ClickHouse.
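Once connected, any SQL you type at the `psql` prompt is executed by ClickHouse. A minimal smoke test might look like the following (the table name is hypothetical):
```sql
-- create a small MergeTree table and query it over the PostgreSQL protocol
CREATE TABLE psql_smoke_test (id UInt32, message String) ENGINE = MergeTree ORDER BY id;
INSERT INTO psql_smoke_test VALUES (1, 'hello from psql');
SELECT * FROM psql_smoke_test;
```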
[Original article](https://clickhouse.com/docs/en/interfaces/postgresql)

View File

@ -622,7 +622,7 @@ arraySlice(array, offset[, length])
- `array` Array of data.
- `offset` Indent from the edge of the array. A positive value indicates an offset on the left, and a negative value is an indent on the right. Numbering of the array items begins with 1.
- `length` The length of the required slice. If you specify a negative value, the function returns an open slice `[offset, array_length - length)`. If you omit the value, the function returns the slice `[offset, the_end_of_array]`.
- `length` The length of the required slice. If you specify a negative value, the function returns an open slice `[offset, array_length - length]`. If you omit the value, the function returns the slice `[offset, the_end_of_array]`.
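For instance, a rough sketch of the negative `length` behavior (example values chosen for illustration, result shown as expected):
```sql
-- take elements from offset 2 up to, but not including, the last element
SELECT arraySlice([1, 2, 3, 4, 5], 2, -1) AS res; -- expected: [2,3,4]
```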
**Example**

View File

@ -130,13 +130,9 @@ bitSlice(s, offset[, length])
**Arguments**
- `s` — s is [String](../../sql-reference/data-types/string.md)
or [FixedString](../../sql-reference/data-types/fixedstring.md).
- `offset` — The start index with bit, A positive value indicates an offset on the left, and a negative value is an
indent on the right. Numbering of the bits begins with 1.
- `length` — The length of substring with bit. If you specify a negative value, the function returns an open substring [
offset, array_length - length). If you omit the value, the function returns the substring [offset, the_end_string].
If length exceeds s, it will be truncate.If length isn't multiple of 8, will fill 0 on the right.
- `s` — s is [String](../../sql-reference/data-types/string.md) or [FixedString](../../sql-reference/data-types/fixedstring.md).
- `offset` — The start index in bits. A positive value indicates an offset from the left, and a negative value is an indent from the right. Numbering of the bits begins with 1.
- `length` — The length of the substring in bits. If you specify a negative value, the function returns an open substring \[offset, array_length - length\]. If you omit the value, the function returns the substring \[offset, the_end_string\]. If length exceeds the length of s, it is truncated. If length is not a multiple of 8, zeros are padded on the right.
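A rough sketch of the bit-level semantics (illustration only, assuming 8 bits per character):
```sql
-- take the first 8 bits (the first byte) of the string
SELECT bitSlice('Hello', 1, 8) AS first_byte; -- expected: 'H'
```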
**Returned value**

View File

@ -478,3 +478,17 @@ Result:
│ 0 │
└──────────────────────────────────────────────┘
```
Query:
``` sql
SELECT isIPAddressInRange('::ffff:192.168.0.1', '::ffff:192.168.0.4/128');
```
Result:
``` text
┌─isIPAddressInRange('::ffff:192.168.0.1', '::ffff:192.168.0.4/128')─┐
│ 0 │
└────────────────────────────────────────────────────────────────────┘
```

View File

@ -480,7 +480,7 @@ Result:
## substring(s, offset, length), mid(s, offset, length), substr(s, offset, length) {#substring}
Returns a substring starting with the byte from the offset index that is length bytes long. Character indexing starts from one (as in standard SQL). The offset and length arguments must be constants.
Returns a substring starting with the byte from the offset index that is length bytes long. Character indexing starts from one (as in standard SQL).
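A brief sketch illustrating that `offset` and `length` no longer need to be constants (example only, results shown as expected):
```sql
-- slide a 3-byte window over the string using a non-constant offset
SELECT substring('clickhouse', number + 1, 3) FROM numbers(3);
-- expected rows: 'cli', 'lic', 'ick'
```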
## substringUTF8(s, offset, length) {#substringutf8}

View File

@ -410,7 +410,7 @@ $ curl -v 'http://localhost:8123/predefined_query'
- `status` — используется с типом `static`, возвращает код состояния ответа.
- `content_type` — используется с типом `static`, возвращает [content-type](https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Content-Type).
- `content_type` — используется со всеми типами, возвращает [content-type](https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Content-Type).
- `response_content` — используется с типом`static`, содержимое ответа, отправленное клиенту, при использовании префикса file:// or config://, находит содержимое из файла или конфигурации, отправленного клиенту.

View File

@ -31,7 +31,5 @@ sidebar_label: UInt8, UInt16, UInt32, UInt64, Int8, Int16, Int32, Int64
- `UInt16` — \[0 : 65535\]
- `UInt32` — \[0 : 4294967295\]
- `UInt64` — \[0 : 18446744073709551615\]
- `UInt128` — \[0 : 340282366920938463463374607431768211455\]
- `UInt256` — \[0 : 115792089237316195423570985008687907853269984665640564039457584007913129639935\]
`UInt128` пока не реализован.

View File

@ -21,7 +21,7 @@ LowCardinality(data_type)
`LowCardinality` — это надстройка, изменяющая способ хранения и правила обработки данных. ClickHouse применяет [словарное кодирование](https://en.wikipedia.org/wiki/Dictionary_coder) в столбцы типа `LowCardinality`. Работа с данными, представленными в словарном виде, может значительно увеличивать производительность запросов [SELECT](../statements/select/index.md) для многих приложений.
Эффективность использования типа данных `LowCarditality` зависит от разнообразия данных. Если словарь содержит менее 10 000 различных значений, ClickHouse в основном показывает более высокую эффективность чтения и хранения данных. Если же словарь содержит более 100 000 различных значений, ClickHouse может работать хуже, чем при использовании обычных типов данных.
Эффективность использования типа данных `LowCardinality` зависит от разнообразия данных. Если словарь содержит менее 10 000 различных значений, ClickHouse в основном показывает более высокую эффективность чтения и хранения данных. Если же словарь содержит более 100 000 различных значений, ClickHouse может работать хуже, чем при использовании обычных типов данных.
При работе со строками использование `LowCardinality` вместо [Enum](enum.md) обеспечивает большую гибкость в использовании и часто показывает такую же или более высокую эффективность.

View File

@ -575,8 +575,8 @@ arraySlice(array, offset[, length])
**Аргументы**
- `array` массив данных.
- `offset` отступ от края массива. Положительное значение - отступ слева, отрицательное значение - отступ справа. Отсчет элементов массива начинается с 1.
- `length` длина необходимого среза. Если указать отрицательное значение, то функция вернёт открытый срез `[offset, array_length - length)`. Если не указать значение, то функция вернёт срез `[offset, the_end_of_array]`.
- `offset` отступ от края массива. Положительное значение - отступ слева, отрицательное значение - отступ справа. Отсчёт элементов массива начинается с 1.
- `length` длина необходимого среза. Если указать отрицательное значение, то функция вернёт открытый срез `[offset, array_length - length]`. Если не указать значение, то функция вернёт срез `[offset, the_end_of_array]`.
**Пример**

View File

@ -446,3 +446,17 @@ SELECT isIPAddressInRange('127.0.0.1', 'ffff::/16');
│ 0 │
└──────────────────────────────────────────────┘
```
Запрос:
``` sql
SELECT isIPAddressInRange('::ffff:192.168.0.1', '::ffff:192.168.0.4/128');
```
Результат:
``` text
┌─isIPAddressInRange('::ffff:192.168.0.1', '::ffff:192.168.0.4/128')─┐
│ 0 │
└────────────────────────────────────────────────────────────────────┘
```

View File

@ -157,6 +157,7 @@ def build(args):
if not args.skip_website:
website.process_benchmark_results(args)
website.minify_website(args)
redirects.build_static_redirects(args)

View File

@ -1,7 +1,10 @@
import hashlib
import json
import logging
import os
import shutil
import subprocess
import bs4
import util
@ -178,6 +181,59 @@ def build_website(args):
f.write(content.encode("utf-8"))
def get_css_in(args):
return [
f"'{args.website_dir}/css/bootstrap.css'",
f"'{args.website_dir}/css/docsearch.css'",
f"'{args.website_dir}/css/base.css'",
f"'{args.website_dir}/css/blog.css'",
f"'{args.website_dir}/css/docs.css'",
f"'{args.website_dir}/css/highlight.css'",
f"'{args.website_dir}/css/main.css'",
]
def get_js_in(args):
return [
f"'{args.website_dir}/js/jquery.js'",
f"'{args.website_dir}/js/popper.js'",
f"'{args.website_dir}/js/bootstrap.js'",
f"'{args.website_dir}/js/sentry.js'",
f"'{args.website_dir}/js/base.js'",
f"'{args.website_dir}/js/index.js'",
f"'{args.website_dir}/js/docsearch.js'",
f"'{args.website_dir}/js/docs.js'",
f"'{args.website_dir}/js/main.js'",
]
def minify_website(args):
css_in = " ".join(get_css_in(args))
css_out = f"{args.output_dir}/docs/css/base.css"
os.makedirs(f"{args.output_dir}/docs/css")
command = f"cat {css_in}"
output = subprocess.check_output(command, shell=True)
with open(css_out, "wb+") as f:
f.write(output)
with open(css_out, "rb") as f:
css_digest = hashlib.sha3_224(f.read()).hexdigest()[0:8]
js_in = " ".join(get_js_in(args))
js_out = f"{args.output_dir}/docs/js/base.js"
os.makedirs(f"{args.output_dir}/docs/js")
command = f"cat {js_in}"
output = subprocess.check_output(command, shell=True)
with open(js_out, "wb+") as f:
f.write(output)
with open(js_out, "rb") as f:
js_digest = hashlib.sha3_224(f.read()).hexdigest()[0:8]
logging.info(js_digest)
def process_benchmark_results(args):
benchmark_root = os.path.join(args.website_dir, "benchmark")
required_keys = {

View File

@ -399,7 +399,7 @@ SELECT arrayPushFront(['b'], 'a') AS res
- `array` 数组。
- `offset` 数组的偏移。正值表示左侧的偏移量负值表示右侧的缩进值。数组下标从1开始。
- `length` - 子数组的长度。如果指定负值,则该函数返回`[offsetarray_length - length`。如果省略该值,则该函数返回`[offsetthe_end_of_array]`。
- `length` - 子数组的长度。如果指定负值,则该函数返回`[offsetarray_length - length]`。如果省略该值,则该函数返回`[offsetthe_end_of_array]`。
**示例**

View File

@ -4,6 +4,7 @@
#include <iostream>
#include <iomanip>
#include <optional>
#include <string_view>
#include <Common/scope_guard_safe.h>
#include <boost/program_options.hpp>
#include <boost/algorithm/string/replace.hpp>
@ -48,6 +49,7 @@
#endif
namespace fs = std::filesystem;
using namespace std::literals;
namespace DB
@ -1038,6 +1040,158 @@ void Client::processConfig()
client_info.quota_key = config().getString("quota_key", "");
}
void Client::readArguments(
int argc,
char ** argv,
Arguments & common_arguments,
std::vector<Arguments> & external_tables_arguments,
std::vector<Arguments> & hosts_and_ports_arguments)
{
/** We allow different groups of arguments:
* - common arguments;
* - arguments for any number of external tables each in form "--external args...",
* where possible args are file, name, format, structure, types;
* - param arguments for prepared statements.
* Split these groups before processing.
*/
bool in_external_group = false;
std::string prev_host_arg;
std::string prev_port_arg;
for (int arg_num = 1; arg_num < argc; ++arg_num)
{
std::string_view arg = argv[arg_num];
if (arg == "--external")
{
in_external_group = true;
external_tables_arguments.emplace_back(Arguments{""});
}
/// Options with value after equal sign.
else if (
in_external_group
&& (arg.starts_with("--file=") || arg.starts_with("--name=") || arg.starts_with("--format=") || arg.starts_with("--structure=")
|| arg.starts_with("--types=")))
{
external_tables_arguments.back().emplace_back(arg);
}
/// Options with value after whitespace.
else if (in_external_group && (arg == "--file" || arg == "--name" || arg == "--format" || arg == "--structure" || arg == "--types"))
{
if (arg_num + 1 < argc)
{
external_tables_arguments.back().emplace_back(arg);
++arg_num;
arg = argv[arg_num];
external_tables_arguments.back().emplace_back(arg);
}
else
break;
}
else
{
in_external_group = false;
if (arg == "--file"sv || arg == "--name"sv || arg == "--structure"sv || arg == "--types"sv)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Parameter must be in external group, try add --external before {}", arg);
/// Parameter arg after underscore.
if (arg.starts_with("--param_"))
{
auto param_continuation = arg.substr(strlen("--param_"));
auto equal_pos = param_continuation.find_first_of('=');
if (equal_pos == std::string::npos)
{
/// param_name value
++arg_num;
if (arg_num >= argc)
throw Exception("Parameter requires value", ErrorCodes::BAD_ARGUMENTS);
arg = argv[arg_num];
query_parameters.emplace(String(param_continuation), String(arg));
}
else
{
if (equal_pos == 0)
throw Exception("Parameter name cannot be empty", ErrorCodes::BAD_ARGUMENTS);
/// param_name=value
query_parameters.emplace(param_continuation.substr(0, equal_pos), param_continuation.substr(equal_pos + 1));
}
}
else if (arg.starts_with("--host") || arg.starts_with("-h"))
{
std::string host_arg;
/// --host host
if (arg == "--host" || arg == "-h")
{
++arg_num;
if (arg_num >= argc)
throw Exception("Host argument requires value", ErrorCodes::BAD_ARGUMENTS);
arg = argv[arg_num];
host_arg = "--host=";
host_arg.append(arg);
}
else
host_arg = arg;
/// --port port1 --host host1
if (!prev_port_arg.empty())
{
hosts_and_ports_arguments.push_back({host_arg, prev_port_arg});
prev_port_arg.clear();
}
else
{
/// --host host1 --host host2
if (!prev_host_arg.empty())
hosts_and_ports_arguments.push_back({prev_host_arg});
prev_host_arg = host_arg;
}
}
else if (arg.starts_with("--port"))
{
auto port_arg = String{arg};
/// --port port
if (arg == "--port")
{
port_arg.push_back('=');
++arg_num;
if (arg_num >= argc)
throw Exception("Port argument requires value", ErrorCodes::BAD_ARGUMENTS);
arg = argv[arg_num];
port_arg.append(arg);
}
/// --host host1 --port port1
if (!prev_host_arg.empty())
{
hosts_and_ports_arguments.push_back({port_arg, prev_host_arg});
prev_host_arg.clear();
}
else
{
/// --port port1 --port port2
if (!prev_port_arg.empty())
hosts_and_ports_arguments.push_back({prev_port_arg});
prev_port_arg = port_arg;
}
}
else if (arg == "--allow_repeated_settings")
allow_repeated_settings = true;
else
common_arguments.emplace_back(arg);
}
}
if (!prev_host_arg.empty())
hosts_and_ports_arguments.push_back({prev_host_arg});
if (!prev_port_arg.empty())
hosts_and_ports_arguments.push_back({prev_port_arg});
}
}

View File

@ -36,6 +36,13 @@ protected:
void processConfig() override;
void readArguments(
int argc,
char ** argv,
Arguments & common_arguments,
std::vector<Arguments> & external_tables_arguments,
std::vector<Arguments> & hosts_and_ports_arguments) override;
private:
void printChangedSettings() const;
std::vector<String> loadWarningMessages();

View File

@ -68,6 +68,7 @@ namespace ErrorCodes
extern const int NOT_ENOUGH_SPACE;
extern const int NOT_IMPLEMENTED;
extern const int CANNOT_KILL;
extern const int BAD_ARGUMENTS;
}
}
@ -1062,8 +1063,11 @@ namespace
return pid;
}
int stop(const fs::path & pid_file, bool force)
int stop(const fs::path & pid_file, bool force, bool do_not_kill)
{
if (force && do_not_kill)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Specified flags are incompatible");
UInt64 pid = isRunning(pid_file);
if (!pid)
@ -1092,9 +1096,15 @@ namespace
if (try_num == num_tries)
{
fmt::print("Will terminate forcefully.\n", pid);
if (do_not_kill)
{
fmt::print("Process (pid = {}) is still running. Will not try to kill it.\n", pid);
return 1;
}
fmt::print("Will terminate forcefully (pid = {}).\n", pid);
if (0 == kill(pid, 9))
fmt::print("Sent kill signal.\n", pid);
fmt::print("Sent kill signal (pid = {}).\n", pid);
else
throwFromErrno("Cannot send kill signal", ErrorCodes::SYSTEM_ERROR);
@ -1175,6 +1185,7 @@ int mainEntryClickHouseStop(int argc, char ** argv)
("prefix", po::value<std::string>()->default_value("/"), "prefix for all paths")
("pid-path", po::value<std::string>()->default_value("var/run/clickhouse-server"), "directory for pid file")
("force", po::bool_switch(), "Stop with KILL signal instead of TERM")
("do-not-kill", po::bool_switch(), "Do not send KILL even if TERM did not help")
;
po::variables_map options;
@ -1189,7 +1200,9 @@ int mainEntryClickHouseStop(int argc, char ** argv)
fs::path prefix = options["prefix"].as<std::string>();
fs::path pid_file = prefix / options["pid-path"].as<std::string>() / "clickhouse-server.pid";
return stop(pid_file, options["force"].as<bool>());
bool force = options["force"].as<bool>();
bool do_not_kill = options["do-not-kill"].as<bool>();
return stop(pid_file, force, do_not_kill);
}
catch (...)
{
@ -1247,6 +1260,7 @@ int mainEntryClickHouseRestart(int argc, char ** argv)
("pid-path", po::value<std::string>()->default_value("var/run/clickhouse-server"), "directory for pid file")
("user", po::value<std::string>()->default_value(DEFAULT_CLICKHOUSE_SERVER_USER), "clickhouse user")
("force", po::value<bool>()->default_value(false), "Stop with KILL signal instead of TERM")
("do-not-kill", po::bool_switch(), "Do not send KILL even if TERM did not help")
;
po::variables_map options;
@ -1265,7 +1279,9 @@ int mainEntryClickHouseRestart(int argc, char ** argv)
fs::path config = prefix / options["config-path"].as<std::string>() / "config.xml";
fs::path pid_file = prefix / options["pid-path"].as<std::string>() / "clickhouse-server.pid";
if (int res = stop(pid_file, options["force"].as<bool>()))
bool force = options["force"].as<bool>();
bool do_not_kill = options["do-not-kill"].as<bool>();
if (int res = stop(pid_file, force, do_not_kill))
return res;
return start(user, executable, config, pid_file);

View File

@ -738,6 +738,15 @@ void LocalServer::processOptions(const OptionsDescription &, const CommandLineOp
config().setString("send_logs_level", options["send_logs_level"].as<std::string>());
}
void LocalServer::readArguments(int argc, char ** argv, Arguments & common_arguments, std::vector<Arguments> &, std::vector<Arguments> &)
{
for (int arg_num = 1; arg_num < argc; ++arg_num)
{
const char * arg = argv[arg_num];
common_arguments.emplace_back(arg);
}
}
}
#pragma GCC diagnostic ignored "-Wunused-function"

View File

@ -45,6 +45,8 @@ protected:
const std::vector<Arguments> &, const std::vector<Arguments> &) override;
void processConfig() override;
void readArguments(int argc, char ** argv, Arguments & common_arguments, std::vector<Arguments> &, std::vector<Arguments> &) override;
void updateLoggerLevel(const String & logs_level) override;

View File

@ -540,7 +540,7 @@ static void sanityChecks(Server & server)
try
{
if (readString("/sys/devices/system/clocksource/clocksource0/current_clocksource").find("tsc") == std::string::npos)
server.context()->addWarningMessage("Linux is not using fast TSC clock source. Performance can be degraded.");
server.context()->addWarningMessage("Linux is not using a fast TSC clock source. Performance can be degraded.");
}
catch (...)
{
@ -558,7 +558,7 @@ static void sanityChecks(Server & server)
try
{
if (readString("/sys/kernel/mm/transparent_hugepage/enabled").find("[always]") != std::string::npos)
server.context()->addWarningMessage("Linux transparent hugepage are set to \"always\".");
server.context()->addWarningMessage("Linux transparent hugepages are set to \"always\".");
}
catch (...)
{
@ -1088,11 +1088,8 @@ int Server::main(const std::vector<std::string> & /*args*/)
total_memory_tracker.setMetric(CurrentMetrics::MemoryTracking);
auto * global_overcommit_tracker = global_context->getGlobalOvercommitTracker();
if (config->has("global_memory_usage_overcommit_max_wait_microseconds"))
{
UInt64 max_overcommit_wait_time = config->getUInt64("global_memory_usage_overcommit_max_wait_microseconds", 0);
UInt64 max_overcommit_wait_time = config->getUInt64("global_memory_usage_overcommit_max_wait_microseconds", 200);
global_overcommit_tracker->setMaxWaitTime(max_overcommit_wait_time);
}
total_memory_tracker.setOvercommitTracker(global_overcommit_tracker);
// FIXME logging-related things need synchronization -- see the 'Logger * log' saved
@ -1294,17 +1291,11 @@ int Server::main(const std::vector<std::string> & /*args*/)
LOG_INFO(log, "Listening for {}", server.getDescription());
}
auto & access_control = global_context->getAccessControl();
if (config().has("custom_settings_prefixes"))
access_control.setCustomSettingsPrefixes(config().getString("custom_settings_prefixes"));
access_control.setNoPasswordAllowed(config().getBool("allow_no_password", true));
access_control.setPlaintextPasswordAllowed(config().getBool("allow_plaintext_password", true));
/// Initialize access storages.
auto & access_control = global_context->getAccessControl();
try
{
access_control.addStoragesFromMainConfig(config(), config_path, [&] { return global_context->getZooKeeper(); });
access_control.setUpFromMainConfig(config(), config_path, [&] { return global_context->getZooKeeper(); });
}
catch (...)
{

View File

@ -545,6 +545,14 @@
-->
</user_directories>
<access_control_improvements>
<!-- Enables logic that users without permissive row policies can still read rows using a SELECT query.
For example, if there are two users A and B and a row policy is defined only for A, then
if this setting is true the user B will see all rows, and if this setting is false the user B will see no rows.
By default this setting is false for compatibility with earlier access configurations. -->
<users_without_row_policies_can_read_rows>false</users_without_row_policies_can_read_rows>
</access_control_improvements>
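For context, a hypothetical illustration of the same behavior with SQL-defined row policies (user, table, and policy names are made up):
```sql
-- a row policy exists only for user A on db.table
CREATE ROW POLICY only_a ON db.table FOR SELECT USING user_id = 42 TO A;
-- with users_without_row_policies_can_read_rows = false (the default), user B sees no rows from db.table;
-- with the setting enabled, user B (who has no policy) would see all rows.
```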
<!-- Default profile of settings. -->
<default_profile>default</default_profile>

View File

@ -129,8 +129,8 @@
#query_div
{
/* Make enough space for even huge queries. */
height: 20%;
/* Make enough space for medium/large queries, while allowing the query textarea to grow. */
min-height: 20%;
}
#query
@ -748,7 +748,7 @@
const max_rows = 10000 / response.meta.length;
let row_num = 0;
const column_is_number = response.meta.map(elem => !!elem.type.match(/^(U?Int|Decimal|Float)/));
const column_is_number = response.meta.map(elem => !!elem.type.match(/^(Nullable\()?(U?Int|Decimal|Float)/));
const column_maximums = column_is_number.map((elem, idx) => elem ? Math.max(...response.data.map(row => row[idx])) : 0);
const column_minimums = column_is_number.map((elem, idx) => elem ? Math.min(...response.data.map(row => Math.max(0, row[idx]))) : 0);
const column_need_render_bars = column_is_number.map((elem, idx) => column_maximums[idx] > 0 && column_maximums[idx] > column_minimums[idx]);

View File

@ -6,9 +6,6 @@
<profiles>
<!-- Default settings. -->
<default>
<!-- Maximum memory usage for processing single query, in bytes. -->
<max_memory_usage>10000000000</max_memory_usage>
<!-- How to choose between replicas during distributed query processing.
random - choose random replica from set of replicas with minimum number of errors
nearest_hostname - from set of replicas with minimum number of errors, choose replica

View File

@ -149,6 +149,24 @@ AccessControl::AccessControl()
AccessControl::~AccessControl() = default;
void AccessControl::setUpFromMainConfig(const Poco::Util::AbstractConfiguration & config_, const String & config_path_,
const zkutil::GetZooKeeper & get_zookeeper_function_)
{
if (config_.has("custom_settings_prefixes"))
setCustomSettingsPrefixes(config_.getString("custom_settings_prefixes"));
setNoPasswordAllowed(config_.getBool("allow_no_password", true));
setPlaintextPasswordAllowed(config_.getBool("allow_plaintext_password", true));
setEnabledUsersWithoutRowPoliciesCanReadRows(config_.getBool(
"access_control_improvements.users_without_row_policies_can_read_rows",
false /* false because we need to be compatible with earlier access configurations */));
addStoragesFromMainConfig(config_, config_path_, get_zookeeper_function_);
}
void AccessControl::setUsersConfig(const Poco::Util::AbstractConfiguration & users_config_)
{
auto storages = getStoragesPtr();
@ -170,11 +188,7 @@ void AccessControl::addUsersConfigStorage(const Poco::Util::AbstractConfiguratio
void AccessControl::addUsersConfigStorage(const String & storage_name_, const Poco::Util::AbstractConfiguration & users_config_)
{
auto check_setting_name_function = [this](const std::string_view & setting_name) { checkSettingNameIsAllowed(setting_name); };
auto is_no_password_allowed_function = [this]() -> bool { return isNoPasswordAllowed(); };
auto is_plaintext_password_allowed_function = [this]() -> bool { return isPlaintextPasswordAllowed(); };
auto new_storage = std::make_shared<UsersConfigAccessStorage>(storage_name_, check_setting_name_function,
is_no_password_allowed_function, is_plaintext_password_allowed_function);
auto new_storage = std::make_shared<UsersConfigAccessStorage>(storage_name_, *this);
new_storage->setConfig(users_config_);
addStorage(new_storage);
LOG_DEBUG(getLogger(), "Added {} access storage '{}', path: {}",
@ -207,11 +221,7 @@ void AccessControl::addUsersConfigStorage(
return;
}
}
auto check_setting_name_function = [this](const std::string_view & setting_name) { checkSettingNameIsAllowed(setting_name); };
auto is_no_password_allowed_function = [this]() -> bool { return isNoPasswordAllowed(); };
auto is_plaintext_password_allowed_function = [this]() -> bool { return isPlaintextPasswordAllowed(); };
auto new_storage = std::make_shared<UsersConfigAccessStorage>(storage_name_, check_setting_name_function,
is_no_password_allowed_function, is_plaintext_password_allowed_function);
auto new_storage = std::make_shared<UsersConfigAccessStorage>(storage_name_, *this);
new_storage->load(users_config_path_, include_from_path_, preprocessed_dir_, get_zookeeper_function_);
addStorage(new_storage);
LOG_DEBUG(getLogger(), "Added {} access storage '{}', path: {}", String(new_storage->getStorageType()), new_storage->getStorageName(), new_storage->getPath());

View File

@ -50,6 +50,9 @@ public:
AccessControl();
~AccessControl() override;
void setUpFromMainConfig(const Poco::Util::AbstractConfiguration & config_, const String & config_path_,
const zkutil::GetZooKeeper & get_zookeeper_function_);
/// Parses access entities from a configuration loaded from users.xml.
/// This function adds a UsersConfigAccessStorage if it wasn't added before.
void setUsersConfig(const Poco::Util::AbstractConfiguration & users_config_);
@ -122,6 +125,12 @@ public:
void setPlaintextPasswordAllowed(const bool allow_plaintext_password_);
bool isPlaintextPasswordAllowed() const;
/// Enables logic that users without permissive row policies can still read rows using a SELECT query.
/// For example, if there are two users A and B and a row policy is defined only for A, then
/// if this setting is true the user B will see all rows, and if this setting is false the user B will see no rows.
void setEnabledUsersWithoutRowPoliciesCanReadRows(bool enable) { users_without_row_policies_can_read_rows = enable; }
bool isEnabledUsersWithoutRowPoliciesCanReadRows() const { return users_without_row_policies_can_read_rows; }
UUID authenticate(const Credentials & credentials, const Poco::Net::IPAddress & address) const;
void setExternalAuthenticatorsConfig(const Poco::Util::AbstractConfiguration & config);
@ -178,6 +187,7 @@ private:
std::unique_ptr<CustomSettingsPrefixes> custom_settings_prefixes;
std::atomic_bool allow_plaintext_password = true;
std::atomic_bool allow_no_password = true;
std::atomic_bool users_without_row_policies_can_read_rows = false;
};
}

View File

@ -28,17 +28,25 @@ namespace
permissions.push_back(filter);
}
ASTPtr getResult() &&
ASTPtr getResult(bool users_without_row_policies_can_read_rows) &&
{
if (!permissions.empty() || !users_without_row_policies_can_read_rows)
{
/// Process permissive filters.
restrictions.push_back(makeASTForLogicalOr(std::move(permissions)));
}
/// Process restrictive filters.
auto result = makeASTForLogicalAnd(std::move(restrictions));
ASTPtr result;
if (!restrictions.empty())
result = makeASTForLogicalAnd(std::move(restrictions));
if (result)
{
bool value;
if (tryGetLiteralBool(result.get(), value) && value)
result = nullptr; /// The condition is always true, no need to check it.
}
return result;
}
@ -234,7 +242,7 @@ void RowPolicyCache::mixFiltersFor(EnabledRowPolicies & enabled)
{
auto & mixed_filter = (*mixed_filters)[key];
mixed_filter.database_and_table_name = mixer.database_and_table_name;
mixed_filter.ast = std::move(mixer.mixer).getResult();
mixed_filter.ast = std::move(mixer.mixer).getResult(access_control.isEnabledUsersWithoutRowPoliciesCanReadRows());
}
enabled.mixed_filters.store(mixed_filters);

View File

@ -3,6 +3,7 @@
#include <Access/RowPolicy.h>
#include <Access/User.h>
#include <Access/SettingsProfile.h>
#include <Access/AccessControl.h>
#include <Dictionaries/IDictionary.h>
#include <Common/Config/ConfigReloader.h>
#include <Common/StringUtils/StringUtils.h>
@ -339,7 +340,7 @@ namespace
}
std::vector<AccessEntityPtr> parseRowPolicies(const Poco::Util::AbstractConfiguration & config)
std::vector<AccessEntityPtr> parseRowPolicies(const Poco::Util::AbstractConfiguration & config, bool users_without_row_policies_can_read_rows)
{
std::map<std::pair<String /* database */, String /* table */>, std::unordered_map<String /* user */, String /* filter */>> all_filters_map;
@ -395,8 +396,19 @@ namespace
const auto & [database, table_name] = database_and_table_name;
for (const String & user_name : user_names)
{
String filter;
auto it = user_to_filters.find(user_name);
String filter = (it != user_to_filters.end()) ? it->second : "1";
if (it != user_to_filters.end())
{
filter = it->second;
}
else
{
if (users_without_row_policies_can_read_rows)
continue;
else
filter = "1";
}
auto policy = std::make_shared<RowPolicy>();
policy->setFullName(user_name, database, table_name);
@ -411,7 +423,7 @@ namespace
SettingsProfileElements parseSettingsConstraints(const Poco::Util::AbstractConfiguration & config,
const String & path_to_constraints,
Fn<void(std::string_view)> auto && check_setting_name_function)
const AccessControl & access_control)
{
SettingsProfileElements profile_elements;
Poco::Util::AbstractConfiguration::Keys keys;
@ -419,8 +431,7 @@ namespace
for (const String & setting_name : keys)
{
if (check_setting_name_function)
check_setting_name_function(setting_name);
access_control.checkSettingNameIsAllowed(setting_name);
SettingsProfileElement profile_element;
profile_element.setting_name = setting_name;
@ -448,7 +459,7 @@ namespace
std::shared_ptr<SettingsProfile> parseSettingsProfile(
const Poco::Util::AbstractConfiguration & config,
const String & profile_name,
Fn<void(std::string_view)> auto && check_setting_name_function)
const AccessControl & access_control)
{
auto profile = std::make_shared<SettingsProfile>();
profile->setName(profile_name);
@ -470,13 +481,12 @@ namespace
if (key == "constraints" || key.starts_with("constraints["))
{
profile->elements.merge(parseSettingsConstraints(config, profile_config + "." + key, check_setting_name_function));
profile->elements.merge(parseSettingsConstraints(config, profile_config + "." + key, access_control));
continue;
}
const auto & setting_name = key;
if (check_setting_name_function)
check_setting_name_function(setting_name);
access_control.checkSettingNameIsAllowed(setting_name);
SettingsProfileElement profile_element;
profile_element.setting_name = setting_name;
@ -490,7 +500,7 @@ namespace
std::vector<AccessEntityPtr> parseSettingsProfiles(
const Poco::Util::AbstractConfiguration & config,
Fn<void(std::string_view)> auto && check_setting_name_function)
const AccessControl & access_control)
{
Poco::Util::AbstractConfiguration::Keys profile_names;
config.keys("profiles", profile_names);
@ -502,7 +512,7 @@ namespace
{
try
{
profiles.push_back(parseSettingsProfile(config, profile_name, check_setting_name_function));
profiles.push_back(parseSettingsProfile(config, profile_name, access_control));
}
catch (Exception & e)
{
@ -515,13 +525,8 @@ namespace
}
}
UsersConfigAccessStorage::UsersConfigAccessStorage(const CheckSettingNameFunction & check_setting_name_function_, const IsNoPasswordFunction & is_no_password_allowed_function_, const IsPlaintextPasswordFunction & is_plaintext_password_allowed_function_)
: UsersConfigAccessStorage(STORAGE_TYPE, check_setting_name_function_, is_no_password_allowed_function_, is_plaintext_password_allowed_function_)
{
}
UsersConfigAccessStorage::UsersConfigAccessStorage(const String & storage_name_, const CheckSettingNameFunction & check_setting_name_function_, const IsNoPasswordFunction & is_no_password_allowed_function_, const IsPlaintextPasswordFunction & is_plaintext_password_allowed_function_)
: IAccessStorage(storage_name_), check_setting_name_function(check_setting_name_function_),is_no_password_allowed_function(is_no_password_allowed_function_), is_plaintext_password_allowed_function(is_plaintext_password_allowed_function_)
UsersConfigAccessStorage::UsersConfigAccessStorage(const String & storage_name_, const AccessControl & access_control_)
: IAccessStorage(storage_name_), access_control(access_control_)
{
}
@ -563,16 +568,16 @@ void UsersConfigAccessStorage::parseFromConfig(const Poco::Util::AbstractConfigu
{
try
{
bool no_password_allowed = is_no_password_allowed_function();
bool plaintext_password_allowed = is_plaintext_password_allowed_function();
bool no_password_allowed = access_control.isNoPasswordAllowed();
bool plaintext_password_allowed = access_control.isPlaintextPasswordAllowed();
std::vector<std::pair<UUID, AccessEntityPtr>> all_entities;
for (const auto & entity : parseUsers(config, no_password_allowed, plaintext_password_allowed))
all_entities.emplace_back(generateID(*entity), entity);
for (const auto & entity : parseQuotas(config))
all_entities.emplace_back(generateID(*entity), entity);
for (const auto & entity : parseRowPolicies(config))
for (const auto & entity : parseRowPolicies(config, access_control.isEnabledUsersWithoutRowPoliciesCanReadRows()))
all_entities.emplace_back(generateID(*entity), entity);
for (const auto & entity : parseSettingsProfiles(config, check_setting_name_function))
for (const auto & entity : parseSettingsProfiles(config, access_control))
all_entities.emplace_back(generateID(*entity), entity);
memory_storage.setAll(all_entities);
}

View File

@ -12,6 +12,7 @@ namespace Poco::Util
namespace DB
{
class AccessControl;
class ConfigReloader;
/// Implementation of IAccessStorage which loads all from users.xml periodically.
@ -20,13 +21,8 @@ class UsersConfigAccessStorage : public IAccessStorage
public:
static constexpr char STORAGE_TYPE[] = "users.xml";
using CheckSettingNameFunction = std::function<void(const std::string_view &)>;
using IsNoPasswordFunction = std::function<bool()>;
using IsPlaintextPasswordFunction = std::function<bool()>;
UsersConfigAccessStorage(const String & storage_name_ = STORAGE_TYPE, const CheckSettingNameFunction & check_setting_name_function_ = {}, const IsNoPasswordFunction & is_no_password_allowed_function_ ={}, const IsPlaintextPasswordFunction & is_plaintext_password_allowed_function_ = {}); /// NOLINT
UsersConfigAccessStorage(const CheckSettingNameFunction & check_setting_name_function_, const IsNoPasswordFunction & is_no_password_allowed_function_, const IsPlaintextPasswordFunction & is_plaintext_password_allowed_function_); /// NOLINT
UsersConfigAccessStorage(const String & storage_name_, const AccessControl & access_control_);
~UsersConfigAccessStorage() override;
const char * getStorageType() const override { return STORAGE_TYPE; }
@ -58,10 +54,8 @@ private:
scope_guard subscribeForChangesImpl(const UUID & id, const OnChangedHandler & handler) const override;
scope_guard subscribeForChangesImpl(AccessEntityType type, const OnChangedHandler & handler) const override;
const AccessControl & access_control;
MemoryAccessStorage memory_storage;
CheckSettingNameFunction check_setting_name_function;
IsNoPasswordFunction is_no_password_allowed_function;
IsPlaintextPasswordFunction is_plaintext_password_allowed_function;
String path;
std::unique_ptr<ConfigReloader> config_reloader;
mutable std::mutex load_mutex;

View File

@ -2,7 +2,6 @@
#include <iostream>
#include <iomanip>
#include <string_view>
#include <filesystem>
#include <map>
#include <unordered_map>
@ -392,7 +391,7 @@ void ClientBase::onData(Block & block, ASTPtr parsed_query)
processed_rows += block.rows();
/// Even if all blocks are empty, we still need to initialize the output stream to write empty resultset.
initBlockOutputStream(block, parsed_query);
initOutputFormat(block, parsed_query);
/// The header block containing zero rows was used to initialize
/// output_format, do not output it.
@ -439,14 +438,14 @@ void ClientBase::onLogData(Block & block)
void ClientBase::onTotals(Block & block, ASTPtr parsed_query)
{
initBlockOutputStream(block, parsed_query);
initOutputFormat(block, parsed_query);
output_format->setTotals(block);
}
void ClientBase::onExtremes(Block & block, ASTPtr parsed_query)
{
initBlockOutputStream(block, parsed_query);
initOutputFormat(block, parsed_query);
output_format->setExtremes(block);
}
@ -466,7 +465,7 @@ void ClientBase::onProfileInfo(const ProfileInfo & profile_info)
}
void ClientBase::initBlockOutputStream(const Block & block, ASTPtr parsed_query)
void ClientBase::initOutputFormat(const Block & block, ASTPtr parsed_query)
try
{
if (!output_format)
@ -1487,7 +1486,9 @@ void ClientBase::processParsedSingleQuery(const String & full_query, const Strin
if (is_interactive)
{
std::cout << std::endl << processed_rows << " rows in set. Elapsed: " << progress_indication.elapsedSeconds() << " sec. ";
std::cout << std::endl
<< processed_rows << " row" << (processed_rows == 1 ? "" : "s")
<< " in set. Elapsed: " << progress_indication.elapsedSeconds() << " sec. ";
progress_indication.writeFinalProgress();
std::cout << std::endl << std::endl;
}
@ -2057,156 +2058,6 @@ void ClientBase::showClientVersion()
}
void ClientBase::readArguments(
int argc,
char ** argv,
Arguments & common_arguments,
std::vector<Arguments> & external_tables_arguments,
std::vector<Arguments> & hosts_and_ports_arguments)
{
/** We allow different groups of arguments:
* - common arguments;
* - arguments for any number of external tables each in form "--external args...",
* where possible args are file, name, format, structure, types;
* - param arguments for prepared statements.
* Split these groups before processing.
*/
bool in_external_group = false;
std::string prev_host_arg;
std::string prev_port_arg;
for (int arg_num = 1; arg_num < argc; ++arg_num)
{
std::string_view arg = argv[arg_num];
if (arg == "--external")
{
in_external_group = true;
external_tables_arguments.emplace_back(Arguments{""});
}
/// Options with value after equal sign.
else if (
in_external_group
&& (arg.starts_with("--file=") || arg.starts_with("--name=") || arg.starts_with("--format=") || arg.starts_with("--structure=")
|| arg.starts_with("--types=")))
{
external_tables_arguments.back().emplace_back(arg);
}
/// Options with value after whitespace.
else if (in_external_group && (arg == "--file" || arg == "--name" || arg == "--format" || arg == "--structure" || arg == "--types"))
{
if (arg_num + 1 < argc)
{
external_tables_arguments.back().emplace_back(arg);
++arg_num;
arg = argv[arg_num];
external_tables_arguments.back().emplace_back(arg);
}
else
break;
}
else
{
in_external_group = false;
/// Parameter arg after underscore.
if (arg.starts_with("--param_"))
{
auto param_continuation = arg.substr(strlen("--param_"));
auto equal_pos = param_continuation.find_first_of('=');
if (equal_pos == std::string::npos)
{
/// param_name value
++arg_num;
if (arg_num >= argc)
throw Exception("Parameter requires value", ErrorCodes::BAD_ARGUMENTS);
arg = argv[arg_num];
query_parameters.emplace(String(param_continuation), String(arg));
}
else
{
if (equal_pos == 0)
throw Exception("Parameter name cannot be empty", ErrorCodes::BAD_ARGUMENTS);
/// param_name=value
query_parameters.emplace(param_continuation.substr(0, equal_pos), param_continuation.substr(equal_pos + 1));
}
}
else if (arg.starts_with("--host") || arg.starts_with("-h"))
{
std::string host_arg;
/// --host host
if (arg == "--host" || arg == "-h")
{
++arg_num;
if (arg_num >= argc)
throw Exception("Host argument requires value", ErrorCodes::BAD_ARGUMENTS);
arg = argv[arg_num];
host_arg = "--host=";
host_arg.append(arg);
}
else
host_arg = arg;
/// --port port1 --host host1
if (!prev_port_arg.empty())
{
hosts_and_ports_arguments.push_back({host_arg, prev_port_arg});
prev_port_arg.clear();
}
else
{
/// --host host1 --host host2
if (!prev_host_arg.empty())
hosts_and_ports_arguments.push_back({prev_host_arg});
prev_host_arg = host_arg;
}
}
else if (arg.starts_with("--port"))
{
auto port_arg = String{arg};
/// --port port
if (arg == "--port")
{
port_arg.push_back('=');
++arg_num;
if (arg_num >= argc)
throw Exception("Port argument requires value", ErrorCodes::BAD_ARGUMENTS);
arg = argv[arg_num];
port_arg.append(arg);
}
/// --host host1 --port port1
if (!prev_host_arg.empty())
{
hosts_and_ports_arguments.push_back({port_arg, prev_host_arg});
prev_host_arg.clear();
}
else
{
/// --port port1 --port port2
if (!prev_port_arg.empty())
hosts_and_ports_arguments.push_back({prev_port_arg});
prev_port_arg = port_arg;
}
}
else if (arg == "--allow_repeated_settings")
allow_repeated_settings = true;
else
common_arguments.emplace_back(arg);
}
}
if (!prev_host_arg.empty())
hosts_and_ports_arguments.push_back({prev_host_arg});
if (!prev_port_arg.empty())
hosts_and_ports_arguments.push_back({prev_port_arg});
}
void ClientBase::parseAndCheckOptions(OptionsDescription & options_description, po::variables_map & options, Arguments & arguments)
{
if (allow_repeated_settings)

View File

@ -106,6 +106,14 @@ protected:
bool processQueryText(const String & text);
virtual void readArguments(
int argc,
char ** argv,
Arguments & common_arguments,
std::vector<Arguments> & external_tables_arguments,
std::vector<Arguments> & hosts_and_ports_arguments) = 0;
private:
void receiveResult(ASTPtr parsed_query);
bool receiveAndProcessPacket(ASTPtr parsed_query, bool cancelled_);
@ -131,19 +139,13 @@ private:
void sendDataFromStdin(Block & sample, const ColumnsDescription & columns_description, ASTPtr parsed_query);
void sendExternalTables(ASTPtr parsed_query);
void initBlockOutputStream(const Block & block, ASTPtr parsed_query);
void initOutputFormat(const Block & block, ASTPtr parsed_query);
void initLogsOutputStream();
String prompt() const;
void resetOutput();
void outputQueryInfo(bool echo_query_);
void readArguments(
int argc,
char ** argv,
Arguments & common_arguments,
std::vector<Arguments> & external_tables_arguments,
std::vector<Arguments> & hosts_and_ports_arguments);
void parseAndCheckOptions(OptionsDescription & options_description, po::variables_map & options, Arguments & arguments);
void updateSuggest(const ASTPtr & ast);

View File

@ -14,7 +14,7 @@
* make a persistent copy of the key in each of the following cases:
* 1) the aggregation method doesn't use temporary keys, so they're persistent
* from the start;
* 1) the key is already present in the hash table;
* 2) the key is already present in the hash table;
* 3) that particular key is stored by value, e.g. a short StringRef key in
* StringHashMap.
*

View File

@ -83,7 +83,7 @@ public:
current_word = 0;
}
void update(const char * data, UInt64 size)
ALWAYS_INLINE void update(const char * data, UInt64 size)
{
const char * end = data + size;
@ -137,12 +137,12 @@ public:
}
template <typename T>
void update(const T & x)
ALWAYS_INLINE void update(const T & x)
{
update(reinterpret_cast<const char *>(&x), sizeof(x)); /// NOLINT
}
void update(const std::string & x)
ALWAYS_INLINE void update(const std::string & x)
{
update(x.data(), x.length());
}
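
For context: ALWAYS_INLINE is ClickHouse's wrapper around the compiler's always-inline attribute, and marking these tiny per-word hashing helpers with it removes call overhead inside hot loops. A minimal standalone sketch of the idea; the macro definition below is an assumption for illustration, not a quote of the real defines header:

#include <cstdint>

#if defined(__GNUC__) || defined(__clang__)
    #define ALWAYS_INLINE __attribute__((__always_inline__))
#else
    #define ALWAYS_INLINE
#endif

struct TinyHasher
{
    uint64_t state = 0;

    /// Forcing inlining of the small per-word step lets the compiler fuse it into callers.
    ALWAYS_INLINE void update(uint64_t word) { state = state * 0x9E3779B97F4A7C15ULL + word; }
};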

View File

@ -181,7 +181,7 @@ ThreadStatus::~ThreadStatus()
deleter();
/// Only change current_thread if it's currently being used by this ThreadStatus
/// For example, PushingToViewsBlockOutputStream creates and deletes ThreadStatus instances while running in the main query thread
/// For example, PushingToViews chain creates and deletes ThreadStatus instances while running in the main query thread
if (current_thread == this)
current_thread = nullptr;
}

View File

@ -46,7 +46,7 @@ struct BlockInfo
void read(ReadBuffer & in);
};
/// Block extension to support delayed defaults. AddingDefaultsBlockInputStream uses it to replace missing values with column defaults.
/// Block extension to support delayed defaults. AddingDefaultsTransform uses it to replace missing values with column defaults.
class BlockMissingValues
{
public:

View File

@ -22,6 +22,10 @@ namespace DB
{
class IColumn;
static constexpr UInt64 operator""_Gb(unsigned long long value)
{
return value * 1024 * 1024 * 1024;
}
/** List of settings: type, name, default value, description, flags
*
@ -356,9 +360,9 @@ class IColumn;
M(OverflowMode, distinct_overflow_mode, OverflowMode::THROW, "What to do when the limit is exceeded.", 0) \
\
M(UInt64, max_memory_usage, 0, "Maximum memory usage for processing of single query. Zero means unlimited.", 0) \
M(UInt64, max_guaranteed_memory_usage, 0, "Maximum guaranteed memory usage for processing of single query. It represents soft limit. Zero means unlimited.", 0) \
M(UInt64, max_guaranteed_memory_usage, 10_Gb, "Maximum guaranteed memory usage for processing of single query. It represents soft limit. Zero means unlimited.", 0) \
M(UInt64, max_memory_usage_for_user, 0, "Maximum memory usage for processing all concurrently running queries for the user. Zero means unlimited.", 0) \
M(UInt64, max_guaranteed_memory_usage_for_user, 0, "Maximum guaranteed memory usage for processing all concurrently running queries for the user. It represents soft limit. Zero means unlimited.", 0) \
M(UInt64, max_guaranteed_memory_usage_for_user, 10_Gb, "Maximum guaranteed memory usage for processing all concurrently running queries for the user. It represents soft limit. Zero means unlimited.", 0) \
M(UInt64, max_untracked_memory, (4 * 1024 * 1024), "Small allocations and deallocations are grouped in thread local variable and tracked or profiled only when amount (in absolute value) becomes larger than specified value. If the value is higher than 'memory_profiler_step' it will be effectively lowered to 'memory_profiler_step'.", 0) \
M(UInt64, memory_profiler_step, (4 * 1024 * 1024), "Whenever query memory usage becomes larger than every next step in number of bytes the memory profiler will collect the allocating stack trace. Zero means disabled memory profiler. Values lower than a few megabytes will slow down query processing.", 0) \
M(Float, memory_profiler_sample_probability, 0., "Collect random allocations and deallocations and write them into system.trace_log with 'MemorySample' trace_type. The probability is for every alloc/free regardless to the size of the allocation. Note that sampling happens only when the amount of untracked memory exceeds 'max_untracked_memory'. You may want to set 'max_untracked_memory' to 0 for extra fine grained sampling.", 0) \

View File

@ -60,7 +60,7 @@ public:
* 1. \N
* 2. empty string (without quotes)
* 3. NULL
* We support all of them (however, second variant is supported by CSVRowInputStream, not by deserializeTextCSV).
* We support all of them (however, second variant is supported by CSVRowInputFormat, not by deserializeTextCSV).
* (see also input_format_defaults_for_omitted_fields and input_format_csv_unquoted_null_literal_as_null settings)
* In CSV, non-NULL string value, starting with \N characters, must be placed in quotes, to avoid ambiguity.
*/

View File

@ -104,6 +104,7 @@ bool DatabaseReplicatedDDLWorker::waitForReplicaToProcessAllEntries(UInt64 timeo
auto max_log = DDLTask::getLogEntryName(max_log_ptr);
LOG_TRACE(log, "Waiting for worker thread to process all entries before {}, current task is {}", max_log, current_task);
{
std::unique_lock lock{mutex};
bool processed = wait_current_task_change.wait_for(lock, std::chrono::milliseconds(timeout_ms), [&]()
{
@ -113,6 +114,7 @@ bool DatabaseReplicatedDDLWorker::waitForReplicaToProcessAllEntries(UInt64 timeo
if (!processed)
return false;
}
LOG_TRACE(log, "Waiting for worker thread to process all entries before {}, current task is {}", max_log, current_task);
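
For context: std::condition_variable::wait_for with a predicate must be called on a locked std::unique_lock, which is exactly what the added line provides; calling it without holding the lock is undefined behaviour. A minimal sketch of the pattern in isolation, not the DDL worker itself:

#include <chrono>
#include <condition_variable>
#include <mutex>

struct Waiter
{
    std::mutex mutex;
    std::condition_variable task_changed;
    unsigned processed = 0;

    /// Returns true if at least `target` entries were processed within the timeout.
    bool waitProcessed(unsigned target, std::chrono::milliseconds timeout)
    {
        std::unique_lock lock{mutex};   // the lock the diff adds around wait_for
        return task_changed.wait_for(lock, timeout, [&] { return processed >= target; });
    }
};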

View File

@ -217,7 +217,7 @@ std::string ClickHouseDictionarySource::doInvalidateQuery(const std::string & re
}
else
{
/// We pass empty block to RemoteBlockInputStream, because we don't know the structure of the result.
/// We pass empty block to RemoteQueryExecutor, because we don't know the structure of the result.
Block invalidate_sample_block;
QueryPipeline pipeline(std::make_shared<RemoteSource>(
std::make_shared<RemoteQueryExecutor>(pool, request, invalidate_sample_block, context_copy), false, false));

View File

@ -67,7 +67,7 @@ Columns DirectDictionary<dictionary_key_type>::getColumns(
size_t dictionary_keys_size = dict_struct.getKeysNames().size();
block_key_columns.reserve(dictionary_keys_size);
QueryPipeline pipeline(getSourceBlockInputStream(key_columns, requested_keys));
QueryPipeline pipeline(getSourcePipe(key_columns, requested_keys));
PullingPipelineExecutor executor(pipeline);
@ -185,7 +185,7 @@ ColumnUInt8::Ptr DirectDictionary<dictionary_key_type>::hasKeys(
size_t dictionary_keys_size = dict_struct.getKeysNames().size();
block_key_columns.reserve(dictionary_keys_size);
QueryPipeline pipeline(getSourceBlockInputStream(key_columns, requested_keys));
QueryPipeline pipeline(getSourcePipe(key_columns, requested_keys));
PullingPipelineExecutor executor(pipeline);
size_t keys_found = 0;
@ -259,7 +259,7 @@ ColumnUInt8::Ptr DirectDictionary<dictionary_key_type>::isInHierarchy(
}
template <DictionaryKeyType dictionary_key_type>
Pipe DirectDictionary<dictionary_key_type>::getSourceBlockInputStream(
Pipe DirectDictionary<dictionary_key_type>::getSourcePipe(
const Columns & key_columns [[maybe_unused]],
const PaddedPODArray<KeyType> & requested_keys [[maybe_unused]]) const
{

View File

@ -96,7 +96,7 @@ public:
Pipe read(const Names & column_names, size_t max_block_size, size_t num_streams) const override;
private:
Pipe getSourceBlockInputStream(const Columns & key_columns, const PaddedPODArray<KeyType> & requested_keys) const;
Pipe getSourcePipe(const Columns & key_columns, const PaddedPODArray<KeyType> & requested_keys) const;
const DictionaryStructure dict_struct;
const DictionarySourcePtr source_ptr;

View File

@ -697,7 +697,6 @@ void IPAddressDictionary::getItemsImpl(
const auto & first_column = key_columns.front();
const size_t rows = first_column->size();
// special case for getBlockInputStream
if (unlikely(key_columns.size() == 2))
{
getItemsByTwoKeyColumnsImpl<AttributeType>(

View File

@ -3,6 +3,7 @@
#include <Common/logger_useful.h>
#include <Common/escapeForFileName.h>
#include <IO/ConnectionTimeoutsContext.h>
#include <IO/ReadWriteBufferFromHTTP.h>
#include <IO/SeekAvoidingReadBuffer.h>
#include <IO/ReadHelpers.h>

View File

@ -62,7 +62,7 @@ public:
/** Fast reading data from buffer and save result to memory.
* Reads at least min_chunk_bytes and some more until the end of the chunk, depends on the format.
* Used in ParallelParsingBlockInputStream.
* Used in ParallelParsingInputFormat.
*/
using FileSegmentationEngine = std::function<std::pair<bool, size_t>(
ReadBuffer & buf,

View File

@ -44,7 +44,7 @@ NativeReader::NativeReader(ReadBuffer & istr_, UInt64 server_revision_,
{
istr_concrete = typeid_cast<CompressedReadBufferFromFile *>(&istr);
if (!istr_concrete)
throw Exception("When need to use index for NativeBlockInputStream, istr must be CompressedReadBufferFromFile.", ErrorCodes::LOGICAL_ERROR);
throw Exception("When need to use index for NativeReader, istr must be CompressedReadBufferFromFile.", ErrorCodes::LOGICAL_ERROR);
if (index_block_it == index_block_end)
return;
@ -80,7 +80,7 @@ void NativeReader::readData(const ISerialization & serialization, ColumnPtr & co
if (column->size() != rows)
throw Exception(ErrorCodes::CANNOT_READ_ALL_DATA,
"Cannot read all data in NativeBlockInputStream. Rows read: {}. Rows expected: {}", column->size(), rows);
"Cannot read all data in NativeReader. Rows read: {}. Rows expected: {}", column->size(), rows);
}

View File

@ -35,7 +35,7 @@ NativeWriter::NativeWriter(
{
ostr_concrete = typeid_cast<CompressedWriteBuffer *>(&ostr);
if (!ostr_concrete)
throw Exception("When need to write index for NativeBlockOutputStream, ostr must be CompressedWriteBuffer.", ErrorCodes::LOGICAL_ERROR);
throw Exception("When need to write index for NativeWriter, ostr must be CompressedWriteBuffer.", ErrorCodes::LOGICAL_ERROR);
}
}

View File

@ -417,17 +417,17 @@ struct TimeWindowImpl<HOP>
{
ToType wstart = ToStartOfTransform<kind>::execute(time_data[i], hop_num_units, time_zone);
ToType wend = AddTime<kind>::execute(wstart, hop_num_units, time_zone);
wstart = AddTime<kind>::execute(wend, -1 * window_num_units, time_zone);
wstart = AddTime<kind>::execute(wend, -window_num_units, time_zone);
ToType wend_latest;
do
{
wend_latest = wend;
wend = AddTime<kind>::execute(wend, -1 * hop_num_units, time_zone);
wend = AddTime<kind>::execute(wend, -hop_num_units, time_zone);
} while (wend > time_data[i]);
end_data[i] = wend_latest;
start_data[i] = AddTime<kind>::execute(wend_latest, -1 * window_num_units, time_zone);
start_data[i] = AddTime<kind>::execute(wend_latest, -window_num_units, time_zone);
}
MutableColumns result;
result.emplace_back(std::move(start));
@ -570,7 +570,7 @@ struct TimeWindowImpl<WINDOW_ID>
do
{
wend_latest = wend;
wend = AddTime<kind>::execute(wend, -1 * gcd_num_units, time_zone);
wend = AddTime<kind>::execute(wend, -gcd_num_units, time_zone);
} while (wend > time_data[i]);
end_data[i] = wend_latest;
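
For context: the loop above steps the candidate window end backwards by hop_num_units until it is no longer greater than the timestamp, so end_data[i] ends up as the earliest hop-aligned end that still covers the row and start_data[i] is that end minus the window length; the change only drops the redundant -1 * factors. A simplified integer-only sketch of the same computation, assuming plain seconds, no time zones and a window length that is a multiple of the hop:

#include <cassert>
#include <cstdint>
#include <utility>

/// For a hopping window of `window` seconds advancing every `hop` seconds,
/// return {start, end} of the earliest window that contains `t`.
static std::pair<int64_t, int64_t> earliestHopWindow(int64_t t, int64_t hop, int64_t window)
{
    int64_t wstart = (t / hop) * hop;   // toStartOfInterval(t, hop) for non-negative t
    int64_t wend = wstart + hop;
    int64_t wend_latest;
    do
    {
        wend_latest = wend;
        wend -= hop;                    // mirrors AddTime<kind>::execute(wend, -hop_num_units, ...)
    } while (wend > t);
    return {wend_latest - window, wend_latest};
}

int main()
{
    // hop = 2 s, window = 6 s: t = 7 lies in [2, 8), [4, 10) and [6, 12); the earliest is [2, 8).
    auto [start, end] = earliestHopWindow(7, 2, 6);
    assert(start == 2 && end == 8);
}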

View File

@ -308,7 +308,7 @@ void NO_INLINE sliceFromRightConstantOffsetBounded(Source && src, Sink && sink,
{
ssize_t size = length;
if (size < 0)
size += static_cast<ssize_t>(src.getElementSize()) - offset;
size += offset;
if (size > 0)
writeSlice(src.getSliceFromRight(offset, size), sink);
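
For context: in this branch `offset` counts elements from the right end of the source and a negative `length` means the slice stops that many elements before the end, so the resulting size is simply offset + length; the old formula added the whole element size and produced oversized slices. A tiny arithmetic sketch of the corrected bound, using plain integers and my reading of the slice semantics rather than the ClickHouse sources:

#include <cassert>
#include <cstddef>

/// Elements taken when slicing `offset` elements from the right with a negative
/// `length` meaning "stop -length elements before the end of the array".
static ptrdiff_t boundedSizeFromRight(ptrdiff_t offset, ptrdiff_t length)
{
    ptrdiff_t size = length;
    if (size < 0)
        size += offset;                // the fixed formula from the diff
    return size > 0 ? size : 0;
}

int main()
{
    // arraySlice([1 .. 10], -5, -2): start 5 elements from the end, drop the last 2 -> 3 elements.
    assert(boundedSizeFromRight(5, -2) == 3);
    // Asking to stop further back than the slice starts yields an empty result.
    assert(boundedSizeFromRight(3, -4) == 0);
}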

View File

@ -55,7 +55,7 @@ public:
~BitReader() = default;
// reads bits_to_read high-bits from bits_buffer
inline UInt64 readBits(UInt8 bits_to_read)
ALWAYS_INLINE inline UInt64 readBits(UInt8 bits_to_read)
{
if (bits_to_read > bits_count)
fillBitBuffer();
@ -71,7 +71,7 @@ public:
return getBitsFromBitBuffer<PEEK>(8);
}
inline UInt8 readBit()
ALWAYS_INLINE inline UInt8 readBit()
{
return static_cast<UInt8>(readBits(1));
}
@ -122,7 +122,7 @@ private:
// Fills internal bits_buffer with data from source, reads at most 64 bits
size_t fillBitBuffer()
ALWAYS_INLINE size_t fillBitBuffer()
{
const size_t available = source_end - source_current;
const auto bytes_to_read = std::min<size_t>(64 / 8, available);

View File

@ -1194,9 +1194,18 @@ void NO_INLINE Aggregator::executeOnIntervalWithoutKeyImpl(
for (AggregateFunctionInstruction * inst = aggregate_instructions; inst->that; ++inst)
{
if (inst->offsets)
inst->batch_that->addBatchSinglePlaceFromInterval(inst->offsets[row_begin], inst->offsets[row_end - 1], res + inst->state_offset, inst->batch_arguments, arena);
inst->batch_that->addBatchSinglePlaceFromInterval(
inst->offsets[static_cast<ssize_t>(row_begin) - 1],
inst->offsets[row_end - 1],
res + inst->state_offset,
inst->batch_arguments, arena);
else
inst->batch_that->addBatchSinglePlaceFromInterval(row_begin, row_end, res + inst->state_offset, inst->batch_arguments, arena);
inst->batch_that->addBatchSinglePlaceFromInterval(
row_begin,
row_end,
res + inst->state_offset,
inst->batch_arguments,
arena);
}
}
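
For context: for array arguments offsets[i] holds the end position of row i, so the row interval [row_begin, row_end) maps to flat positions [offsets[row_begin - 1], offsets[row_end - 1]); the padded offsets array makes offsets[-1] a valid read equal to 0, which is why a signed index cast is all the fix needs. A small sketch of the convention, using a plain vector with an explicit leading sentinel instead of ClickHouse's PaddedPODArray:

#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

int main()
{
    // Three array rows of sizes 2, 0 and 3; offsets[i] is the cumulative end of row i.
    // raw_offsets[0] plays the role of offsets[-1] == 0 in the padded array.
    std::vector<uint64_t> raw_offsets = {0, 2, 2, 5};
    auto row_begin_pos = [&](std::size_t row) { return raw_offsets[row]; };     // offsets[row - 1]
    auto row_end_pos   = [&](std::size_t row) { return raw_offsets[row + 1]; }; // offsets[row]

    // Flattened positions covered by rows [0, 2) are [0, 2).
    assert(row_begin_pos(0) == 0 && row_end_pos(1) == 2);
    // Flattened positions covered by rows [1, 3) are [2, 5).
    assert(row_begin_pos(1) == 2 && row_end_pos(2) == 5);
}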

View File

@ -37,8 +37,8 @@ class IStreamFactory;
ContextMutablePtr updateSettingsForCluster(
const Cluster & cluster, ContextPtr context, const Settings & settings, Poco::Logger * log = nullptr);
/// Execute a distributed query, creating a vector of BlockInputStreams, from which the result can be read.
/// `stream_factory` object encapsulates the logic of creating streams for a different type of query
/// Execute a distributed query, creating a query plan, from which the query pipeline can be built.
/// `stream_factory` object encapsulates the logic of creating plans for a different type of query
/// (currently SELECT, DESCRIBE).
void executeQuery(
QueryPlan & query_plan,

View File

@ -383,7 +383,7 @@ ContextMutablePtr DatabaseReplicatedTask::makeQueryContext(ContextPtr from_conte
txn->addOp(zkutil::makeSetRequest(database->zookeeper_path + "/max_log_ptr", toString(getLogEntryNumber(entry_name)), -1));
}
txn->addOp(zkutil::makeSetRequest(database->replica_path + "/log_ptr", toString(getLogEntryNumber(entry_name)), -1));
txn->addOp(getOpToUpdateLogPointer());
for (auto & op : ops)
txn->addOp(std::move(op));
@ -392,6 +392,11 @@ ContextMutablePtr DatabaseReplicatedTask::makeQueryContext(ContextPtr from_conte
return query_context;
}
Coordination::RequestPtr DatabaseReplicatedTask::getOpToUpdateLogPointer()
{
return zkutil::makeSetRequest(database->replica_path + "/log_ptr", toString(getLogEntryNumber(entry_name)), -1);
}
String DDLTaskBase::getLogEntryName(UInt32 log_entry_number)
{
return zkutil::getSequentialNodeName("query-", log_entry_number);

View File

@ -107,6 +107,7 @@ struct DDLTaskBase
virtual String getShardID() const = 0;
virtual ContextMutablePtr makeQueryContext(ContextPtr from_context, const ZooKeeperPtr & zookeeper);
virtual Coordination::RequestPtr getOpToUpdateLogPointer() { return nullptr; }
inline String getActiveNodePath() const { return fs::path(entry_path) / "active" / host_id_str; }
inline String getFinishedNodePath() const { return fs::path(entry_path) / "finished" / host_id_str; }
@ -145,6 +146,7 @@ struct DatabaseReplicatedTask : public DDLTaskBase
String getShardID() const override;
void parseQueryFromEntry(ContextPtr context) override;
ContextMutablePtr makeQueryContext(ContextPtr from_context, const ZooKeeperPtr & zookeeper) override;
Coordination::RequestPtr getOpToUpdateLogPointer() override;
DatabaseReplicated * database;
};

View File

@ -180,7 +180,7 @@ DDLTaskPtr DDLWorker::initAndCheckTask(const String & entry_name, String & out_r
{
/// What should we do if we even cannot parse host name and therefore cannot properly submit execution status?
/// We can try to create fail node using FQDN if it equal to host name in cluster config attempt will be successful.
/// Otherwise, that node will be ignored by DDLQueryStatusInputStream.
/// Otherwise, that node will be ignored by DDLQueryStatusSource.
out_reason = "Incorrect task format";
write_error_status(host_fqdn_id, ExecutionStatus::fromCurrentException().serializeText(), out_reason);
return {};
@ -715,6 +715,8 @@ bool DDLWorker::tryExecuteQueryOnLeaderReplica(
if (zookeeper->exists(is_executed_path, nullptr, event))
{
LOG_DEBUG(log, "Task {} has already been executed by replica ({}) of the same shard.", task.entry_name, zookeeper->get(is_executed_path));
if (auto op = task.getOpToUpdateLogPointer())
task.ops.push_back(op);
return true;
}
@ -759,6 +761,8 @@ bool DDLWorker::tryExecuteQueryOnLeaderReplica(
{
LOG_DEBUG(log, "Task {} has already been executed by replica ({}) of the same shard.", task.entry_name, executed_by);
executed_by_other_leader = true;
if (auto op = task.getOpToUpdateLogPointer())
task.ops.push_back(op);
break;
}
@ -786,6 +790,8 @@ bool DDLWorker::tryExecuteQueryOnLeaderReplica(
{
LOG_DEBUG(log, "Task {} has already been executed by replica ({}) of the same shard.", task.entry_name, zookeeper->get(is_executed_path));
executed_by_other_leader = true;
if (auto op = task.getOpToUpdateLogPointer())
task.ops.push_back(op);
break;
}
else

View File

@ -91,7 +91,7 @@ protected:
/// Executes query only on leader replica in case of replicated table.
/// Queries like TRUNCATE/ALTER .../OPTIMIZE have to be executed only on one node of shard.
/// Most of these queries can be executed on non-leader replica, but actually they still send
/// query via RemoteBlockOutputStream to leader, so to avoid such "2-phase" query execution we
/// query via RemoteQueryExecutor to leader, so to avoid such "2-phase" query execution we
/// execute query directly on leader.
bool tryExecuteQueryOnLeaderReplica(
DDLTaskBase & task,

View File

@ -13,7 +13,7 @@ bool equals(const Field & lhs, const Field & rhs);
/** Helps to implement modifier WITH FILL for ORDER BY clause.
* Stores row as array of fields and provides functions to generate next row for filling gaps and for comparing rows.
* Used in FillingBlockInputStream and in FillingTransform.
* Used in FillingTransform.
*/
class FillingRow
{

View File

@ -2,6 +2,7 @@
#include <Access/Common/AccessFlags.h>
#include <Access/EnabledQuota.h>
#include <AggregateFunctions/AggregateFunctionFactory.h>
#include <Columns/ColumnNullable.h>
#include <Processors/Transforms/buildPushingToViewsChain.h>
#include <DataTypes/DataTypeNullable.h>
@ -153,7 +154,18 @@ Block InterpreterInsertQuery::getSampleBlock(
return res;
}
static bool hasAggregateFunctions(const IAST * ast)
{
if (const auto * func = typeid_cast<const ASTFunction *>(ast))
if (AggregateFunctionFactory::instance().isAggregateFunctionName(func->name))
return true;
for (const auto & child : ast->children)
if (hasAggregateFunctions(child.get()))
return true;
return false;
}
/** A query that just reads all data without any complex computations or filtering.
* If we just pipe the result to INSERT, we don't have to use too many threads for read.
*/
@ -186,7 +198,8 @@ static bool isTrivialSelect(const ASTPtr & select)
&& !select_query->groupBy()
&& !select_query->having()
&& !select_query->orderBy()
&& !select_query->limitBy());
&& !select_query->limitBy()
&& !hasAggregateFunctions(select_query));
}
/// This query is ASTSelectWithUnionQuery subquery
return false;
@ -396,7 +409,7 @@ BlockIO InterpreterInsertQuery::execute()
for (size_t col_idx = 0; col_idx < query_columns.size(); ++col_idx)
{
/// Change query sample block columns to Nullable to allow inserting nullable columns, where NULL values will be substituted with
/// default column values (in AddingDefaultBlockOutputStream), so all values will be cast correctly.
/// default column values (in AddingDefaultsTransform), so all values will be cast correctly.
if (input_columns[col_idx].type->isNullable() && !query_columns[col_idx].type->isNullable() && output_columns.hasDefault(query_columns[col_idx].name))
query_sample_block.setColumn(col_idx, ColumnWithTypeAndName(makeNullable(query_columns[col_idx].column), makeNullable(query_columns[col_idx].type), query_columns[col_idx].name));
}

View File

@ -144,7 +144,7 @@ bool VersionMetadata::isRemovalTIDLocked() const
void VersionMetadata::setCreationTID(const TransactionID & tid, TransactionInfoContext * context)
{
/// NOTE ReplicatedMergeTreeBlockOutputStream may add one part multiple times
/// NOTE ReplicatedMergeTreeSink may add one part multiple times
assert(creation_tid.isEmpty() || creation_tid == tid);
creation_tid = tid;
if (context)

View File

@ -275,7 +275,7 @@ DDLQueryStatusSource::DDLQueryStatusSource(
, node_path(zk_node_path)
, context(context_)
, watch(CLOCK_MONOTONIC_COARSE)
, log(&Poco::Logger::get("DDLQueryStatusInputStream"))
, log(&Poco::Logger::get("DDLQueryStatusSource"))
{
auto output_mode = context->getSettingsRef().distributed_ddl_output_mode;
throw_on_timeout = output_mode == DistributedDDLOutputMode::THROW || output_mode == DistributedDDLOutputMode::NONE;

View File

@ -821,7 +821,7 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
element.profile_counters = info.profile_counters;
/// We need to refresh the access info since dependent views might have added extra information, either during
/// creation of the view (PushingToViewsBlockOutputStream) or while executing its internal SELECT
/// creation of the view (PushingToViews chain) or while executing its internal SELECT
const auto & access_info = context_ptr->getQueryAccessInfo();
element.query_databases.insert(access_info.databases.begin(), access_info.databases.end());
element.query_tables.insert(access_info.tables.begin(), access_info.tables.end());

View File

@ -181,6 +181,10 @@ void ASTSystemQuery::formatImpl(const FormatSettings & settings, FormatState &,
else if (!disk.empty())
print_identifier(disk);
}
else if (type == Type::SYNC_DATABASE_REPLICA)
{
print_identifier(database->as<ASTIdentifier>()->name());
}
else if (type == Type::DROP_REPLICA)
{
print_drop_replica();

View File

@ -247,6 +247,7 @@ bool ParserSystemQuery::parseImpl(IParser::Pos & pos, ASTPtr & node, Expected &
case Type::SYNC_DATABASE_REPLICA:
{
parseQueryWithOnCluster(res, pos, expected);
if (!parseDatabaseAsAST(pos, expected, res->database))
return false;
break;

View File

@ -637,7 +637,7 @@ void SummingSortedAlgorithm::SummingMergedData::addRowImpl(ColumnRawPtrs & raw_c
for (auto & desc : def.columns_to_aggregate)
{
if (!desc.created)
throw Exception("Logical error in SummingSortedBlockInputStream, there are no description",
throw Exception("Logical error in SummingSortedAlgorithm, there are no description",
ErrorCodes::LOGICAL_ERROR);
if (desc.is_agg_func_type)

View File

@ -131,7 +131,7 @@ IProcessor::Status IMergingTransformBase::prepare()
return Status::Finished;
}
/// Do not disable inputs, so it will work in the same way as with AsynchronousBlockInputStream, like before.
/// Do not disable inputs, so they can be executed in parallel.
bool is_port_full = !output.canPush();
/// Push if has data.

View File

@ -170,8 +170,6 @@ void AggregatingStep::transformPipeline(QueryPipelineBuilder & pipeline, const B
}
else
{
pipeline.resize(1);
pipeline.addSimpleTransform([&](const Block & header)
{
return std::make_shared<AggregatingTransform>(header, transform_params);

View File

@ -7,7 +7,7 @@ namespace DB
{
/** A stream of blocks from which you can read the next block from an explicitly provided list.
* Also see OneBlockInputStream.
* Also see SourceFromSingleChunk.
*/
class BlocksListSource : public SourceWithProgress
{

View File

@ -56,7 +56,7 @@ MySQLSource::MySQLSource(
const Block & sample_block,
const StreamSettings & settings_)
: SourceWithProgress(sample_block.cloneEmpty())
, log(&Poco::Logger::get("MySQLBlockInputStream"))
, log(&Poco::Logger::get("MySQLSource"))
, connection{std::make_unique<Connection>(entry, query_str)}
, settings{std::make_unique<StreamSettings>(settings_)}
{
@ -64,10 +64,10 @@ MySQLSource::MySQLSource(
initPositionMappingFromQueryResultStructure();
}
/// For descendant MySQLWithFailoverBlockInputStream
/// For descendant MySQLWithFailoverSource
MySQLSource::MySQLSource(const Block &sample_block_, const StreamSettings & settings_)
: SourceWithProgress(sample_block_.cloneEmpty())
, log(&Poco::Logger::get("MySQLBlockInputStream"))
, log(&Poco::Logger::get("MySQLSource"))
, settings(std::make_unique<StreamSettings>(settings_))
{
description.init(sample_block_);

View File

@ -148,8 +148,24 @@ void SourceWithProgress::progress(const Progress & value)
quota->used({QuotaType::READ_ROWS, value.read_rows}, {QuotaType::READ_BYTES, value.read_bytes});
}
auto query_kind = IAST::QueryKind::None;
if (process_list_elem)
query_kind = process_list_elem->getQueryKind();
if (query_kind == IAST::QueryKind::None || query_kind == IAST::QueryKind::System)
{
/// Don't increase profile event counters for merges and mutations, cause they use
/// a separate counter MergedRows/MergedBytes.
/// This is a bad way to check that a query is merge or mutation. Will fix it later.
/// Note: you can't just check for QueryKind::Select, cause there are
/// queries like CREATE AS SELECT or INSERT SELECT.
}
else
{
ProfileEvents::increment(ProfileEvents::SelectedRows, value.read_rows);
ProfileEvents::increment(ProfileEvents::SelectedBytes, value.read_bytes);
}
}
}

View File

@ -1,57 +0,0 @@
#pragma once
#include <Processors/ISimpleTransform.h>
#include <Columns/ColumnTuple.h>
#include <DataTypes/DataTypeDateTime.h>
#include <DataTypes/DataTypeTuple.h>
#include <Common/StringUtils/StringUtils.h>
namespace DB
{
class ReplacingWindowColumnTransform : public ISimpleTransform
{
public:
ReplacingWindowColumnTransform(
const Block & header_,
size_t window_id_column_pos_,
const NameAndTypePair & window_column_name_and_type_,
Tuple window_value_)
: ISimpleTransform(header_, getResultHeader(header_, window_id_column_pos_), false)
, window_id_column_pos(window_id_column_pos_)
, window_column_name_and_type(window_column_name_and_type_)
, window_value(window_value_)
{
replaced_window_id_column_pos = header_.getPositionByName(window_column_name_and_type.name);
}
String getName() const override { return "ReplacingWindowColumnTransform"; }
static Block getResultHeader(Block header, size_t window_id_column_pos_)
{
header.erase(window_id_column_pos_);
return header;
}
protected:
void transform(Chunk & chunk) override
{
auto window_column = window_column_name_and_type.type->createColumnConst(
chunk.getNumRows(), window_value)->convertToFullColumnIfConst();
chunk.erase(replaced_window_id_column_pos);
chunk.addColumn(replaced_window_id_column_pos, window_column);
chunk.erase(window_id_column_pos);
chunk.setChunkInfo(std::make_shared<AggregatedChunkInfo>());
}
size_t window_id_column_pos;
size_t replaced_window_id_column_pos;
NameAndTypePair window_column_name_and_type;
Tuple window_value;
};
}

View File

@ -188,8 +188,8 @@ Chain buildPushingToViewsChain(
auto storage_header = no_destination ? metadata_snapshot->getSampleBlockWithVirtuals(storage->getVirtuals())
: metadata_snapshot->getSampleBlock();
/** TODO This is a very important line. At any insertion into the table one of streams should own lock.
* Although now any insertion into the table is done via PushingToViewsBlockOutputStream,
/** TODO This is a very important line. At any insertion into the table one of chains should own lock.
* Although now any insertion into the table is done via PushingToViews chain,
* but it's clear that here is not the best place for this functionality.
*/
result_chain.addTableLock(storage->lockForShare(context->getInitialQueryId(), context->getSettingsRef().lock_acquire_timeout));

View File

@ -83,7 +83,7 @@ static Block getEqualValuesBlockWithSize(
}
TEST(CheckSortedBlockInputStream, CheckGoodCase)
TEST(CheckSortedTransform, CheckGoodCase)
{
std::vector<std::string> key_columns{"K1", "K2", "K3"};
auto sort_description = getSortDescription(key_columns);
@ -109,7 +109,7 @@ TEST(CheckSortedBlockInputStream, CheckGoodCase)
EXPECT_FALSE(executor.pull(chunk));
}
TEST(CheckSortedBlockInputStream, CheckBadLastRow)
TEST(CheckSortedTransform, CheckBadLastRow)
{
std::vector<std::string> key_columns{"K1", "K2", "K3"};
auto sort_description = getSortDescription(key_columns);
@ -139,7 +139,7 @@ TEST(CheckSortedBlockInputStream, CheckBadLastRow)
}
TEST(CheckSortedBlockInputStream, CheckUnsortedBlock1)
TEST(CheckSortedTransform, CheckUnsortedBlock1)
{
std::vector<std::string> key_columns{"K1", "K2", "K3"};
auto sort_description = getSortDescription(key_columns);
@ -163,7 +163,7 @@ TEST(CheckSortedBlockInputStream, CheckUnsortedBlock1)
#endif
}
TEST(CheckSortedBlockInputStream, CheckUnsortedBlock2)
TEST(CheckSortedTransform, CheckUnsortedBlock2)
{
std::vector<std::string> key_columns{"K1", "K2", "K3"};
auto sort_description = getSortDescription(key_columns);
@ -186,7 +186,7 @@ TEST(CheckSortedBlockInputStream, CheckUnsortedBlock2)
#endif
}
TEST(CheckSortedBlockInputStream, CheckUnsortedBlock3)
TEST(CheckSortedTransform, CheckUnsortedBlock3)
{
std::vector<std::string> key_columns{"K1", "K2", "K3"};
auto sort_description = getSortDescription(key_columns);
@ -209,7 +209,7 @@ TEST(CheckSortedBlockInputStream, CheckUnsortedBlock3)
#endif
}
TEST(CheckSortedBlockInputStream, CheckEqualBlock)
TEST(CheckSortedTransform, CheckEqualBlock)
{
std::vector<std::string> key_columns{"K1", "K2", "K3"};
auto sort_description = getSortDescription(key_columns);

View File

@ -627,7 +627,7 @@ namespace
void executeQuery();
void processInput();
void initializeBlockInputStream(const Block & header);
void initializePipeline(const Block & header);
void createExternalTables();
void generateOutput();
@ -920,7 +920,7 @@ namespace
if (context != query_context)
throw Exception("Unexpected context in Input initializer", ErrorCodes::LOGICAL_ERROR);
input_function_is_used = true;
initializeBlockInputStream(input_storage->getInMemoryMetadataPtr()->getSampleBlock());
initializePipeline(input_storage->getInMemoryMetadataPtr()->getSampleBlock());
});
query_context->setInputBlocksReaderCallback([this](ContextPtr context) -> Block
@ -967,7 +967,7 @@ namespace
/// This is significant, because parallel parsing may be used.
/// So we mustn't touch the input stream from other thread.
initializeBlockInputStream(io.pipeline.getHeader());
initializePipeline(io.pipeline.getHeader());
PushingPipelineExecutor executor(io.pipeline);
executor.start();
@ -982,7 +982,7 @@ namespace
executor.finish();
}
void Call::initializeBlockInputStream(const Block & header)
void Call::initializePipeline(const Block & header)
{
assert(!read_buffer);
read_buffer = std::make_unique<ReadBufferFromCallback>([this]() -> std::pair<const void *, size_t>

View File

@ -293,10 +293,11 @@ void HTTPHandler::pushDelayedResults(Output & used_output)
}
HTTPHandler::HTTPHandler(IServer & server_, const std::string & name)
HTTPHandler::HTTPHandler(IServer & server_, const std::string & name, const std::optional<String> & content_type_override_)
: server(server_)
, log(&Poco::Logger::get(name))
, default_settings(server.context()->getSettingsRef())
, content_type_override(content_type_override_)
{
server_display_name = server.config().getString("display_name", getFQDNOrHostName());
}
@ -819,9 +820,9 @@ void HTTPHandler::processQuery(
customizeContext(request, context);
executeQuery(*in, *used_output.out_maybe_delayed_and_compressed, /* allow_into_outfile = */ false, context,
[&response] (const String & current_query_id, const String & content_type, const String & format, const String & timezone)
[&response, this] (const String & current_query_id, const String & content_type, const String & format, const String & timezone)
{
response.setContentType(content_type);
response.setContentType(content_type_override.value_or(content_type));
response.add("X-ClickHouse-Query-Id", current_query_id);
response.add("X-ClickHouse-Format", format);
response.add("X-ClickHouse-Timezone", timezone);
@ -991,8 +992,8 @@ void HTTPHandler::handleRequest(HTTPServerRequest & request, HTTPServerResponse
used_output.finalize();
}
DynamicQueryHandler::DynamicQueryHandler(IServer & server_, const std::string & param_name_)
: HTTPHandler(server_, "DynamicQueryHandler"), param_name(param_name_)
DynamicQueryHandler::DynamicQueryHandler(IServer & server_, const std::string & param_name_, const std::optional<String>& content_type_override_)
: HTTPHandler(server_, "DynamicQueryHandler", content_type_override_), param_name(param_name_)
{
}
@ -1052,8 +1053,9 @@ PredefinedQueryHandler::PredefinedQueryHandler(
const NameSet & receive_params_,
const std::string & predefined_query_,
const CompiledRegexPtr & url_regex_,
const std::unordered_map<String, CompiledRegexPtr> & header_name_with_regex_)
: HTTPHandler(server_, "PredefinedQueryHandler")
const std::unordered_map<String, CompiledRegexPtr> & header_name_with_regex_,
const std::optional<String> & content_type_override_)
: HTTPHandler(server_, "PredefinedQueryHandler", content_type_override_)
, receive_params(receive_params_)
, predefined_query(predefined_query_)
, url_regex(url_regex_)
@ -1123,7 +1125,13 @@ std::string PredefinedQueryHandler::getQuery(HTTPServerRequest & request, HTMLFo
HTTPRequestHandlerFactoryPtr createDynamicHandlerFactory(IServer & server, const std::string & config_prefix)
{
auto query_param_name = server.config().getString(config_prefix + ".handler.query_param_name", "query");
auto factory = std::make_shared<HandlingRuleHTTPHandlerFactory<DynamicQueryHandler>>(server, std::move(query_param_name));
std::optional<String> content_type_override;
if (server.config().has(config_prefix + ".handler.content_type"))
content_type_override = server.config().getString(config_prefix + ".handler.content_type");
auto factory = std::make_shared<HandlingRuleHTTPHandlerFactory<DynamicQueryHandler>>(
server, std::move(query_param_name), std::move(content_type_override));
factory->addFiltersFromConfig(server.config(), config_prefix);
@ -1180,6 +1188,10 @@ HTTPRequestHandlerFactoryPtr createPredefinedHandlerFactory(IServer & server, co
headers_name_with_regex.emplace(std::make_pair(header_name, regex));
}
std::optional<String> content_type_override;
if (configuration.has(config_prefix + ".handler.content_type"))
content_type_override = configuration.getString(config_prefix + ".handler.content_type");
std::shared_ptr<HandlingRuleHTTPHandlerFactory<PredefinedQueryHandler>> factory;
if (configuration.has(config_prefix + ".url"))
@ -1197,14 +1209,20 @@ HTTPRequestHandlerFactoryPtr createPredefinedHandlerFactory(IServer & server, co
std::move(analyze_receive_params),
std::move(predefined_query),
std::move(regex),
std::move(headers_name_with_regex));
std::move(headers_name_with_regex),
std::move(content_type_override));
factory->addFiltersFromConfig(configuration, config_prefix);
return factory;
}
}
factory = std::make_shared<HandlingRuleHTTPHandlerFactory<PredefinedQueryHandler>>(
server, std::move(analyze_receive_params), std::move(predefined_query), CompiledRegexPtr{}, std::move(headers_name_with_regex));
server,
std::move(analyze_receive_params),
std::move(predefined_query),
CompiledRegexPtr{},
std::move(headers_name_with_regex),
std::move(content_type_override));
factory->addFiltersFromConfig(configuration, config_prefix);
return factory;

View File

@ -30,7 +30,7 @@ using CompiledRegexPtr = std::shared_ptr<const re2::RE2>;
class HTTPHandler : public HTTPRequestHandler
{
public:
HTTPHandler(IServer & server_, const std::string & name);
HTTPHandler(IServer & server_, const std::string & name, const std::optional<String> & content_type_override_);
virtual ~HTTPHandler() override;
void handleRequest(HTTPServerRequest & request, HTTPServerResponse & response) override;
@ -100,6 +100,9 @@ private:
/// See settings http_max_fields, http_max_field_name_size, http_max_field_value_size in HTMLForm.
const Settings & default_settings;
/// Overrides Content-Type provided by the format of the response.
std::optional<String> content_type_override;
// session is reset at the end of each request/response.
std::unique_ptr<Session> session;
@ -140,7 +143,7 @@ class DynamicQueryHandler : public HTTPHandler
private:
std::string param_name;
public:
explicit DynamicQueryHandler(IServer & server_, const std::string & param_name_ = "query");
explicit DynamicQueryHandler(IServer & server_, const std::string & param_name_ = "query", const std::optional<String>& content_type_override_ = std::nullopt);
std::string getQuery(HTTPServerRequest & request, HTMLForm & params, ContextMutablePtr context) override;
@ -157,7 +160,8 @@ private:
public:
PredefinedQueryHandler(
IServer & server_, const NameSet & receive_params_, const std::string & predefined_query_
, const CompiledRegexPtr & url_regex_, const std::unordered_map<String, CompiledRegexPtr> & header_name_with_regex_);
, const CompiledRegexPtr & url_regex_, const std::unordered_map<String, CompiledRegexPtr> & header_name_with_regex_
, const std::optional<std::string> & content_type_override_);
virtual void customizeContext(HTTPServerRequest & request, ContextMutablePtr context) override;

View File

@ -299,7 +299,7 @@ namespace
}
/// This is old format, that does not have header for the block in the file header,
/// applying ConvertingBlockInputStream in this case is not a big overhead.
/// applying ConvertingTransform in this case is not a big overhead.
///
/// Anyway we can get header only from the first block, which contain all rows anyway.
if (!distributed_header.block_header)

View File

@ -52,7 +52,7 @@ public:
static std::shared_ptr<ISource> createSourceFromFile(const String & file_name);
/// For scheduling via DistributedBlockOutputStream.
/// For scheduling via DistributedSink.
bool addAndSchedule(size_t file_size, size_t ms);
struct InternalStatus
@ -122,8 +122,6 @@ private:
CurrentMetrics::Increment metric_pending_files;
CurrentMetrics::Increment metric_broken_files;
friend class DirectoryMonitorBlockInputStream;
};
}

View File

@ -123,7 +123,7 @@ DistributedSink::DistributedSink(
, insert_timeout(insert_timeout_)
, main_table(main_table_)
, columns_to_send(columns_to_send_.begin(), columns_to_send_.end())
, log(&Poco::Logger::get("DistributedBlockOutputStream"))
, log(&Poco::Logger::get("DistributedSink"))
{
const auto & settings = context->getSettingsRef();
if (settings.max_distributed_depth && context->getClientInfo().distributed_depth >= settings.max_distributed_depth)
@ -610,7 +610,7 @@ void DistributedSink::writeSplitAsync(const Block & block)
void DistributedSink::writeAsyncImpl(const Block & block, size_t shard_id)
{
OpenTelemetrySpanHolder span("DistributedBlockOutputStream::writeAsyncImpl()");
OpenTelemetrySpanHolder span("DistributedSink::writeAsyncImpl()");
const auto & shard_info = cluster->getShardsInfo()[shard_id];
const auto & settings = context->getSettingsRef();

View File

@ -415,6 +415,11 @@ public:
writer->write(getHeader().cloneWithColumns(chunk.detachColumns()));
}
void onException() override
{
write_buf->finalize();
}
void onFinish() override
{
try

View File

@ -1,4 +1,4 @@
#include <Storages/Kafka/KafkaBlockOutputStream.h>
#include <Storages/Kafka/KafkaSink.h>
#include <Formats/FormatFactory.h>
#include <Processors/Formats/IOutputFormat.h>

View File

@ -18,7 +18,7 @@
#include <Parsers/ASTLiteral.h>
#include <Processors/Executors/CompletedPipelineExecutor.h>
#include <Storages/ExternalDataSourceConfiguration.h>
#include <Storages/Kafka/KafkaBlockOutputStream.h>
#include <Storages/Kafka/KafkaSink.h>
#include <Storages/Kafka/KafkaSettings.h>
#include <Storages/Kafka/KafkaSource.h>
#include <Storages/Kafka/WriteBufferToKafkaProducer.h>

View File

@ -382,7 +382,7 @@ bool StorageLiveView::getNewBlocks()
/// can't set mergeable_blocks here or anywhere else outside the writeIntoLiveView function
/// as there could be a race condition when the new block has been inserted into
/// the source table by the PushingToViewsBlockOutputStream and this method
/// the source table by the PushingToViews chain and this method
/// called before writeIntoLiveView function is called which can lead to
/// the same block added twice to the mergeable_blocks leading to
/// inserted data to be duplicated

View File

@ -16,7 +16,7 @@ void FutureMergedMutatedPart::assign(MergeTreeData::DataPartsVector parts_)
size_t sum_rows = 0;
size_t sum_bytes_uncompressed = 0;
MergeTreeDataPartType future_part_type = MergeTreeDataPartType::UNKNOWN;
MergeTreeDataPartType future_part_type = MergeTreeDataPartType::Unknown;
for (const auto & part : parts_)
{
sum_rows += part->rows_count;

View File

@ -22,7 +22,7 @@ struct FutureMergedMutatedPart
MergeTreeDataPartType type;
MergeTreePartInfo part_info;
MergeTreeData::DataPartsVector parts;
MergeType merge_type = MergeType::REGULAR;
MergeType merge_type = MergeType::Regular;
const MergeTreePartition & getPartition() const { return parts.front()->partition; }

View File

@ -269,16 +269,16 @@ static void incrementTypeMetric(MergeTreeDataPartType type)
{
switch (type.getValue())
{
case MergeTreeDataPartType::WIDE:
case MergeTreeDataPartType::Wide:
CurrentMetrics::add(CurrentMetrics::PartsWide);
return;
case MergeTreeDataPartType::COMPACT:
case MergeTreeDataPartType::Compact:
CurrentMetrics::add(CurrentMetrics::PartsCompact);
return;
case MergeTreeDataPartType::IN_MEMORY:
case MergeTreeDataPartType::InMemory:
CurrentMetrics::add(CurrentMetrics::PartsInMemory);
return;
case MergeTreeDataPartType::UNKNOWN:
case MergeTreeDataPartType::Unknown:
return;
}
}
@ -287,16 +287,16 @@ static void decrementTypeMetric(MergeTreeDataPartType type)
{
switch (type.getValue())
{
case MergeTreeDataPartType::WIDE:
case MergeTreeDataPartType::Wide:
CurrentMetrics::sub(CurrentMetrics::PartsWide);
return;
case MergeTreeDataPartType::COMPACT:
case MergeTreeDataPartType::Compact:
CurrentMetrics::sub(CurrentMetrics::PartsCompact);
return;
case MergeTreeDataPartType::IN_MEMORY:
case MergeTreeDataPartType::InMemory:
CurrentMetrics::sub(CurrentMetrics::PartsInMemory);
return;
case MergeTreeDataPartType::UNKNOWN:
case MergeTreeDataPartType::Unknown:
return;
}
}
@ -1019,7 +1019,7 @@ void IMergeTreeDataPart::loadRowsCount()
{
rows_count = 0;
}
else if (storage.format_version >= MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING || part_type == Type::COMPACT || parent_part)
else if (storage.format_version >= MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING || part_type == Type::Compact || parent_part)
{
bool exists = metadata_manager->exists("count.txt");
if (!exists)
@ -1187,7 +1187,7 @@ void IMergeTreeDataPart::loadColumns(bool require)
if (!exists)
{
/// We can get list of columns only from columns.txt in compact parts.
if (require || part_type == Type::COMPACT)
if (require || part_type == Type::Compact)
throw Exception("No columns.txt in part " + name + ", expected path " + path + " on drive " + volume->getDisk()->getName(),
ErrorCodes::NO_FILE_IN_DATA_PART);
@ -2065,17 +2065,17 @@ std::unordered_map<String, IMergeTreeDataPart::uint128> IMergeTreeDataPart::chec
bool isCompactPart(const MergeTreeDataPartPtr & data_part)
{
return (data_part && data_part->getType() == MergeTreeDataPartType::COMPACT);
return (data_part && data_part->getType() == MergeTreeDataPartType::Compact);
}
bool isWidePart(const MergeTreeDataPartPtr & data_part)
{
return (data_part && data_part->getType() == MergeTreeDataPartType::WIDE);
return (data_part && data_part->getType() == MergeTreeDataPartType::Wide);
}
bool isInMemoryPart(const MergeTreeDataPartPtr & data_part)
{
return (data_part && data_part->getType() == MergeTreeDataPartType::IN_MEMORY);
return (data_part && data_part->getType() == MergeTreeDataPartType::InMemory);
}
}

View File

@ -1,26 +0,0 @@
#include <Storages/MergeTree/MergeAlgorithm.h>
#include <Common/Exception.h>
namespace DB
{
namespace ErrorCodes
{
extern const int NOT_IMPLEMENTED;
}
String toString(MergeAlgorithm merge_algorithm)
{
switch (merge_algorithm)
{
case MergeAlgorithm::Undecided:
return "Undecided";
case MergeAlgorithm::Horizontal:
return "Horizontal";
case MergeAlgorithm::Vertical:
return "Vertical";
}
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Unknown MergeAlgorithm {}", static_cast<UInt64>(merge_algorithm));
}
}

View File

@ -12,6 +12,4 @@ enum class MergeAlgorithm
Vertical /// per-row merge of PK and secondary indices columns, per-column gather for non-PK columns
};
String toString(MergeAlgorithm merge_algorithm);
}

View File

@ -37,7 +37,7 @@ ReplicatedMergeMutateTaskBase::PrepareResult MergeFromLogEntryTask::prepare()
};
}
if (entry.merge_type == MergeType::TTL_RECOMPRESS &&
if (entry.merge_type == MergeType::TTLRecompress &&
(time(nullptr) - entry.create_time) <= storage_settings_ptr->try_fetch_recompressed_part_timeout.totalSeconds() &&
entry.source_replica != storage.replica_name)
{

View File

@ -692,7 +692,7 @@ size_t MergeTreeBaseSelectProcessor::estimateMaxBatchSizeForHugeRanges()
size_t sum_average_marks_size = 0;
/// getColumnSize is not fully implemented for compact parts
if (task->data_part->getType() == IMergeTreeDataPart::Type::COMPACT)
if (task->data_part->getType() == IMergeTreeDataPart::Type::Compact)
{
sum_average_marks_size = average_granule_size_bytes;
}

View File

@ -30,7 +30,7 @@ NameSet injectRequiredColumns(
Names & columns);
/// A batch of work for MergeTreeThreadSelectBlockInputStream
/// A batch of work for MergeTreeThreadSelectProcessor
struct MergeTreeReadTask
{
/// data part which should be read while performing this task

View File

@ -1754,14 +1754,48 @@ size_t MergeTreeData::clearOldPartsFromFilesystem(bool force)
return parts_to_remove.size();
}
void MergeTreeData::clearPartsFromFilesystem(const DataPartsVector & parts_to_remove)
void MergeTreeData::clearPartsFromFilesystem(const DataPartsVector & parts, bool throw_on_error, NameSet * parts_failed_to_delete)
{
NameSet part_names_successeded;
auto get_failed_parts = [&part_names_successeded, &parts_failed_to_delete, &parts] ()
{
if (part_names_successeded.size() == parts.size())
return;
if (parts_failed_to_delete)
{
for (const auto & part : parts)
{
if (!part_names_successeded.contains(part->name))
parts_failed_to_delete->insert(part->name);
}
}
};
try
{
clearPartsFromFilesystemImpl(parts, &part_names_successeded);
get_failed_parts();
}
catch (...)
{
get_failed_parts();
if (throw_on_error)
throw;
}
}
void MergeTreeData::clearPartsFromFilesystemImpl(const DataPartsVector & parts_to_remove, NameSet * part_names_successed)
{
const auto settings = getSettings();
if (parts_to_remove.size() > 1 && settings->max_part_removal_threads > 1 && parts_to_remove.size() > settings->concurrent_part_removal_threshold)
{
/// Parallel parts removal.
size_t num_threads = std::min<size_t>(settings->max_part_removal_threads, parts_to_remove.size());
std::mutex part_names_mutex;
ThreadPool pool(num_threads);
/// NOTE: Under heavy system load you may get "Cannot schedule a task" from ThreadPool.
@ -1774,6 +1808,11 @@ void MergeTreeData::clearPartsFromFilesystem(const DataPartsVector & parts_to_re
LOG_DEBUG(log, "Removing part from filesystem {}", part->name);
part->remove();
if (part_names_successed)
{
std::lock_guard lock(part_names_mutex);
part_names_successed->insert(part->name);
}
});
}
@ -1785,6 +1824,8 @@ void MergeTreeData::clearPartsFromFilesystem(const DataPartsVector & parts_to_re
{
LOG_DEBUG(log, "Removing part from filesystem {}", part->name);
part->remove();
if (part_names_successed)
part_names_successed->insert(part->name);
}
}
}
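
For context: the new wrapper runs the per-part removal either sequentially or on a thread pool and records which parts were actually removed, so that on failure the caller can report exactly which names are left behind; the success set needs a mutex because several removal tasks insert into it concurrently. A stripped-down sketch of that collect-successes-under-a-mutex pattern, using standard library threads rather than ClickHouse's ThreadPool:

#include <mutex>
#include <set>
#include <string>
#include <thread>
#include <vector>

/// Try to remove every item; names removed successfully land in `succeeded`.
static void removeAll(const std::vector<std::string> & names,
                      bool (*remove_one)(const std::string &),
                      std::set<std::string> & succeeded)
{
    std::mutex names_mutex;
    std::vector<std::thread> workers;
    for (const auto & name : names)
        workers.emplace_back([&, name]
        {
            if (remove_one(name))
            {
                std::lock_guard lock(names_mutex);   // same role as part_names_mutex in the diff
                succeeded.insert(name);
            }
        });
    for (auto & w : workers)
        w.join();
}

int main()
{
    std::set<std::string> ok;
    std::vector<std::string> parts = {"all_1_1_0", "all_2_2_0", "all_3_3_0"};   // hypothetical part names
    removeAll(parts, [](const std::string & n) { return n != "all_2_2_0"; }, ok);
    return ok.size() == 2 ? 0 : 1;   // everything except the part that "failed" counts as removed
}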
@ -2402,27 +2443,27 @@ MergeTreeDataPartType MergeTreeData::choosePartType(size_t bytes_uncompressed, s
{
const auto settings = getSettings();
if (!canUsePolymorphicParts(*settings))
return MergeTreeDataPartType::WIDE;
return MergeTreeDataPartType::Wide;
if (bytes_uncompressed < settings->min_bytes_for_compact_part || rows_count < settings->min_rows_for_compact_part)
return MergeTreeDataPartType::IN_MEMORY;
return MergeTreeDataPartType::InMemory;
if (bytes_uncompressed < settings->min_bytes_for_wide_part || rows_count < settings->min_rows_for_wide_part)
return MergeTreeDataPartType::COMPACT;
return MergeTreeDataPartType::Compact;
return MergeTreeDataPartType::WIDE;
return MergeTreeDataPartType::Wide;
}
MergeTreeDataPartType MergeTreeData::choosePartTypeOnDisk(size_t bytes_uncompressed, size_t rows_count) const
{
const auto settings = getSettings();
if (!canUsePolymorphicParts(*settings))
return MergeTreeDataPartType::WIDE;
return MergeTreeDataPartType::Wide;
if (bytes_uncompressed < settings->min_bytes_for_wide_part || rows_count < settings->min_rows_for_wide_part)
return MergeTreeDataPartType::COMPACT;
return MergeTreeDataPartType::Compact;
return MergeTreeDataPartType::WIDE;
return MergeTreeDataPartType::Wide;
}
@ -2430,11 +2471,11 @@ MergeTreeData::MutableDataPartPtr MergeTreeData::createPart(const String & name,
MergeTreeDataPartType type, const MergeTreePartInfo & part_info,
const VolumePtr & volume, const String & relative_path, const IMergeTreeDataPart * parent_part) const
{
if (type == MergeTreeDataPartType::COMPACT)
if (type == MergeTreeDataPartType::Compact)
return std::make_shared<MergeTreeDataPartCompact>(*this, name, part_info, volume, relative_path, parent_part);
else if (type == MergeTreeDataPartType::WIDE)
else if (type == MergeTreeDataPartType::Wide)
return std::make_shared<MergeTreeDataPartWide>(*this, name, part_info, volume, relative_path, parent_part);
else if (type == MergeTreeDataPartType::IN_MEMORY)
else if (type == MergeTreeDataPartType::InMemory)
return std::make_shared<MergeTreeDataPartInMemory>(*this, name, part_info, volume, relative_path, parent_part);
else
throw Exception("Unknown type of part " + relative_path, ErrorCodes::UNKNOWN_PART_TYPE);
@ -2443,11 +2484,11 @@ MergeTreeData::MutableDataPartPtr MergeTreeData::createPart(const String & name,
static MergeTreeDataPartType getPartTypeFromMarkExtension(const String & mrk_ext)
{
if (mrk_ext == getNonAdaptiveMrkExtension())
return MergeTreeDataPartType::WIDE;
if (mrk_ext == getAdaptiveMrkExtension(MergeTreeDataPartType::WIDE))
return MergeTreeDataPartType::WIDE;
if (mrk_ext == getAdaptiveMrkExtension(MergeTreeDataPartType::COMPACT))
return MergeTreeDataPartType::COMPACT;
return MergeTreeDataPartType::Wide;
if (mrk_ext == getAdaptiveMrkExtension(MergeTreeDataPartType::Wide))
return MergeTreeDataPartType::Wide;
if (mrk_ext == getAdaptiveMrkExtension(MergeTreeDataPartType::Compact))
return MergeTreeDataPartType::Compact;
throw Exception("Can't determine part type, because of unknown mark extension " + mrk_ext, ErrorCodes::UNKNOWN_PART_TYPE);
}
@ -2989,7 +3030,7 @@ MergeTreeData::DataPartsVector MergeTreeData::removePartsInRangeFromWorkingSetAn
/// We are going to remove active parts covered by drop_range without timeout.
/// Let's also reset timeout for inactive parts
/// and add these parts to list of parts to remove from ZooKeeper
auto inactive_parts_to_remove_immediately = getDataPartsVectorInPartitionForInternalUsage(DataPartState::Outdated, drop_range.partition_id, &lock);
auto inactive_parts_to_remove_immediately = getDataPartsVectorInPartitionForInternalUsage({DataPartState::Outdated, DataPartState::Deleting}, drop_range.partition_id, &lock);
/// FIXME refactor removePartsFromWorkingSet(...), do not remove parts twice
removePartsFromWorkingSet(txn, parts_to_remove, clear_without_timeout, lock);
@ -3461,6 +3502,19 @@ MergeTreeData::DataPartsVector MergeTreeData::getVisibleDataPartsVectorInPartiti
return getDataPartsVectorInPartitionForInternalUsage(MergeTreeDataPartState::Active, partition_id, acquired_lock);
}
MergeTreeData::DataPartsVector MergeTreeData::getDataPartsVectorInPartitionForInternalUsage(const DataPartStates & affordable_states, const String & partition_id, DataPartsLock * acquired_lock) const
{
auto lock = (acquired_lock) ? DataPartsLock() : lockParts();
DataPartsVector res;
for (const auto & state : affordable_states)
{
DataPartStateAndPartitionID state_with_partition{state, partition_id};
res.insert(res.end(), data_parts_by_state_and_info.lower_bound(state_with_partition), data_parts_by_state_and_info.upper_bound(state_with_partition));
}
return res;
}
MergeTreeData::DataPartsVector MergeTreeData::getDataPartsVectorInPartitionForInternalUsage(
const MergeTreeData::DataPartState & state, const String & partition_id, DataPartsLock * acquired_lock) const
{

View File

@ -110,7 +110,7 @@ namespace ErrorCodes
/// Several modes are implemented. Modes determine additional actions during merge:
/// - Ordinary - don't do anything special
/// - Collapsing - collapse pairs of rows with the opposite values of sign_columns for the same values
/// of primary key (cf. CollapsingSortedBlockInputStream.h)
/// of primary key (cf. CollapsingSortedTransform.h)
/// - Replacing - for all rows with the same primary key keep only the latest one. Or, if the version
/// column is set, keep the latest row with the maximal version.
/// - Summing - sum all numeric columns not contained in the primary key for all rows with the same primary key.
@ -497,6 +497,7 @@ public:
DataPartsVector getVisibleDataPartsVectorInPartitions(ContextPtr local_context, const std::unordered_set<String> & partition_ids) const;
DataPartsVector getDataPartsVectorInPartitionForInternalUsage(const DataPartState & state, const String & partition_id, DataPartsLock * acquired_lock = nullptr) const;
DataPartsVector getDataPartsVectorInPartitionForInternalUsage(const DataPartStates & affordable_states, const String & partition_id, DataPartsLock * acquired_lock = nullptr) const;
/// Returns the part with the given name and state or nullptr if no such part.
DataPartPtr getPartIfExists(const String & part_name, const DataPartStates & valid_states);
@ -615,7 +616,8 @@ public:
/// Delete irrelevant parts from memory and disk.
/// If 'force' - don't wait for old_parts_lifetime.
size_t clearOldPartsFromFilesystem(bool force = false);
void clearPartsFromFilesystem(const DataPartsVector & parts);
/// Try to clear parts from filesystem. Throw exception in case of errors.
void clearPartsFromFilesystem(const DataPartsVector & parts, bool throw_on_error = true, NameSet * parts_failed_to_delete = nullptr);
/// Delete WAL files containing parts, that all already stored on disk.
size_t clearOldWriteAheadLogs();
@ -1298,6 +1300,11 @@ private:
/// distributed operations which can lead to data duplication. Implemented only in ReplicatedMergeTree.
virtual std::optional<ZeroCopyLock> tryCreateZeroCopyExclusiveLock(const String &, const DiskPtr &) { return std::nullopt; }
/// Remove parts from disk calling part->remove(). Can do it in parallel in case of big set of parts and enabled settings.
/// If we fail to remove some part and throw_on_error equals `true`, an exception is thrown for the first failed part.
/// Otherwise, in the non-parallel case, removal stops at the first failure and returns.
void clearPartsFromFilesystemImpl(const DataPartsVector & parts, NameSet * part_names_successed);
TemporaryParts temporary_parts;
};

View File

@ -310,7 +310,7 @@ SelectPartsDecision MergeTreeDataMergerMutator::selectPartsToMerge(
parts_to_merge = delete_ttl_selector.select(parts_ranges, max_total_size_to_merge);
if (!parts_to_merge.empty())
{
future_part->merge_type = MergeType::TTL_DELETE;
future_part->merge_type = MergeType::TTLDelete;
}
else if (metadata_snapshot->hasAnyRecompressionTTL())
{
@ -322,7 +322,7 @@ SelectPartsDecision MergeTreeDataMergerMutator::selectPartsToMerge(
parts_to_merge = recompress_ttl_selector.select(parts_ranges, max_total_size_to_merge);
if (!parts_to_merge.empty())
future_part->merge_type = MergeType::TTL_RECOMPRESS;
future_part->merge_type = MergeType::TTLRecompress;
}
}
@ -603,12 +603,12 @@ MergeTreeData::DataPartPtr MergeTreeDataMergerMutator::renameMergedTemporaryPart
* then we get here.
*
* When M > N parts could be replaced?
* - new block was added in ReplicatedMergeTreeBlockOutputStream;
* - new block was added in ReplicatedMergeTreeSink;
* - it was added to working dataset in memory and renamed on filesystem;
* - but ZooKeeper transaction that adds it to reference dataset in ZK failed;
* - and it is failed due to connection loss, so we don't rollback working dataset in memory,
* because we don't know if the part was added to ZK or not
* (see ReplicatedMergeTreeBlockOutputStream)
* (see ReplicatedMergeTreeSink)
* - then method selectPartsToMerge selects a range and sees, that EphemeralLock for the block in this part is unlocked,
* and so it is possible to merge a range skipping this part.
* (NOTE: Merging with part that is not in ZK is not possible, see checks in 'createLogEntryToMergeParts'.)

Some files were not shown because too many files have changed in this diff