Merge branch 'master' into fix-incorrect-warning

This commit is contained in:
Alexey Milovidov 2023-07-07 13:44:21 +03:00 committed by GitHub
commit e303afbffb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
164 changed files with 1213 additions and 775 deletions

View File

@ -850,6 +850,48 @@ jobs:
docker ps --quiet | xargs --no-run-if-empty docker kill ||:
docker ps --all --quiet | xargs --no-run-if-empty docker rm -f ||:
sudo rm -fr "$TEMP_PATH" "$CACHES_PATH"
BuilderBinRISCV64:
needs: [DockerHubPush]
runs-on: [self-hosted, builder]
steps:
- name: Set envs
run: |
cat >> "$GITHUB_ENV" << 'EOF'
TEMP_PATH=${{runner.temp}}/build_check
IMAGES_PATH=${{runner.temp}}/images_path
REPO_COPY=${{runner.temp}}/build_check/ClickHouse
CACHES_PATH=${{runner.temp}}/../ccaches
BUILD_NAME=binary_riscv64
EOF
- name: Download changed images
uses: actions/download-artifact@v3
with:
name: changed_images
path: ${{ env.IMAGES_PATH }}
- name: Check out repository code
uses: ClickHouse/checkout@v1
with:
clear-repository: true
submodules: true
fetch-depth: 0 # otherwise we will have no info about contributors
- name: Build
run: |
sudo rm -fr "$TEMP_PATH"
mkdir -p "$TEMP_PATH"
cp -r "$GITHUB_WORKSPACE" "$TEMP_PATH"
cd "$REPO_COPY/tests/ci" && python3 build_check.py "$BUILD_NAME"
- name: Upload build URLs to artifacts
if: ${{ success() || failure() }}
uses: actions/upload-artifact@v3
with:
name: ${{ env.BUILD_URLS }}
path: ${{ env.TEMP_PATH }}/${{ env.BUILD_URLS }}.json
- name: Cleanup
if: always()
run: |
docker ps --quiet | xargs --no-run-if-empty docker kill ||:
docker ps --all --quiet | xargs --no-run-if-empty docker rm -f ||:
sudo rm -fr "$TEMP_PATH" "$CACHES_PATH"
############################################################################################
##################################### Docker images #######################################
############################################################################################
@ -932,6 +974,7 @@ jobs:
- BuilderBinDarwinAarch64
- BuilderBinFreeBSD
- BuilderBinPPC64
- BuilderBinRISCV64
- BuilderBinAmd64Compat
- BuilderBinAarch64V80Compat
- BuilderBinClangTidy

View File

@ -911,6 +911,47 @@ jobs:
docker ps --quiet | xargs --no-run-if-empty docker kill ||:
docker ps --all --quiet | xargs --no-run-if-empty docker rm -f ||:
sudo rm -fr "$TEMP_PATH" "$CACHES_PATH"
BuilderBinRISCV64:
needs: [DockerHubPush, FastTest, StyleCheck]
runs-on: [self-hosted, builder]
steps:
- name: Set envs
run: |
cat >> "$GITHUB_ENV" << 'EOF'
TEMP_PATH=${{runner.temp}}/build_check
IMAGES_PATH=${{runner.temp}}/images_path
REPO_COPY=${{runner.temp}}/build_check/ClickHouse
CACHES_PATH=${{runner.temp}}/../ccaches
BUILD_NAME=binary_riscv64
EOF
- name: Download changed images
uses: actions/download-artifact@v3
with:
name: changed_images
path: ${{ env.IMAGES_PATH }}
- name: Check out repository code
uses: ClickHouse/checkout@v1
with:
clear-repository: true
submodules: true
- name: Build
run: |
sudo rm -fr "$TEMP_PATH"
mkdir -p "$TEMP_PATH"
cp -r "$GITHUB_WORKSPACE" "$TEMP_PATH"
cd "$REPO_COPY/tests/ci" && python3 build_check.py "$BUILD_NAME"
- name: Upload build URLs to artifacts
if: ${{ success() || failure() }}
uses: actions/upload-artifact@v3
with:
name: ${{ env.BUILD_URLS }}
path: ${{ env.TEMP_PATH }}/${{ env.BUILD_URLS }}.json
- name: Cleanup
if: always()
run: |
docker ps --quiet | xargs --no-run-if-empty docker kill ||:
docker ps --all --quiet | xargs --no-run-if-empty docker rm -f ||:
sudo rm -fr "$TEMP_PATH" "$CACHES_PATH"
############################################################################################
##################################### Docker images #######################################
############################################################################################
@ -992,6 +1033,7 @@ jobs:
- BuilderBinDarwinAarch64
- BuilderBinFreeBSD
- BuilderBinPPC64
- BuilderBinRISCV64
- BuilderBinAmd64Compat
- BuilderBinAarch64V80Compat
- BuilderBinClangTidy

View File

@ -33,6 +33,19 @@ if (CMAKE_CROSSCOMPILING)
elseif (ARCH_PPC64LE)
set (ENABLE_GRPC OFF CACHE INTERNAL "")
set (ENABLE_SENTRY OFF CACHE INTERNAL "")
elseif (ARCH_RISCV64)
# RISC-V support is preliminary
set (GLIBC_COMPATIBILITY OFF CACHE INTERNAL "")
set (ENABLE_LDAP OFF CACHE INTERNAL "")
set (OPENSSL_NO_ASM ON CACHE INTERNAL "")
set (ENABLE_JEMALLOC ON CACHE INTERNAL "")
set (ENABLE_PARQUET OFF CACHE INTERNAL "")
set (USE_UNWIND OFF CACHE INTERNAL "")
set (ENABLE_GRPC OFF CACHE INTERNAL "")
set (ENABLE_HDFS OFF CACHE INTERNAL "")
set (ENABLE_MYSQL OFF CACHE INTERNAL "")
# It might be ok, but we need to update 'sysroot'
set (ENABLE_RUST OFF CACHE INTERNAL "")
elseif (ARCH_S390X)
set (ENABLE_GRPC OFF CACHE INTERNAL "")
set (ENABLE_SENTRY OFF CACHE INTERNAL "")

View File

@ -138,6 +138,7 @@ def parse_env_variables(
ARM_V80COMPAT_SUFFIX = "-aarch64-v80compat"
FREEBSD_SUFFIX = "-freebsd"
PPC_SUFFIX = "-ppc64le"
RISCV_SUFFIX = "-riscv64"
AMD64_COMPAT_SUFFIX = "-amd64-compat"
result = []
@ -150,6 +151,7 @@ def parse_env_variables(
is_cross_arm = compiler.endswith(ARM_SUFFIX)
is_cross_arm_v80compat = compiler.endswith(ARM_V80COMPAT_SUFFIX)
is_cross_ppc = compiler.endswith(PPC_SUFFIX)
is_cross_riscv = compiler.endswith(RISCV_SUFFIX)
is_cross_freebsd = compiler.endswith(FREEBSD_SUFFIX)
is_amd64_compat = compiler.endswith(AMD64_COMPAT_SUFFIX)
@ -206,6 +208,11 @@ def parse_env_variables(
cmake_flags.append(
"-DCMAKE_TOOLCHAIN_FILE=/build/cmake/linux/toolchain-ppc64le.cmake"
)
elif is_cross_riscv:
cc = compiler[: -len(RISCV_SUFFIX)]
cmake_flags.append(
"-DCMAKE_TOOLCHAIN_FILE=/build/cmake/linux/toolchain-riscv64.cmake"
)
elif is_amd64_compat:
cc = compiler[: -len(AMD64_COMPAT_SUFFIX)]
result.append("DEB_ARCH=amd64")
@ -370,6 +377,7 @@ def parse_args() -> argparse.Namespace:
"clang-16-aarch64",
"clang-16-aarch64-v80compat",
"clang-16-ppc64le",
"clang-16-riscv64",
"clang-16-amd64-compat",
"clang-16-freebsd",
),

View File

@ -67,6 +67,13 @@ start
stop
mv /var/log/clickhouse-server/clickhouse-server.log /var/log/clickhouse-server/clickhouse-server.initial.log
# Start server from previous release
# Let's enable S3 storage by default
export USE_S3_STORAGE_FOR_MERGE_TREE=1
# Previous version may not be ready for fault injections
export ZOOKEEPER_FAULT_INJECTION=0
configure
# force_sync=false doesn't work correctly on some older versions
sudo cat /etc/clickhouse-server/config.d/keeper_port.xml \
| sed "s|<force_sync>false</force_sync>|<force_sync>true</force_sync>|" \
@ -81,13 +88,6 @@ mv /etc/clickhouse-server/config.d/s3_storage_policy_by_default.xml.tmp /etc/cli
sudo chown clickhouse /etc/clickhouse-server/config.d/s3_storage_policy_by_default.xml
sudo chgrp clickhouse /etc/clickhouse-server/config.d/s3_storage_policy_by_default.xml
# Start server from previous release
# Let's enable S3 storage by default
export USE_S3_STORAGE_FOR_MERGE_TREE=1
# Previous version may not be ready for fault injections
export ZOOKEEPER_FAULT_INJECTION=0
configure
# it contains some new settings, but we can safely remove it
rm /etc/clickhouse-server/config.d/merge_tree.xml
rm /etc/clickhouse-server/users.d/nonconst_timezone.xml

View File

@ -94,7 +94,10 @@ RUN mkdir /tmp/ccache \
&& rm -rf /tmp/ccache
ARG TARGETARCH
ARG SCCACHE_VERSION=v0.4.1
ARG SCCACHE_VERSION=v0.5.4
ENV SCCACHE_IGNORE_SERVER_IO_ERROR=1
# sccache requires a value for the region. So by default we use The Default Region
ENV SCCACHE_REGION=us-east-1
RUN arch=${TARGETARCH:-amd64} \
&& case $arch in \
amd64) rarch=x86_64 ;; \

View File

@ -33,6 +33,9 @@ then
elif [ "${ARCH}" = "powerpc64le" -o "${ARCH}" = "ppc64le" ]
then
DIR="powerpc64le"
elif [ "${ARCH}" = "riscv64" ]
then
DIR="riscv64"
fi
elif [ "${OS}" = "FreeBSD" ]
then

View File

@ -3201,6 +3201,40 @@ ENGINE = Log
└──────────────────────────────────────────────────────────────────────────┘
```
## default_temporary_table_engine {#default_temporary_table_engine}
Same as [default_table_engine](#default_table_engine) but for temporary tables.
Default value: `Memory`.
In this example, any new temporary table that does not specify an `Engine` will use the `Log` table engine:
Query:
```sql
SET default_temporary_table_engine = 'Log';
CREATE TEMPORARY TABLE my_table (
x UInt32,
y UInt32
);
SHOW CREATE TEMPORARY TABLE my_table;
```
Result:
```response
┌─statement────────────────────────────────────────────────────────────────┐
│ CREATE TEMPORARY TABLE default.my_table
(
`x` UInt32,
`y` UInt32
)
ENGINE = Log
└──────────────────────────────────────────────────────────────────────────┘
```
## data_type_default_nullable {#data_type_default_nullable}
Allows data types without explicit modifiers [NULL or NOT NULL](../../sql-reference/statements/create/table.md/#null-modifiers) in column definition will be [Nullable](../../sql-reference/data-types/nullable.md/#data_type-nullable).

View File

@ -9,7 +9,6 @@ Columns:
- `event_date` ([Date](../../sql-reference/data-types/date.md)) — Event date.
- `event_time` ([DateTime](../../sql-reference/data-types/datetime.md)) — Event time.
- `event_time_microseconds` ([DateTime64](../../sql-reference/data-types/datetime64.md)) — Event time with microseconds resolution.
- `name` ([String](../../sql-reference/data-types/string.md)) — Metric name.
- `value` ([Float64](../../sql-reference/data-types/float.md)) — Metric value.
@ -20,18 +19,18 @@ SELECT * FROM system.asynchronous_metric_log LIMIT 10
```
``` text
┌─event_date─┬──────────event_time─┬────event_time_microseconds─┬─name─────────────────────────────────────┬─────value─┐
│ 2020-09-05 │ 2020-09-05 15:56:30 │ 2020-09-05 15:56:30.025227 │ CPUFrequencyMHz_0 │ 2120.9 │
│ 2020-09-05 │ 2020-09-05 15:56:30 │ 2020-09-05 15:56:30.025227 │ jemalloc.arenas.all.pmuzzy │ 743 │
│ 2020-09-05 │ 2020-09-05 15:56:30 │ 2020-09-05 15:56:30.025227 │ jemalloc.arenas.all.pdirty │ 26288 │
│ 2020-09-05 │ 2020-09-05 15:56:30 │ 2020-09-05 15:56:30.025227 │ jemalloc.background_thread.run_intervals │ 0 │
│ 2020-09-05 │ 2020-09-05 15:56:30 │ 2020-09-05 15:56:30.025227 │ jemalloc.background_thread.num_runs │ 0 │
│ 2020-09-05 │ 2020-09-05 15:56:30 │ 2020-09-05 15:56:30.025227 │ jemalloc.retained │ 60694528 │
│ 2020-09-05 │ 2020-09-05 15:56:30 │ 2020-09-05 15:56:30.025227 │ jemalloc.mapped │ 303161344 │
│ 2020-09-05 │ 2020-09-05 15:56:30 │ 2020-09-05 15:56:30.025227 │ jemalloc.resident │ 260931584 │
│ 2020-09-05 │ 2020-09-05 15:56:30 │ 2020-09-05 15:56:30.025227 │ jemalloc.metadata │ 12079488 │
│ 2020-09-05 │ 2020-09-05 15:56:30 │ 2020-09-05 15:56:30.025227 │ jemalloc.allocated │ 133756128 │
└────────────┴─────────────────────┴────────────────────────────┴──────────────────────────────────────────┴───────────┘
┌─event_date─┬──────────event_time─┬─name─────────────────────────────────────┬─────value─┐
│ 2020-09-05 │ 2020-09-05 15:56:30 │ CPUFrequencyMHz_0 │ 2120.9 │
│ 2020-09-05 │ 2020-09-05 15:56:30 │ jemalloc.arenas.all.pmuzzy │ 743 │
│ 2020-09-05 │ 2020-09-05 15:56:30 │ jemalloc.arenas.all.pdirty │ 26288 │
│ 2020-09-05 │ 2020-09-05 15:56:30 │ jemalloc.background_thread.run_intervals │ 0 │
│ 2020-09-05 │ 2020-09-05 15:56:30 │ jemalloc.background_thread.num_runs │ 0 │
│ 2020-09-05 │ 2020-09-05 15:56:30 │ jemalloc.retained │ 60694528 │
│ 2020-09-05 │ 2020-09-05 15:56:30 │ jemalloc.mapped │ 303161344 │
│ 2020-09-05 │ 2020-09-05 15:56:30 │ jemalloc.resident │ 260931584 │
│ 2020-09-05 │ 2020-09-05 15:56:30 │ jemalloc.metadata │ 12079488 │
│ 2020-09-05 │ 2020-09-05 15:56:30 │ jemalloc.allocated │ 133756128 │
└────────────┴─────────────────────┴──────────────────────────────────────────┴───────────┘
```
**See Also**

View File

@ -8,7 +8,6 @@ slug: /ru/operations/system-tables/asynchronous_metric_log
Столбцы:
- `event_date` ([Date](../../sql-reference/data-types/date.md)) — дата события.
- `event_time` ([DateTime](../../sql-reference/data-types/datetime.md)) — время события.
- `event_time_microseconds` ([DateTime64](../../sql-reference/data-types/datetime64.md)) — время события в микросекундах.
- `name` ([String](../../sql-reference/data-types/string.md)) — название метрики.
- `value` ([Float64](../../sql-reference/data-types/float.md)) — значение метрики.

View File

@ -8,7 +8,6 @@ slug: /zh/operations/system-tables/asynchronous_metric_log
列:
- `event_date` ([Date](../../sql-reference/data-types/date.md)) — 事件日期。
- `event_time` ([DateTime](../../sql-reference/data-types/datetime.md)) — 事件时间。
- `event_time_microseconds` ([DateTime64](../../sql-reference/data-types/datetime64.md)) — 事件时间(微秒)。
- `name` ([String](../../sql-reference/data-types/string.md)) — 指标名。
- `value` ([Float64](../../sql-reference/data-types/float.md)) — 指标值。
@ -17,18 +16,18 @@ slug: /zh/operations/system-tables/asynchronous_metric_log
SELECT * FROM system.asynchronous_metric_log LIMIT 10
```
``` text
┌─event_date─┬──────────event_time─┬────event_time_microseconds─┬─name─────────────────────────────────────┬─────value─┐
│ 2020-09-05 │ 2020-09-05 15:56:30 │ 2020-09-05 15:56:30.025227 │ CPUFrequencyMHz_0 │ 2120.9 │
│ 2020-09-05 │ 2020-09-05 15:56:30 │ 2020-09-05 15:56:30.025227 │ jemalloc.arenas.all.pmuzzy │ 743 │
│ 2020-09-05 │ 2020-09-05 15:56:30 │ 2020-09-05 15:56:30.025227 │ jemalloc.arenas.all.pdirty │ 26288 │
│ 2020-09-05 │ 2020-09-05 15:56:30 │ 2020-09-05 15:56:30.025227 │ jemalloc.background_thread.run_intervals │ 0 │
│ 2020-09-05 │ 2020-09-05 15:56:30 │ 2020-09-05 15:56:30.025227 │ jemalloc.background_thread.num_runs │ 0 │
│ 2020-09-05 │ 2020-09-05 15:56:30 │ 2020-09-05 15:56:30.025227 │ jemalloc.retained │ 60694528 │
│ 2020-09-05 │ 2020-09-05 15:56:30 │ 2020-09-05 15:56:30.025227 │ jemalloc.mapped │ 303161344 │
│ 2020-09-05 │ 2020-09-05 15:56:30 │ 2020-09-05 15:56:30.025227 │ jemalloc.resident │ 260931584 │
│ 2020-09-05 │ 2020-09-05 15:56:30 │ 2020-09-05 15:56:30.025227 │ jemalloc.metadata │ 12079488 │
│ 2020-09-05 │ 2020-09-05 15:56:30 │ 2020-09-05 15:56:30.025227 │ jemalloc.allocated │ 133756128 │
└────────────┴─────────────────────┴────────────────────────────┴──────────────────────────────────────────┴───────────┘
┌─event_date─┬──────────event_time─┬─name─────────────────────────────────────┬─────value─┐
│ 2020-09-05 │ 2020-09-05 15:56:30 │ CPUFrequencyMHz_0 │ 2120.9 │
│ 2020-09-05 │ 2020-09-05 15:56:30 │ jemalloc.arenas.all.pmuzzy │ 743 │
│ 2020-09-05 │ 2020-09-05 15:56:30 │ jemalloc.arenas.all.pdirty │ 26288 │
│ 2020-09-05 │ 2020-09-05 15:56:30 │ jemalloc.background_thread.run_intervals │ 0 │
│ 2020-09-05 │ 2020-09-05 15:56:30 │ jemalloc.background_thread.num_runs │ 0 │
│ 2020-09-05 │ 2020-09-05 15:56:30 │ jemalloc.retained │ 60694528 │
│ 2020-09-05 │ 2020-09-05 15:56:30 │ jemalloc.mapped │ 303161344 │
│ 2020-09-05 │ 2020-09-05 15:56:30 │ jemalloc.resident │ 260931584 │
│ 2020-09-05 │ 2020-09-05 15:56:30 │ jemalloc.metadata │ 12079488 │
│ 2020-09-05 │ 2020-09-05 15:56:30 │ jemalloc.allocated │ 133756128 │
└────────────┴─────────────────────┴──────────────────────────────────────────┴───────────┘
```
**另请参阅**

View File

@ -59,7 +59,7 @@ public:
String relative_path_from = validatePathAndGetAsRelative(path_from);
String relative_path_to = validatePathAndGetAsRelative(path_to);
disk_from->copy(relative_path_from, disk_to, relative_path_to);
disk_from->copyDirectoryContent(relative_path_from, disk_to, relative_path_to);
}
};
}

View File

@ -42,7 +42,7 @@ int mainEntryClickHouseKeeperConverter(int argc, char ** argv)
{
auto keeper_context = std::make_shared<KeeperContext>(true);
keeper_context->setDigestEnabled(true);
keeper_context->setSnapshotDisk(std::make_shared<DiskLocal>("Keeper-snapshots", options["output-dir"].as<std::string>(), 0));
keeper_context->setSnapshotDisk(std::make_shared<DiskLocal>("Keeper-snapshots", options["output-dir"].as<std::string>()));
DB::KeeperStorage storage(/* tick_time_ms */ 500, /* superdigest */ "", keeper_context, /* initialize_system_nodes */ false);

View File

@ -1590,6 +1590,15 @@ try
/// After attaching system databases we can initialize system log.
global_context->initializeSystemLogs();
global_context->setSystemZooKeeperLogAfterInitializationIfNeeded();
/// Build loggers before tables startup to make log messages from tables
/// attach available in system.text_log
{
String level_str = config().getString("text_log.level", "");
int level = level_str.empty() ? INT_MAX : Poco::Logger::parseLevel(level_str);
setTextLog(global_context->getTextLog(), level);
buildLoggers(config(), logger());
}
/// After the system database is created, attach virtual system tables (in addition to query_log and part_log)
attachSystemTablesServer(global_context, *database_catalog.getSystemDatabase(), has_zookeeper);
attachInformationSchema(global_context, *database_catalog.getDatabase(DatabaseCatalog::INFORMATION_SCHEMA));
@ -1716,14 +1725,6 @@ try
/// Must be done after initialization of `servers`, because async_metrics will access `servers` variable from its thread.
async_metrics.start();
{
String level_str = config().getString("text_log.level", "");
int level = level_str.empty() ? INT_MAX : Poco::Logger::parseLevel(level_str);
setTextLog(global_context->getTextLog(), level);
}
buildLoggers(config(), logger());
main_config_reloader->start();
access_control.startPeriodicReloading();

View File

@ -155,7 +155,7 @@ namespace
AccessRightsElement::AccessRightsElement(AccessFlags access_flags_, std::string_view database_)
: access_flags(access_flags_), database(database_), any_database(false)
: access_flags(access_flags_), database(database_), parameter(database_), any_database(false), any_parameter(false)
{
}

View File

@ -70,7 +70,7 @@ enum class AccessType
M(ALTER_FREEZE_PARTITION, "FREEZE PARTITION, UNFREEZE", TABLE, ALTER_TABLE) \
\
M(ALTER_DATABASE_SETTINGS, "ALTER DATABASE SETTING, ALTER MODIFY DATABASE SETTING, MODIFY DATABASE SETTING", DATABASE, ALTER_DATABASE) /* allows to execute ALTER MODIFY SETTING */\
M(ALTER_NAMED_COLLECTION, "", NAMED_COLLECTION, NAMED_COLLECTION_CONTROL) /* allows to execute ALTER NAMED COLLECTION */\
M(ALTER_NAMED_COLLECTION, "", NAMED_COLLECTION, NAMED_COLLECTION_ADMIN) /* allows to execute ALTER NAMED COLLECTION */\
\
M(ALTER_TABLE, "", GROUP, ALTER) \
M(ALTER_DATABASE, "", GROUP, ALTER) \
@ -92,7 +92,7 @@ enum class AccessType
M(CREATE_ARBITRARY_TEMPORARY_TABLE, "", GLOBAL, CREATE) /* allows to create and manipulate temporary tables
with arbitrary table engine */\
M(CREATE_FUNCTION, "", GLOBAL, CREATE) /* allows to execute CREATE FUNCTION */ \
M(CREATE_NAMED_COLLECTION, "", NAMED_COLLECTION, NAMED_COLLECTION_CONTROL) /* allows to execute CREATE NAMED COLLECTION */ \
M(CREATE_NAMED_COLLECTION, "", NAMED_COLLECTION, NAMED_COLLECTION_ADMIN) /* allows to execute CREATE NAMED COLLECTION */ \
M(CREATE, "", GROUP, ALL) /* allows to execute {CREATE|ATTACH} */ \
\
M(DROP_DATABASE, "", DATABASE, DROP) /* allows to execute {DROP|DETACH} DATABASE */\
@ -101,7 +101,7 @@ enum class AccessType
implicitly enabled by the grant DROP_TABLE */\
M(DROP_DICTIONARY, "", DICTIONARY, DROP) /* allows to execute {DROP|DETACH} DICTIONARY */\
M(DROP_FUNCTION, "", GLOBAL, DROP) /* allows to execute DROP FUNCTION */\
M(DROP_NAMED_COLLECTION, "", NAMED_COLLECTION, NAMED_COLLECTION_CONTROL) /* allows to execute DROP NAMED COLLECTION */\
M(DROP_NAMED_COLLECTION, "", NAMED_COLLECTION, NAMED_COLLECTION_ADMIN) /* allows to execute DROP NAMED COLLECTION */\
M(DROP, "", GROUP, ALL) /* allows to execute {DROP|DETACH} */\
\
M(UNDROP_TABLE, "", TABLE, ALL) /* allows to execute {UNDROP} TABLE */\
@ -140,9 +140,10 @@ enum class AccessType
M(SHOW_SETTINGS_PROFILES, "SHOW PROFILES, SHOW CREATE SETTINGS PROFILE, SHOW CREATE PROFILE", GLOBAL, SHOW_ACCESS) \
M(SHOW_ACCESS, "", GROUP, ACCESS_MANAGEMENT) \
M(ACCESS_MANAGEMENT, "", GROUP, ALL) \
M(SHOW_NAMED_COLLECTIONS, "SHOW NAMED COLLECTIONS", NAMED_COLLECTION, NAMED_COLLECTION_CONTROL) \
M(SHOW_NAMED_COLLECTIONS_SECRETS, "SHOW NAMED COLLECTIONS SECRETS", NAMED_COLLECTION, NAMED_COLLECTION_CONTROL) \
M(NAMED_COLLECTION_CONTROL, "", NAMED_COLLECTION, ALL) \
M(SHOW_NAMED_COLLECTIONS, "SHOW NAMED COLLECTIONS", NAMED_COLLECTION, NAMED_COLLECTION_ADMIN) \
M(SHOW_NAMED_COLLECTIONS_SECRETS, "SHOW NAMED COLLECTIONS SECRETS", NAMED_COLLECTION, NAMED_COLLECTION_ADMIN) \
M(NAMED_COLLECTION, "NAMED COLLECTION USAGE, USE NAMED COLLECTION", NAMED_COLLECTION, NAMED_COLLECTION_ADMIN) \
M(NAMED_COLLECTION_ADMIN, "NAMED COLLECTION CONTROL", NAMED_COLLECTION, ALL) \
\
M(SYSTEM_SHUTDOWN, "SYSTEM KILL, SHUTDOWN", GLOBAL, SYSTEM) \
M(SYSTEM_DROP_DNS_CACHE, "SYSTEM DROP DNS, DROP DNS CACHE, DROP DNS", GLOBAL, SYSTEM_DROP_CACHE) \

View File

@ -328,7 +328,7 @@ namespace
if (!named_collection_control)
{
user->access.revoke(AccessType::NAMED_COLLECTION_CONTROL);
user->access.revoke(AccessType::NAMED_COLLECTION_ADMIN);
}
if (!show_named_collections_secrets)

View File

@ -53,7 +53,7 @@ TEST(AccessRights, Union)
"SHOW ROW POLICIES, SYSTEM MERGES, SYSTEM TTL MERGES, SYSTEM FETCHES, "
"SYSTEM MOVES, SYSTEM SENDS, SYSTEM REPLICATION QUEUES, "
"SYSTEM DROP REPLICA, SYSTEM SYNC REPLICA, SYSTEM RESTART REPLICA, "
"SYSTEM RESTORE REPLICA, SYSTEM WAIT LOADING PARTS, SYSTEM SYNC DATABASE REPLICA, SYSTEM FLUSH DISTRIBUTED, dictGet ON db1.*, GRANT NAMED COLLECTION CONTROL ON db1");
"SYSTEM RESTORE REPLICA, SYSTEM WAIT LOADING PARTS, SYSTEM SYNC DATABASE REPLICA, SYSTEM FLUSH DISTRIBUTED, dictGet ON db1.*, GRANT NAMED COLLECTION ADMIN ON db1");
}

View File

@ -24,7 +24,7 @@ protected:
/// Make local disk.
temp_dir = std::make_unique<Poco::TemporaryFile>();
temp_dir->createDirectories();
local_disk = std::make_shared<DiskLocal>("local_disk", temp_dir->path() + "/", 0);
local_disk = std::make_shared<DiskLocal>("local_disk", temp_dir->path() + "/");
/// Make encrypted disk.
auto settings = std::make_unique<DiskEncryptedSettings>();
@ -38,7 +38,7 @@ protected:
settings->current_key = key;
settings->current_key_fingerprint = fingerprint;
encrypted_disk = std::make_shared<DiskEncrypted>("encrypted_disk", std::move(settings), true);
encrypted_disk = std::make_shared<DiskEncrypted>("encrypted_disk", std::move(settings));
}
void TearDown() override

View File

@ -107,8 +107,8 @@ struct FloatCompareHelper
}
};
template <class U> struct CompareHelper<Float32, U> : public FloatCompareHelper<Float32> {};
template <class U> struct CompareHelper<Float64, U> : public FloatCompareHelper<Float64> {};
template <typename U> struct CompareHelper<Float32, U> : public FloatCompareHelper<Float32> {};
template <typename U> struct CompareHelper<Float64, U> : public FloatCompareHelper<Float64> {};
/** A template for columns that use a simple array to store.

View File

@ -93,8 +93,8 @@
M(ThreadPoolFSReaderThreadsActive, "Number of threads in the thread pool for local_filesystem_read_method=threadpool running a task.") \
M(BackupsIOThreads, "Number of threads in the BackupsIO thread pool.") \
M(BackupsIOThreadsActive, "Number of threads in the BackupsIO thread pool running a task.") \
M(DiskObjectStorageAsyncThreads, "Number of threads in the async thread pool for DiskObjectStorage.") \
M(DiskObjectStorageAsyncThreadsActive, "Number of threads in the async thread pool for DiskObjectStorage running a task.") \
M(DiskObjectStorageAsyncThreads, "Obsolete metric, shows nothing.") \
M(DiskObjectStorageAsyncThreadsActive, "Obsolete metric, shows nothing.") \
M(StorageHiveThreads, "Number of threads in the StorageHive thread pool.") \
M(StorageHiveThreadsActive, "Number of threads in the StorageHive thread pool running a task.") \
M(TablesLoaderThreads, "Number of threads in the tables loader thread pool.") \
@ -141,6 +141,8 @@
M(MergeTreeOutdatedPartsLoaderThreadsActive, "Number of active threads in the threadpool for loading Outdated data parts.") \
M(MergeTreePartsCleanerThreads, "Number of threads in the MergeTree parts cleaner thread pool.") \
M(MergeTreePartsCleanerThreadsActive, "Number of threads in the MergeTree parts cleaner thread pool running a task.") \
M(IDiskCopierThreads, "Number of threads for copying data between disks of different types.") \
M(IDiskCopierThreadsActive, "Number of threads for copying data between disks of different types running a task.") \
M(SystemReplicasThreads, "Number of threads in the system.replicas thread pool.") \
M(SystemReplicasThreadsActive, "Number of threads in the system.replicas thread pool running a task.") \
M(RestartReplicaThreads, "Number of threads in the RESTART REPLICA thread pool.") \

View File

@ -418,6 +418,18 @@ PreformattedMessage getCurrentExceptionMessageAndPattern(bool with_stacktrace, b
<< " (version " << VERSION_STRING << VERSION_OFFICIAL << ")";
}
catch (...) {}
// #ifdef ABORT_ON_LOGICAL_ERROR
// try
// {
// throw;
// }
// catch (const std::logic_error &)
// {
// abortOnFailedAssertion(stream.str());
// }
// catch (...) {}
// #endif
}
catch (...)
{

View File

@ -220,7 +220,7 @@ KeeperContext::Storage KeeperContext::getLogsPathFromConfig(const Poco::Util::Ab
if (!fs::exists(path))
fs::create_directories(path);
return std::make_shared<DiskLocal>("LocalLogDisk", path, 0);
return std::make_shared<DiskLocal>("LocalLogDisk", path);
};
/// the most specialized path
@ -246,7 +246,7 @@ KeeperContext::Storage KeeperContext::getSnapshotsPathFromConfig(const Poco::Uti
if (!fs::exists(path))
fs::create_directories(path);
return std::make_shared<DiskLocal>("LocalSnapshotDisk", path, 0);
return std::make_shared<DiskLocal>("LocalSnapshotDisk", path);
};
/// the most specialized path
@ -272,7 +272,7 @@ KeeperContext::Storage KeeperContext::getStatePathFromConfig(const Poco::Util::A
if (!fs::exists(path))
fs::create_directories(path);
return std::make_shared<DiskLocal>("LocalStateFileDisk", path, 0);
return std::make_shared<DiskLocal>("LocalStateFileDisk", path);
};
if (config.has("keeper_server.state_storage_disk"))

View File

@ -71,16 +71,16 @@ protected:
DB::KeeperContextPtr keeper_context = std::make_shared<DB::KeeperContext>(true);
Poco::Logger * log{&Poco::Logger::get("CoordinationTest")};
void setLogDirectory(const std::string & path) { keeper_context->setLogDisk(std::make_shared<DB::DiskLocal>("LogDisk", path, 0)); }
void setLogDirectory(const std::string & path) { keeper_context->setLogDisk(std::make_shared<DB::DiskLocal>("LogDisk", path)); }
void setSnapshotDirectory(const std::string & path)
{
keeper_context->setSnapshotDisk(std::make_shared<DB::DiskLocal>("SnapshotDisk", path, 0));
keeper_context->setSnapshotDisk(std::make_shared<DB::DiskLocal>("SnapshotDisk", path));
}
void setStateFileDirectory(const std::string & path)
{
keeper_context->setStateFileDisk(std::make_shared<DB::DiskLocal>("StateFile", path, 0));
keeper_context->setStateFileDisk(std::make_shared<DB::DiskLocal>("StateFile", path));
}
};
@ -1503,9 +1503,9 @@ void testLogAndStateMachine(
using namespace DB;
ChangelogDirTest snapshots("./snapshots");
keeper_context->setSnapshotDisk(std::make_shared<DiskLocal>("SnapshotDisk", "./snapshots", 0));
keeper_context->setSnapshotDisk(std::make_shared<DiskLocal>("SnapshotDisk", "./snapshots"));
ChangelogDirTest logs("./logs");
keeper_context->setLogDisk(std::make_shared<DiskLocal>("LogDisk", "./logs", 0));
keeper_context->setLogDisk(std::make_shared<DiskLocal>("LogDisk", "./logs"));
ResponsesQueue queue(std::numeric_limits<size_t>::max());
SnapshotsQueue snapshots_queue{1};

View File

@ -517,6 +517,7 @@ class IColumn;
M(Seconds, wait_for_window_view_fire_signal_timeout, 10, "Timeout for waiting for window view fire signal in event time processing", 0) \
M(UInt64, min_free_disk_space_for_temporary_data, 0, "The minimum disk space to keep while writing temporary data used in external sorting and aggregation.", 0) \
\
M(DefaultTableEngine, default_temporary_table_engine, DefaultTableEngine::Memory, "Default table engine used when ENGINE is not set in CREATE TEMPORARY statement.",0) \
M(DefaultTableEngine, default_table_engine, DefaultTableEngine::None, "Default table engine used when ENGINE is not set in CREATE statement.",0) \
M(Bool, show_table_uuid_in_table_create_query_if_not_nil, false, "For tables in databases with Engine=Atomic show UUID of the table in its CREATE query.", 0) \
M(Bool, database_atomic_wait_for_drop_and_detach_synchronously, false, "When executing DROP or DETACH TABLE in Atomic database, wait for table data to be finally dropped or detached.", 0) \

View File

@ -217,7 +217,7 @@ void registerDictionarySourceClickHouse(DictionarySourceFactory & factory)
std::optional<Configuration> configuration;
std::string settings_config_prefix = config_prefix + ".clickhouse";
auto named_collection = created_from_ddl ? tryGetNamedCollectionWithOverrides(config, settings_config_prefix) : nullptr;
auto named_collection = created_from_ddl ? tryGetNamedCollectionWithOverrides(config, settings_config_prefix, global_context) : nullptr;
if (named_collection)
{

View File

@ -71,7 +71,7 @@ void registerDictionarySourceMysql(DictionarySourceFactory & factory)
MySQLSettings mysql_settings;
std::optional<MySQLDictionarySource::Configuration> dictionary_configuration;
auto named_collection = created_from_ddl ? tryGetNamedCollectionWithOverrides(config, settings_config_prefix) : nullptr;
auto named_collection = created_from_ddl ? tryGetNamedCollectionWithOverrides(config, settings_config_prefix, global_context) : nullptr;
if (named_collection)
{
auto allowed_arguments{dictionary_allowed_keys};

View File

@ -285,19 +285,32 @@ private:
};
DiskEncrypted::DiskEncrypted(
const String & name_, const Poco::Util::AbstractConfiguration & config_, const String & config_prefix_, const DisksMap & map_, bool use_fake_transaction_)
: DiskEncrypted(name_, parseDiskEncryptedSettings(name_, config_, config_prefix_, map_), use_fake_transaction_)
const String & name_, const Poco::Util::AbstractConfiguration & config_, const String & config_prefix_, const DisksMap & map_)
: DiskEncrypted(name_, parseDiskEncryptedSettings(name_, config_, config_prefix_, map_), config_, config_prefix_)
{
}
DiskEncrypted::DiskEncrypted(const String & name_, std::unique_ptr<const DiskEncryptedSettings> settings_, bool use_fake_transaction_)
DiskEncrypted::DiskEncrypted(const String & name_, std::unique_ptr<const DiskEncryptedSettings> settings_,
const Poco::Util::AbstractConfiguration & config_, const String & config_prefix_)
: IDisk(name_, config_, config_prefix_)
, delegate(settings_->wrapped_disk)
, encrypted_name(name_)
, disk_path(settings_->disk_path)
, disk_absolute_path(settings_->wrapped_disk->getPath() + settings_->disk_path)
, current_settings(std::move(settings_))
, use_fake_transaction(config_.getBool(config_prefix_ + ".use_fake_transaction", true))
{
delegate->createDirectories(disk_path);
}
DiskEncrypted::DiskEncrypted(const String & name_, std::unique_ptr<const DiskEncryptedSettings> settings_)
: IDisk(name_)
, delegate(settings_->wrapped_disk)
, encrypted_name(name_)
, disk_path(settings_->disk_path)
, disk_absolute_path(settings_->wrapped_disk->getPath() + settings_->disk_path)
, current_settings(std::move(settings_))
, use_fake_transaction(use_fake_transaction_)
, use_fake_transaction(true)
{
delegate->createDirectories(disk_path);
}
@ -310,32 +323,6 @@ ReservationPtr DiskEncrypted::reserve(UInt64 bytes)
return std::make_unique<DiskEncryptedReservation>(std::static_pointer_cast<DiskEncrypted>(shared_from_this()), std::move(reservation));
}
void DiskEncrypted::copy(const String & from_path, const std::shared_ptr<IDisk> & to_disk, const String & to_path)
{
/// Check if we can copy the file without deciphering.
if (isSameDiskType(*this, *to_disk))
{
/// Disk type is the same, check if the key is the same too.
if (auto * to_disk_enc = typeid_cast<DiskEncrypted *>(to_disk.get()))
{
auto from_settings = current_settings.get();
auto to_settings = to_disk_enc->current_settings.get();
if (from_settings->all_keys == to_settings->all_keys)
{
/// Keys are the same so we can simply copy the encrypted file.
auto wrapped_from_path = wrappedPath(from_path);
auto to_delegate = to_disk_enc->delegate;
auto wrapped_to_path = to_disk_enc->wrappedPath(to_path);
delegate->copy(wrapped_from_path, to_delegate, wrapped_to_path);
return;
}
}
}
/// Copy the file through buffers with deciphering.
copyThroughBuffers(from_path, to_disk, to_path);
}
void DiskEncrypted::copyDirectoryContent(const String & from_dir, const std::shared_ptr<IDisk> & to_disk, const String & to_dir)
{
@ -359,11 +346,8 @@ void DiskEncrypted::copyDirectoryContent(const String & from_dir, const std::sha
}
}
if (!to_disk->exists(to_dir))
to_disk->createDirectories(to_dir);
/// Copy the file through buffers with deciphering.
copyThroughBuffers(from_dir, to_disk, to_dir);
IDisk::copyDirectoryContent(from_dir, to_disk, to_dir);
}
std::unique_ptr<ReadBufferFromFileBase> DiskEncrypted::readFile(
@ -443,7 +427,7 @@ std::unordered_map<String, String> DiskEncrypted::getSerializedMetadata(const st
void DiskEncrypted::applyNewSettings(
const Poco::Util::AbstractConfiguration & config,
ContextPtr /*context*/,
ContextPtr context,
const String & config_prefix,
const DisksMap & disk_map)
{
@ -455,6 +439,7 @@ void DiskEncrypted::applyNewSettings(
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Сhanging disk path on the fly is not supported. Disk {}", name);
current_settings.set(std::move(new_settings));
IDisk::applyNewSettings(config, context, config_prefix, disk_map);
}
void registerDiskEncrypted(DiskFactory & factory, bool global_skip_access_check)
@ -467,7 +452,7 @@ void registerDiskEncrypted(DiskFactory & factory, bool global_skip_access_check)
const DisksMap & map) -> DiskPtr
{
bool skip_access_check = global_skip_access_check || config.getBool(config_prefix + ".skip_access_check", false);
DiskPtr disk = std::make_shared<DiskEncrypted>(name, config, config_prefix, map, config.getBool(config_prefix + ".use_fake_transaction", true));
DiskPtr disk = std::make_shared<DiskEncrypted>(name, config, config_prefix, map);
disk->startup(context, skip_access_check);
return disk;
};

View File

@ -21,8 +21,10 @@ class WriteBufferFromFileBase;
class DiskEncrypted : public IDisk
{
public:
DiskEncrypted(const String & name_, const Poco::Util::AbstractConfiguration & config_, const String & config_prefix_, const DisksMap & map_, bool use_fake_transaction_);
DiskEncrypted(const String & name_, std::unique_ptr<const DiskEncryptedSettings> settings_, bool use_fake_transaction_);
DiskEncrypted(const String & name_, const Poco::Util::AbstractConfiguration & config_, const String & config_prefix_, const DisksMap & map_);
DiskEncrypted(const String & name_, std::unique_ptr<const DiskEncryptedSettings> settings_,
const Poco::Util::AbstractConfiguration & config_, const String & config_prefix_);
DiskEncrypted(const String & name_, std::unique_ptr<const DiskEncryptedSettings> settings_);
const String & getName() const override { return encrypted_name; }
const String & getPath() const override { return disk_absolute_path; }
@ -110,8 +112,6 @@ public:
delegate->listFiles(wrapped_path, file_names);
}
void copy(const String & from_path, const std::shared_ptr<IDisk> & to_disk, const String & to_path) override;
void copyDirectoryContent(const String & from_dir, const std::shared_ptr<IDisk> & to_disk, const String & to_dir) override;
std::unique_ptr<ReadBufferFromFileBase> readFile(

View File

@ -417,29 +417,12 @@ bool inline isSameDiskType(const IDisk & one, const IDisk & another)
return typeid(one) == typeid(another);
}
void DiskLocal::copy(const String & from_path, const std::shared_ptr<IDisk> & to_disk, const String & to_path)
{
if (isSameDiskType(*this, *to_disk))
{
fs::path to = fs::path(to_disk->getPath()) / to_path;
fs::path from = fs::path(disk_path) / from_path;
if (from_path.ends_with('/'))
from = from.parent_path();
if (fs::is_directory(from))
to /= from.filename();
fs::copy(from, to, fs::copy_options::recursive | fs::copy_options::overwrite_existing); /// Use more optimal way.
}
else
copyThroughBuffers(from_path, to_disk, to_path, /* copy_root_dir */ true); /// Base implementation.
}
void DiskLocal::copyDirectoryContent(const String & from_dir, const std::shared_ptr<IDisk> & to_disk, const String & to_dir)
{
if (isSameDiskType(*this, *to_disk))
fs::copy(from_dir, to_dir, fs::copy_options::recursive | fs::copy_options::overwrite_existing); /// Use more optimal way.
fs::copy(fs::path(disk_path) / from_dir, fs::path(to_disk->getPath()) / to_dir, fs::copy_options::recursive | fs::copy_options::overwrite_existing); /// Use more optimal way.
else
copyThroughBuffers(from_dir, to_disk, to_dir, /* copy_root_dir */ false); /// Base implementation.
IDisk::copyDirectoryContent(from_dir, to_disk, to_dir);
}
SyncGuardPtr DiskLocal::getDirectorySyncGuard(const String & path) const
@ -448,7 +431,7 @@ SyncGuardPtr DiskLocal::getDirectorySyncGuard(const String & path) const
}
void DiskLocal::applyNewSettings(const Poco::Util::AbstractConfiguration & config, ContextPtr context, const String & config_prefix, const DisksMap &)
void DiskLocal::applyNewSettings(const Poco::Util::AbstractConfiguration & config, ContextPtr context, const String & config_prefix, const DisksMap & disk_map)
{
String new_disk_path;
UInt64 new_keep_free_space_bytes;
@ -460,10 +443,13 @@ void DiskLocal::applyNewSettings(const Poco::Util::AbstractConfiguration & confi
if (keep_free_space_bytes != new_keep_free_space_bytes)
keep_free_space_bytes = new_keep_free_space_bytes;
IDisk::applyNewSettings(config, context, config_prefix, disk_map);
}
DiskLocal::DiskLocal(const String & name_, const String & path_, UInt64 keep_free_space_bytes_)
: IDisk(name_)
DiskLocal::DiskLocal(const String & name_, const String & path_, UInt64 keep_free_space_bytes_,
const Poco::Util::AbstractConfiguration & config, const String & config_prefix)
: IDisk(name_, config, config_prefix)
, disk_path(path_)
, keep_free_space_bytes(keep_free_space_bytes_)
, logger(&Poco::Logger::get("DiskLocal"))
@ -472,13 +458,24 @@ DiskLocal::DiskLocal(const String & name_, const String & path_, UInt64 keep_fre
}
DiskLocal::DiskLocal(
const String & name_, const String & path_, UInt64 keep_free_space_bytes_, ContextPtr context, UInt64 local_disk_check_period_ms)
: DiskLocal(name_, path_, keep_free_space_bytes_)
const String & name_, const String & path_, UInt64 keep_free_space_bytes_, ContextPtr context,
const Poco::Util::AbstractConfiguration & config, const String & config_prefix)
: DiskLocal(name_, path_, keep_free_space_bytes_, config, config_prefix)
{
auto local_disk_check_period_ms = config.getUInt("local_disk_check_period_ms", 0);
if (local_disk_check_period_ms > 0)
disk_checker = std::make_unique<DiskLocalCheckThread>(this, context, local_disk_check_period_ms);
}
DiskLocal::DiskLocal(const String & name_, const String & path_)
: IDisk(name_)
, disk_path(path_)
, keep_free_space_bytes(0)
, logger(&Poco::Logger::get("DiskLocal"))
, data_source_description(getLocalDataSourceDescription(disk_path))
{
}
DataSourceDescription DiskLocal::getDataSourceDescription() const
{
return data_source_description;
@ -720,7 +717,7 @@ void registerDiskLocal(DiskFactory & factory, bool global_skip_access_check)
bool skip_access_check = global_skip_access_check || config.getBool(config_prefix + ".skip_access_check", false);
std::shared_ptr<IDisk> disk
= std::make_shared<DiskLocal>(name, path, keep_free_space_bytes, context, config.getUInt("local_disk_check_period_ms", 0));
= std::make_shared<DiskLocal>(name, path, keep_free_space_bytes, context, config, config_prefix);
disk->startup(context, skip_access_check);
return disk;
};

View File

@ -19,13 +19,17 @@ public:
friend class DiskLocalCheckThread;
friend class DiskLocalReservation;
DiskLocal(const String & name_, const String & path_, UInt64 keep_free_space_bytes_);
DiskLocal(const String & name_, const String & path_, UInt64 keep_free_space_bytes_,
const Poco::Util::AbstractConfiguration & config, const String & config_prefix);
DiskLocal(
const String & name_,
const String & path_,
UInt64 keep_free_space_bytes_,
ContextPtr context,
UInt64 local_disk_check_period_ms);
const Poco::Util::AbstractConfiguration & config,
const String & config_prefix);
DiskLocal(const String & name_, const String & path_);
const String & getPath() const override { return disk_path; }
@ -63,8 +67,6 @@ public:
void replaceFile(const String & from_path, const String & to_path) override;
void copy(const String & from_path, const std::shared_ptr<IDisk> & to_disk, const String & to_path) override;
void copyDirectoryContent(const String & from_dir, const std::shared_ptr<IDisk> & to_disk, const String & to_dir) override;
void listFiles(const String & path, std::vector<String> & file_names) const override;

View File

@ -53,7 +53,7 @@ void DiskSelector::initialize(const Poco::Util::AbstractConfiguration & config,
disks.emplace(
default_disk_name,
std::make_shared<DiskLocal>(
default_disk_name, context->getPath(), 0, context, config.getUInt("local_disk_check_period_ms", 0)));
default_disk_name, context->getPath(), 0, context, config, config_prefix));
}
is_initialized = true;

View File

@ -1,42 +0,0 @@
#pragma once
#include <future>
#include <functional>
namespace DB
{
/// Interface to run task asynchronously with possibility to wait for execution.
class Executor
{
public:
virtual ~Executor() = default;
virtual std::future<void> execute(std::function<void()> task) = 0;
};
/// Executes task synchronously in case when disk doesn't support async operations.
class SyncExecutor : public Executor
{
public:
SyncExecutor() = default;
std::future<void> execute(std::function<void()> task) override
{
auto promise = std::make_shared<std::promise<void>>();
try
{
task();
promise->set_value();
}
catch (...)
{
try
{
promise->set_exception(std::current_exception());
}
catch (...) { }
}
return promise->get_future();
}
};
}

View File

@ -1,5 +1,4 @@
#include "IDisk.h"
#include "Disks/Executor.h"
#include <IO/ReadBufferFromFileBase.h>
#include <IO/WriteBufferFromFileBase.h>
#include <IO/copyData.h>
@ -80,18 +79,33 @@ UInt128 IDisk::getEncryptedFileIV(const String &) const
using ResultsCollector = std::vector<std::future<void>>;
void asyncCopy(IDisk & from_disk, String from_path, IDisk & to_disk, String to_path, Executor & exec, ResultsCollector & results, bool copy_root_dir, const WriteSettings & settings)
void asyncCopy(IDisk & from_disk, String from_path, IDisk & to_disk, String to_path, ThreadPool & pool, ResultsCollector & results, bool copy_root_dir, const WriteSettings & settings)
{
if (from_disk.isFile(from_path))
{
auto result = exec.execute(
[&from_disk, from_path, &to_disk, to_path, &settings]()
auto promise = std::make_shared<std::promise<void>>();
auto future = promise->get_future();
pool.scheduleOrThrowOnError(
[&from_disk, from_path, &to_disk, to_path, &settings, promise, thread_group = CurrentThread::getGroup()]()
{
setThreadName("DiskCopier");
from_disk.copyFile(from_path, to_disk, fs::path(to_path) / fileName(from_path), settings);
try
{
SCOPE_EXIT_SAFE(if (thread_group) CurrentThread::detachFromGroupIfNotDetached(););
if (thread_group)
CurrentThread::attachToGroup(thread_group);
from_disk.copyFile(from_path, to_disk, fs::path(to_path) / fileName(from_path), settings);
promise->set_value();
}
catch (...)
{
promise->set_exception(std::current_exception());
}
});
results.push_back(std::move(result));
results.push_back(std::move(future));
}
else
{
@ -104,13 +118,12 @@ void asyncCopy(IDisk & from_disk, String from_path, IDisk & to_disk, String to_p
}
for (auto it = from_disk.iterateDirectory(from_path); it->isValid(); it->next())
asyncCopy(from_disk, it->path(), to_disk, dest, exec, results, true, settings);
asyncCopy(from_disk, it->path(), to_disk, dest, pool, results, true, settings);
}
}
void IDisk::copyThroughBuffers(const String & from_path, const std::shared_ptr<IDisk> & to_disk, const String & to_path, bool copy_root_dir)
{
auto & exec = to_disk->getExecutor();
ResultsCollector results;
WriteSettings settings;
@ -118,17 +131,12 @@ void IDisk::copyThroughBuffers(const String & from_path, const std::shared_ptr<I
/// Avoid high memory usage. See test_s3_zero_copy_ttl/test.py::test_move_and_s3_memory_usage
settings.s3_allow_parallel_part_upload = false;
asyncCopy(*this, from_path, *to_disk, to_path, exec, results, copy_root_dir, settings);
asyncCopy(*this, from_path, *to_disk, to_path, copying_thread_pool, results, copy_root_dir, settings);
for (auto & result : results)
result.wait();
for (auto & result : results)
result.get();
}
void IDisk::copy(const String & from_path, const std::shared_ptr<IDisk> & to_disk, const String & to_path)
{
copyThroughBuffers(from_path, to_disk, to_path, true);
result.get(); /// May rethrow an exception
}
@ -137,7 +145,7 @@ void IDisk::copyDirectoryContent(const String & from_dir, const std::shared_ptr<
if (!to_disk->exists(to_dir))
to_disk->createDirectories(to_dir);
copyThroughBuffers(from_dir, to_disk, to_dir, false);
copyThroughBuffers(from_dir, to_disk, to_dir, /* copy_root_dir */ false);
}
void IDisk::truncateFile(const String &, size_t)
@ -233,4 +241,9 @@ catch (Exception & e)
throw;
}
void IDisk::applyNewSettings(const Poco::Util::AbstractConfiguration & config, ContextPtr /*context*/, const String & config_prefix, const DisksMap & /*map*/)
{
copying_thread_pool.setMaxThreads(config.getInt(config_prefix + ".thread_pool_size", 16));
}
}

View File

@ -6,7 +6,6 @@
#include <base/types.h>
#include <Common/CurrentMetrics.h>
#include <Common/Exception.h>
#include <Disks/Executor.h>
#include <Disks/DiskType.h>
#include <IO/ReadSettings.h>
#include <IO/WriteSettings.h>
@ -35,6 +34,12 @@ namespace Poco
}
}
namespace CurrentMetrics
{
extern const Metric IDiskCopierThreads;
extern const Metric IDiskCopierThreadsActive;
}
namespace DB
{
@ -110,9 +115,15 @@ class IDisk : public Space
{
public:
/// Default constructor.
explicit IDisk(const String & name_, std::shared_ptr<Executor> executor_ = std::make_shared<SyncExecutor>())
IDisk(const String & name_, const Poco::Util::AbstractConfiguration & config, const String & config_prefix)
: name(name_)
, executor(executor_)
, copying_thread_pool(CurrentMetrics::IDiskCopierThreads, CurrentMetrics::IDiskCopierThreadsActive, config.getUInt(config_prefix + ".thread_pool_size", 16))
{
}
explicit IDisk(const String & name_)
: name(name_)
, copying_thread_pool(CurrentMetrics::IDiskCopierThreads, CurrentMetrics::IDiskCopierThreadsActive, 16)
{
}
@ -181,9 +192,6 @@ public:
/// If a file with `to_path` path already exists, it will be replaced.
virtual void replaceFile(const String & from_path, const String & to_path) = 0;
/// Recursively copy data containing at `from_path` to `to_path` located at `to_disk`.
virtual void copy(const String & from_path, const std::shared_ptr<IDisk> & to_disk, const String & to_path);
/// Recursively copy files from from_dir to to_dir. Create to_dir if not exists.
virtual void copyDirectoryContent(const String & from_dir, const std::shared_ptr<IDisk> & to_disk, const String & to_dir);
@ -379,7 +387,7 @@ public:
virtual SyncGuardPtr getDirectorySyncGuard(const String & path) const;
/// Applies new settings for disk in runtime.
virtual void applyNewSettings(const Poco::Util::AbstractConfiguration &, ContextPtr, const String &, const DisksMap &) {}
virtual void applyNewSettings(const Poco::Util::AbstractConfiguration & config, ContextPtr context, const String & config_prefix, const DisksMap & map);
/// Quite leaky abstraction. Some disks can use additional disk to store
/// some parts of metadata. In general case we have only one disk itself and
@ -459,9 +467,6 @@ protected:
const String name;
/// Returns executor to perform asynchronous operations.
virtual Executor & getExecutor() { return *executor; }
/// Base implementation of the function copy().
/// It just opens two files, reads data by portions from the first file, and writes it to the second one.
/// A derived class may override copy() to provide a faster implementation.
@ -470,7 +475,7 @@ protected:
virtual void checkAccessImpl(const String & path);
private:
std::shared_ptr<Executor> executor;
ThreadPool copying_thread_pool;
bool is_custom_disk = false;
/// Check access to the disk.

View File

@ -31,9 +31,6 @@ void registerDiskAzureBlobStorage(DiskFactory & factory, bool global_skip_access
getAzureBlobContainerClient(config, config_prefix),
getAzureBlobStorageSettings(config, config_prefix, context));
uint64_t copy_thread_pool_size = config.getUInt(config_prefix + ".thread_pool_size", 16);
bool send_metadata = config.getBool(config_prefix + ".send_metadata", false);
auto metadata_storage = std::make_shared<MetadataStorageFromDisk>(metadata_disk, "");
std::shared_ptr<IDisk> azure_blob_storage_disk = std::make_shared<DiskObjectStorage>(
@ -42,8 +39,8 @@ void registerDiskAzureBlobStorage(DiskFactory & factory, bool global_skip_access
"DiskAzureBlobStorage",
std::move(metadata_storage),
std::move(azure_object_storage),
send_metadata,
copy_thread_pool_size
config,
config_prefix
);
bool skip_access_check = global_skip_access_check || config.getBool(config_prefix + ".skip_access_check", false);

View File

@ -18,12 +18,6 @@
#include <Poco/Util/AbstractConfiguration.h>
#include <Interpreters/Context.h>
namespace CurrentMetrics
{
extern const Metric DiskObjectStorageAsyncThreads;
extern const Metric DiskObjectStorageAsyncThreadsActive;
}
namespace DB
{
@ -37,55 +31,6 @@ namespace ErrorCodes
extern const int DIRECTORY_DOESNT_EXIST;
}
namespace
{
/// Runs tasks asynchronously using thread pool.
class AsyncThreadPoolExecutor : public Executor
{
public:
AsyncThreadPoolExecutor(const String & name_, int thread_pool_size)
: name(name_)
, pool(CurrentMetrics::DiskObjectStorageAsyncThreads, CurrentMetrics::DiskObjectStorageAsyncThreadsActive, thread_pool_size)
{}
std::future<void> execute(std::function<void()> task) override
{
auto promise = std::make_shared<std::promise<void>>();
pool.scheduleOrThrowOnError(
[promise, task]()
{
try
{
task();
promise->set_value();
}
catch (...)
{
tryLogCurrentException("Failed to run async task");
try
{
promise->set_exception(std::current_exception());
}
catch (...) {}
}
});
return promise->get_future();
}
void setMaxThreads(size_t threads)
{
pool.setMaxThreads(threads);
}
private:
String name;
ThreadPool pool;
};
}
DiskTransactionPtr DiskObjectStorage::createTransaction()
{
@ -105,27 +50,20 @@ DiskTransactionPtr DiskObjectStorage::createObjectStorageTransaction()
send_metadata ? metadata_helper.get() : nullptr);
}
std::shared_ptr<Executor> DiskObjectStorage::getAsyncExecutor(const std::string & log_name, size_t size)
{
static auto reader = std::make_shared<AsyncThreadPoolExecutor>(log_name, size);
return reader;
}
DiskObjectStorage::DiskObjectStorage(
const String & name_,
const String & object_storage_root_path_,
const String & log_name,
MetadataStoragePtr metadata_storage_,
ObjectStoragePtr object_storage_,
bool send_metadata_,
uint64_t thread_pool_size_)
: IDisk(name_, getAsyncExecutor(log_name, thread_pool_size_))
const Poco::Util::AbstractConfiguration & config,
const String & config_prefix)
: IDisk(name_, config, config_prefix)
, object_storage_root_path(object_storage_root_path_)
, log (&Poco::Logger::get("DiskObjectStorage(" + log_name + ")"))
, metadata_storage(std::move(metadata_storage_))
, object_storage(std::move(object_storage_))
, send_metadata(send_metadata_)
, threadpool_size(thread_pool_size_)
, send_metadata(config.getBool(config_prefix + ".send_metadata", false))
, metadata_helper(std::make_unique<DiskObjectStorageRemoteMetadataRestoreHelper>(this, ReadSettings{}))
{}
@ -234,19 +172,23 @@ void DiskObjectStorage::moveFile(const String & from_path, const String & to_pat
transaction->commit();
}
void DiskObjectStorage::copy(const String & from_path, const std::shared_ptr<IDisk> & to_disk, const String & to_path)
void DiskObjectStorage::copyFile( /// NOLINT
const String & from_file_path,
IDisk & to_disk,
const String & to_file_path,
const WriteSettings & settings)
{
/// It's the same object storage disk
if (this == to_disk.get())
if (this == &to_disk)
{
/// It may use s3-server-side copy
auto transaction = createObjectStorageTransaction();
transaction->copyFile(from_path, to_path);
transaction->copyFile(from_file_path, to_file_path);
transaction->commit();
}
else
{
IDisk::copy(from_path, to_disk, to_path);
/// Copy through buffers
IDisk::copyFile(from_file_path, to_disk, to_file_path, settings);
}
}
@ -519,14 +461,15 @@ bool DiskObjectStorage::isWriteOnce() const
DiskObjectStoragePtr DiskObjectStorage::createDiskObjectStorage()
{
const auto config_prefix = "storage_configuration.disks." + name;
return std::make_shared<DiskObjectStorage>(
getName(),
object_storage_root_path,
getName(),
metadata_storage,
object_storage,
send_metadata,
threadpool_size);
Context::getGlobalContextInstance()->getConfigRef(),
config_prefix);
}
std::unique_ptr<ReadBufferFromFileBase> DiskObjectStorage::readFile(
@ -582,13 +525,12 @@ void DiskObjectStorage::writeFileUsingBlobWritingFunction(const String & path, W
}
void DiskObjectStorage::applyNewSettings(
const Poco::Util::AbstractConfiguration & config, ContextPtr context_, const String &, const DisksMap &)
const Poco::Util::AbstractConfiguration & config, ContextPtr context_, const String & /*config_prefix*/, const DisksMap & disk_map)
{
/// FIXME we cannot use config_prefix that was passed through arguments because the disk may be wrapped with cache and we need another name
const auto config_prefix = "storage_configuration.disks." + name;
object_storage->applyNewSettings(config, config_prefix, context_);
if (AsyncThreadPoolExecutor * exec = dynamic_cast<AsyncThreadPoolExecutor *>(&getExecutor()))
exec->setMaxThreads(config.getInt(config_prefix + ".thread_pool_size", 16));
IDisk::applyNewSettings(config, context_, config_prefix, disk_map);
}
void DiskObjectStorage::restoreMetadataIfNeeded(

View File

@ -33,8 +33,8 @@ public:
const String & log_name,
MetadataStoragePtr metadata_storage_,
ObjectStoragePtr object_storage_,
bool send_metadata_,
uint64_t thread_pool_size_);
const Poco::Util::AbstractConfiguration & config,
const String & config_prefix);
/// Create fake transaction
DiskTransactionPtr createTransaction() override;
@ -152,7 +152,11 @@ public:
Strings getBlobPath(const String & path) const override;
void writeFileUsingBlobWritingFunction(const String & path, WriteMode mode, WriteBlobFunction && write_blob_function) override;
void copy(const String & from_path, const std::shared_ptr<IDisk> & to_disk, const String & to_path) override;
void copyFile( /// NOLINT
const String & from_file_path,
IDisk & to_disk,
const String & to_file_path,
const WriteSettings & settings = {}) override;
void applyNewSettings(const Poco::Util::AbstractConfiguration & config, ContextPtr context_, const String &, const DisksMap &) override;
@ -198,8 +202,6 @@ public:
NameSet getCacheLayersNames() const override;
#endif
static std::shared_ptr<Executor> getAsyncExecutor(const std::string & log_name, size_t size);
bool supportsStat() const override { return metadata_storage->supportsStat(); }
struct stat stat(const String & path) const override;
@ -225,7 +227,6 @@ private:
std::optional<UInt64> tryReserve(UInt64 bytes);
const bool send_metadata;
size_t threadpool_size;
std::unique_ptr<DiskObjectStorageRemoteMetadataRestoreHelper> metadata_helper;
};

View File

@ -25,7 +25,7 @@ std::pair<String, DiskPtr> prepareForLocalMetadata(
/// where the metadata files are stored locally
auto metadata_path = getDiskMetadataPath(name, config, config_prefix, context);
fs::create_directories(metadata_path);
auto metadata_disk = std::make_shared<DiskLocal>(name + "-metadata", metadata_path, 0);
auto metadata_disk = std::make_shared<DiskLocal>(name + "-metadata", metadata_path, 0, config, config_prefix);
return std::make_pair(metadata_path, metadata_disk);
}

View File

@ -8,6 +8,14 @@
#include <IO/WriteBufferFromFile.h>
#include <Common/checkStackSize.h>
#include <Common/logger_useful.h>
#include <Common/CurrentMetrics.h>
namespace CurrentMetrics
{
extern const Metric LocalThread;
extern const Metric LocalThreadActive;
}
namespace DB
{
@ -101,7 +109,7 @@ void DiskObjectStorageRemoteMetadataRestoreHelper::migrateFileToRestorableSchema
updateObjectMetadata(object.remote_path, metadata);
}
}
void DiskObjectStorageRemoteMetadataRestoreHelper::migrateToRestorableSchemaRecursive(const String & path, Futures & results)
void DiskObjectStorageRemoteMetadataRestoreHelper::migrateToRestorableSchemaRecursive(const String & path, ThreadPool & pool)
{
checkStackSize(); /// This is needed to prevent stack overflow in case of cyclic symlinks.
@ -120,29 +128,26 @@ void DiskObjectStorageRemoteMetadataRestoreHelper::migrateToRestorableSchemaRecu
/// The whole directory can be migrated asynchronously.
if (dir_contains_only_files)
{
auto result = disk->getExecutor().execute([this, path]
pool.scheduleOrThrowOnError([this, path]
{
for (auto it = disk->iterateDirectory(path); it->isValid(); it->next())
migrateFileToRestorableSchema(it->path());
});
results.push_back(std::move(result));
}
else
{
for (auto it = disk->iterateDirectory(path); it->isValid(); it->next())
if (!disk->isDirectory(it->path()))
{
if (disk->isDirectory(it->path()))
{
auto source_path = it->path();
auto result = disk->getExecutor().execute([this, source_path]
{
migrateFileToRestorableSchema(source_path);
});
results.push_back(std::move(result));
migrateToRestorableSchemaRecursive(it->path(), pool);
}
else
migrateToRestorableSchemaRecursive(it->path(), results);
{
auto source_path = it->path();
pool.scheduleOrThrowOnError([this, source_path] { migrateFileToRestorableSchema(source_path); });
}
}
}
}
@ -153,16 +158,13 @@ void DiskObjectStorageRemoteMetadataRestoreHelper::migrateToRestorableSchema()
{
LOG_INFO(disk->log, "Start migration to restorable schema for disk {}", disk->name);
Futures results;
ThreadPool pool{CurrentMetrics::LocalThread, CurrentMetrics::LocalThreadActive};
for (const auto & root : data_roots)
if (disk->exists(root))
migrateToRestorableSchemaRecursive(root + '/', results);
migrateToRestorableSchemaRecursive(root + '/', pool);
for (auto & result : results)
result.wait();
for (auto & result : results)
result.get();
pool.wait();
saveSchemaVersion(RESTORABLE_SCHEMA_VERSION);
}
@ -355,8 +357,8 @@ void DiskObjectStorageRemoteMetadataRestoreHelper::restoreFiles(IObjectStorage *
{
LOG_INFO(disk->log, "Starting restore files for disk {}", disk->name);
std::vector<std::future<void>> results;
auto restore_files = [this, &source_object_storage, &restore_information, &results](const RelativePathsWithMetadata & objects)
ThreadPool pool{CurrentMetrics::LocalThread, CurrentMetrics::LocalThreadActive};
auto restore_files = [this, &source_object_storage, &restore_information, &pool](const RelativePathsWithMetadata & objects)
{
std::vector<String> keys_names;
for (const auto & object : objects)
@ -378,12 +380,10 @@ void DiskObjectStorageRemoteMetadataRestoreHelper::restoreFiles(IObjectStorage *
if (!keys_names.empty())
{
auto result = disk->getExecutor().execute([this, &source_object_storage, &restore_information, keys_names]()
pool.scheduleOrThrowOnError([this, &source_object_storage, &restore_information, keys_names]()
{
processRestoreFiles(source_object_storage, restore_information.source_path, keys_names);
});
results.push_back(std::move(result));
}
return true;
@ -394,10 +394,7 @@ void DiskObjectStorageRemoteMetadataRestoreHelper::restoreFiles(IObjectStorage *
restore_files(children);
for (auto & result : results)
result.wait();
for (auto & result : results)
result.get();
pool.wait();
LOG_INFO(disk->log, "Files are restored for disk {}", disk->name);

View File

@ -75,7 +75,7 @@ private:
void saveSchemaVersion(const int & version) const;
void updateObjectMetadata(const String & key, const ObjectAttributes & metadata) const;
void migrateFileToRestorableSchema(const String & path) const;
void migrateToRestorableSchemaRecursive(const String & path, Futures & results);
void migrateToRestorableSchemaRecursive(const String & path, ThreadPool & pool);
void readRestoreInformation(RestoreInformation & restore_information);
void restoreFiles(IObjectStorage * source_object_storage, const RestoreInformation & restore_information);

View File

@ -44,7 +44,6 @@ void registerDiskHDFS(DiskFactory & factory, bool global_skip_access_check)
auto [_, metadata_disk] = prepareForLocalMetadata(name, config, config_prefix, context);
auto metadata_storage = std::make_shared<MetadataStorageFromDisk>(metadata_disk, uri);
uint64_t copy_thread_pool_size = config.getUInt(config_prefix + ".thread_pool_size", 16);
bool skip_access_check = global_skip_access_check || config.getBool(config_prefix + ".skip_access_check", false);
DiskPtr disk = std::make_shared<DiskObjectStorage>(
@ -53,8 +52,8 @@ void registerDiskHDFS(DiskFactory & factory, bool global_skip_access_check)
"DiskHDFS",
std::move(metadata_storage),
std::move(hdfs_storage),
/* send_metadata = */ false,
copy_thread_pool_size);
config,
config_prefix);
disk->startup(context, skip_access_check);
return disk;

View File

@ -34,7 +34,7 @@ void registerDiskLocalObjectStorage(DiskFactory & factory, bool global_skip_acce
metadata_storage = std::make_shared<MetadataStorageFromDisk>(metadata_disk, path);
auto disk = std::make_shared<DiskObjectStorage>(
name, path, "Local", metadata_storage, local_storage, false, /* threadpool_size */16);
name, path, "Local", metadata_storage, local_storage, config, config_prefix);
disk->startup(context, global_skip_access_check);
return disk;
};

View File

@ -150,17 +150,14 @@ void registerDiskS3(DiskFactory & factory, bool global_skip_access_check)
}
}
bool send_metadata = config.getBool(config_prefix + ".send_metadata", false);
uint64_t copy_thread_pool_size = config.getUInt(config_prefix + ".thread_pool_size", 16);
DiskObjectStoragePtr s3disk = std::make_shared<DiskObjectStorage>(
name,
uri.key,
type == "s3" ? "DiskS3" : "DiskS3Plain",
std::move(metadata_storage),
std::move(s3_storage),
send_metadata,
copy_thread_pool_size);
config,
config_prefix);
s3disk->startup(context, skip_access_check);

View File

@ -52,8 +52,8 @@ void registerDiskWebServer(DiskFactory & factory, bool global_skip_access_check)
"DiskWebServer",
metadata_storage,
object_storage,
/* send_metadata */false,
/* threadpool_size */16);
config,
config_prefix);
disk->startup(context, skip_access_check);
return disk;
};

View File

@ -302,7 +302,11 @@ void StoragePolicy::checkCompatibleWith(const StoragePolicyPtr & new_storage_pol
for (const auto & volume : getVolumes())
{
if (!new_volume_names.contains(volume->getName()))
throw Exception(ErrorCodes::BAD_ARGUMENTS, "New storage policy {} shall contain volumes of old one", backQuote(name));
throw Exception(
ErrorCodes::BAD_ARGUMENTS,
"New storage policy {} shall contain volumes of the old storage policy {}",
backQuote(new_storage_policy->getName()),
backQuote(name));
std::unordered_set<String> new_disk_names;
for (const auto & disk : new_storage_policy->getVolumeByName(volume->getName())->getDisks())
@ -310,7 +314,11 @@ void StoragePolicy::checkCompatibleWith(const StoragePolicyPtr & new_storage_pol
for (const auto & disk : volume->getDisks())
if (!new_disk_names.contains(disk->getName()))
throw Exception(ErrorCodes::BAD_ARGUMENTS, "New storage policy {} shall contain disks of old one", backQuote(name));
throw Exception(
ErrorCodes::BAD_ARGUMENTS,
"New storage policy {} shall contain disks of the old storage policy {}",
backQuote(new_storage_policy->getName()),
backQuote(name));
}
}

View File

@ -56,7 +56,7 @@ void loadDiskLocalConfig(const String & name,
tmp_path = context->getPath();
// Create tmp disk for getting total disk space.
keep_free_space_bytes = static_cast<UInt64>(DiskLocal("tmp", tmp_path, 0).getTotalSpace() * ratio);
keep_free_space_bytes = static_cast<UInt64>(DiskLocal("tmp", tmp_path, 0, config, config_prefix).getTotalSpace() * ratio);
}
}

View File

@ -33,7 +33,7 @@ public:
void SetUp() override
{
fs::create_directories(tmp_root);
disk = std::make_shared<DB::DiskLocal>("local_disk", tmp_root, 0);
disk = std::make_shared<DB::DiskLocal>("local_disk", tmp_root);
}
void TearDown() override

View File

@ -10,7 +10,7 @@ namespace fs = std::filesystem;
DB::DiskPtr createDisk()
{
fs::create_directory("tmp/");
return std::make_shared<DB::DiskLocal>("local_disk", "tmp/", 0);
return std::make_shared<DB::DiskLocal>("local_disk", "tmp/");
}
void destroyDisk(DB::DiskPtr & disk)

View File

@ -23,7 +23,7 @@ protected:
/// Make local disk.
temp_dir = std::make_unique<Poco::TemporaryFile>();
temp_dir->createDirectories();
local_disk = std::make_shared<DiskLocal>("local_disk", getDirectory(), 0);
local_disk = std::make_shared<DiskLocal>("local_disk", getDirectory());
}
void TearDown() override
@ -42,7 +42,7 @@ protected:
settings->current_key = key;
settings->current_key_fingerprint = fingerprint;
settings->disk_path = path;
encrypted_disk = std::make_shared<DiskEncrypted>("encrypted_disk", std::move(settings), true);
encrypted_disk = std::make_shared<DiskEncrypted>("encrypted_disk", std::move(settings));
}
String getFileNames()

View File

@ -81,7 +81,7 @@ namespace impl
static SipHashKey parseSipHashKey(const ColumnWithTypeAndName & key)
{
SipHashKey ret;
SipHashKey ret{};
const auto * tuple = checkAndGetColumn<ColumnTuple>(key.column.get());
if (!tuple)
@ -90,6 +90,9 @@ namespace impl
if (tuple->tupleSize() != 2)
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "wrong tuple size: key must be a tuple of 2 UInt64");
if (tuple->empty())
return ret;
if (const auto * key0col = checkAndGetColumn<ColumnUInt64>(&(tuple->getColumn(0))))
ret.key0 = key0col->get64(0);
else

View File

@ -698,6 +698,8 @@ namespace
const DataTypePtr & from_type = arguments[0].type;
std::lock_guard lock(cache.mutex);
if (from_type->onlyNull())
{
cache.is_empty = true;
@ -711,8 +713,6 @@ namespace
throw Exception(
ErrorCodes::ILLEGAL_COLUMN, "Second and third arguments of function {} must be constant arrays.", getName());
std::lock_guard lock(cache.mutex);
const ColumnPtr & from_column_uncasted = array_from->getDataPtr();
cache.from_column = castColumn(

View File

@ -49,7 +49,7 @@ ConcurrentHashJoin::ConcurrentHashJoin(ContextPtr context_, std::shared_ptr<Tabl
}
}
bool ConcurrentHashJoin::addJoinedBlock(const Block & right_block, bool check_limits)
bool ConcurrentHashJoin::addBlockToJoin(const Block & right_block, bool check_limits)
{
Blocks dispatched_blocks = dispatchBlock(table_join->getOnlyClause().key_names_right, right_block);
@ -77,7 +77,7 @@ bool ConcurrentHashJoin::addJoinedBlock(const Block & right_block, bool check_li
if (!lock.owns_lock())
continue;
bool limit_exceeded = !hash_join->data->addJoinedBlock(dispatched_block, check_limits);
bool limit_exceeded = !hash_join->data->addBlockToJoin(dispatched_block, check_limits);
dispatched_block = {};
blocks_left--;

View File

@ -16,13 +16,13 @@ namespace DB
{
/**
* Can run addJoinedBlock() parallelly to speedup the join process. On test, it almose linear speedup by
* Can run addBlockToJoin() parallelly to speedup the join process. On test, it almose linear speedup by
* the degree of parallelism.
*
* The default HashJoin is not thread safe for inserting right table's rows and run it in a single thread. When
* the right table is large, the join process is too slow.
*
* We create multiple HashJoin instances here. In addJoinedBlock(), one input block is split into multiple blocks
* We create multiple HashJoin instances here. In addBlockToJoin(), one input block is split into multiple blocks
* corresponding to the HashJoin instances by hashing every row on the join keys. And make a guarantee that every HashJoin
* instance is written by only one thread.
*
@ -37,7 +37,7 @@ public:
~ConcurrentHashJoin() override = default;
const TableJoin & getTableJoin() const override { return *table_join; }
bool addJoinedBlock(const Block & block, bool check_limits) override;
bool addBlockToJoin(const Block & block, bool check_limits) override;
void checkTypesOfKeys(const Block & block) const override;
void joinBlock(Block & block, std::shared_ptr<ExtraBlock> & not_processed) override;
void setTotals(const Block & block) override;

View File

@ -875,9 +875,9 @@ catch (...)
"It is ok to skip this exception as cleaning old temporary files is not necessary", path));
}
static VolumePtr createLocalSingleDiskVolume(const std::string & path)
static VolumePtr createLocalSingleDiskVolume(const std::string & path, const Poco::Util::AbstractConfiguration & config_)
{
auto disk = std::make_shared<DiskLocal>("_tmp_default", path, 0);
auto disk = std::make_shared<DiskLocal>("_tmp_default", path, 0, config_, "storage_configuration.disks._tmp_default");
VolumePtr volume = std::make_shared<SingleDiskVolume>("_tmp_default", disk, 0);
return volume;
}
@ -893,7 +893,7 @@ void Context::setTemporaryStoragePath(const String & path, size_t max_size)
if (!shared->tmp_path.ends_with('/'))
shared->tmp_path += '/';
VolumePtr volume = createLocalSingleDiskVolume(shared->tmp_path);
VolumePtr volume = createLocalSingleDiskVolume(shared->tmp_path, getConfigRef());
for (const auto & disk : volume->getDisks())
{
@ -966,7 +966,7 @@ void Context::setTemporaryStorageInCache(const String & cache_disk_name, size_t
LOG_DEBUG(shared->log, "Using file cache ({}) for temporary files", file_cache->getBasePath());
shared->tmp_path = file_cache->getBasePath();
VolumePtr volume = createLocalSingleDiskVolume(shared->tmp_path);
VolumePtr volume = createLocalSingleDiskVolume(shared->tmp_path, getConfigRef());
shared->root_temp_data_on_disk = std::make_shared<TemporaryDataOnDiskScope>(volume, file_cache.get(), max_size);
}

View File

@ -103,7 +103,7 @@ DirectKeyValueJoin::DirectKeyValueJoin(
right_sample_block_with_storage_column_names = right_sample_block_with_storage_column_names_;
}
bool DirectKeyValueJoin::addJoinedBlock(const Block &, bool)
bool DirectKeyValueJoin::addBlockToJoin(const Block &, bool)
{
throw DB::Exception(ErrorCodes::LOGICAL_ERROR, "Unreachable code reached");
}

View File

@ -32,10 +32,10 @@ public:
virtual const TableJoin & getTableJoin() const override { return *table_join; }
virtual bool addJoinedBlock(const Block &, bool) override;
virtual bool addBlockToJoin(const Block &, bool) override;
virtual void checkTypesOfKeys(const Block &) const override;
/// Join the block with data from left hand of JOIN to the right hand data (that was previously built by calls to addJoinedBlock).
/// Join the block with data from left hand of JOIN to the right hand data (that was previously built by calls to addBlockToJoin).
/// Could be called from different threads in parallel.
virtual void joinBlock(Block & block, std::shared_ptr<ExtraBlock> &) override;

View File

@ -11,16 +11,7 @@
namespace DB
{
///
/// -------- Column --------- Type ------
/// | event_date | DateTime |
/// | event_time | UInt64 |
/// | query_id | String |
/// | remote_file_path | String |
/// | segment_range | Tuple |
/// | read_type | String |
/// -------------------------------------
///
struct FilesystemCacheLogElement
{
enum class CacheType

View File

@ -30,9 +30,9 @@ public:
const TableJoin & getTableJoin() const override { return *table_join; }
bool addJoinedBlock(const Block & /* block */, bool /* check_limits */) override
bool addBlockToJoin(const Block & /* block */, bool /* check_limits */) override
{
throw Exception(ErrorCodes::LOGICAL_ERROR, "FullSortingMergeJoin::addJoinedBlock should not be called");
throw Exception(ErrorCodes::LOGICAL_ERROR, "FullSortingMergeJoin::addBlockToJoin should not be called");
}
static bool isSupported(const std::shared_ptr<TableJoin> & table_join)

View File

@ -288,10 +288,7 @@ void GraceHashJoin::initBuckets()
size_t initial_num_buckets = roundUpToPowerOfTwoOrZero(std::clamp<size_t>(settings.grace_hash_join_initial_buckets, 1, settings.grace_hash_join_max_buckets));
for (size_t i = 0; i < initial_num_buckets; ++i)
{
addBucket(buckets);
}
addBuckets(initial_num_buckets);
if (buckets.empty())
throw Exception(ErrorCodes::LOGICAL_ERROR, "No buckets created");
@ -310,13 +307,13 @@ bool GraceHashJoin::isSupported(const std::shared_ptr<TableJoin> & table_join)
GraceHashJoin::~GraceHashJoin() = default;
bool GraceHashJoin::addJoinedBlock(const Block & block, bool /*check_limits*/)
bool GraceHashJoin::addBlockToJoin(const Block & block, bool /*check_limits*/)
{
if (current_bucket == nullptr)
throw Exception(ErrorCodes::LOGICAL_ERROR, "GraceHashJoin is not initialized");
Block materialized = materializeBlock(block);
addJoinedBlockImpl(std::move(materialized));
addBlockToJoinImpl(std::move(materialized));
return true;
}
@ -356,52 +353,66 @@ bool GraceHashJoin::hasMemoryOverflow(const InMemoryJoinPtr & hash_join_) const
return hasMemoryOverflow(total_rows, total_bytes);
}
GraceHashJoin::Buckets GraceHashJoin::rehashBuckets(size_t to_size)
GraceHashJoin::Buckets GraceHashJoin::rehashBuckets()
{
std::unique_lock lock(rehash_mutex);
if (!isPowerOf2(buckets.size())) [[unlikely]]
throw Exception(ErrorCodes::LOGICAL_ERROR, "Number of buckets should be power of 2 but it's {}", buckets.size());
const size_t to_size = buckets.size() * 2;
size_t current_size = buckets.size();
if (to_size <= current_size)
return buckets;
chassert(isPowerOf2(to_size));
if (to_size > max_num_buckets)
{
throw Exception(ErrorCodes::LIMIT_EXCEEDED,
throw Exception(
ErrorCodes::LIMIT_EXCEEDED,
"Too many grace hash join buckets ({} > {}), "
"consider increasing grace_hash_join_max_buckets or max_rows_in_join/max_bytes_in_join",
to_size, max_num_buckets);
to_size,
max_num_buckets);
}
LOG_TRACE(log, "Rehashing from {} to {}", current_size, to_size);
buckets.reserve(to_size);
for (size_t i = current_size; i < to_size; ++i)
addBucket(buckets);
addBuckets(to_size - current_size);
return buckets;
}
void GraceHashJoin::addBucket(Buckets & destination)
void GraceHashJoin::addBuckets(const size_t bucket_count)
{
// There could be exceptions from createStream, In ci tests
// there is a certain probability of failure in allocating memory, see memory_tracker_fault_probability.
// It may terminate this thread and leave a broken hash_join, and another thread cores when it tries to
// use the broken hash_join. So we print an exception message here to help debug.
try
{
auto & left_file = tmp_data->createStream(left_sample_block);
auto & right_file = tmp_data->createStream(prepareRightBlock(right_sample_block));
// Exception can be thrown in number of cases:
// - during creation of temporary files for buckets
// - in CI tests, there is a certain probability of failure in allocating memory, see memory_tracker_fault_probability
// Therefore, new buckets are added only after all of them created successfully,
// otherwise we can end up having unexpected number of buckets
BucketPtr new_bucket = std::make_shared<FileBucket>(destination.size(), left_file, right_file, log);
destination.emplace_back(std::move(new_bucket));
}
catch (...)
{
LOG_ERROR(&Poco::Logger::get("GraceHashJoin"), "Can't create bucket. current buckets size: {}", destination.size());
throw;
}
const size_t current_size = buckets.size();
Buckets tmp_buckets;
tmp_buckets.reserve(bucket_count);
for (size_t i = 0; i < bucket_count; ++i)
try
{
auto & left_file = tmp_data->createStream(left_sample_block);
auto & right_file = tmp_data->createStream(prepareRightBlock(right_sample_block));
BucketPtr new_bucket = std::make_shared<FileBucket>(current_size + i, left_file, right_file, log);
tmp_buckets.emplace_back(std::move(new_bucket));
}
catch (...)
{
LOG_ERROR(
&Poco::Logger::get("GraceHashJoin"),
"Can't create bucket {} due to error: {}",
current_size + i,
getCurrentExceptionMessage(false));
throw;
}
buckets.reserve(buckets.size() + bucket_count);
for (auto & bucket : tmp_buckets)
buckets.emplace_back(std::move(bucket));
}
void GraceHashJoin::checkTypesOfKeys(const Block & block) const
@ -596,7 +607,7 @@ IBlocksStreamPtr GraceHashJoin::getDelayedBlocks()
while (Block block = right_reader.read())
{
num_rows += block.rows();
addJoinedBlockImpl(std::move(block));
addBlockToJoinImpl(std::move(block));
}
LOG_TRACE(log, "Loaded bucket {} with {}(/{}) rows",
@ -621,7 +632,7 @@ Block GraceHashJoin::prepareRightBlock(const Block & block)
return HashJoin::prepareRightBlock(block, hash_join_sample_block);
}
void GraceHashJoin::addJoinedBlockImpl(Block block)
void GraceHashJoin::addBlockToJoinImpl(Block block)
{
block = prepareRightBlock(block);
Buckets buckets_snapshot = getCurrentBuckets();
@ -638,15 +649,10 @@ void GraceHashJoin::addJoinedBlockImpl(Block block)
if (current_block.rows() > 0)
{
std::lock_guard lock(hash_join_mutex);
auto current_buckets = getCurrentBuckets();
if (!isPowerOf2(current_buckets.size())) [[unlikely]]
{
throw Exception(ErrorCodes::LOGICAL_ERROR, "Broken buckets. its size({}) is not power of 2", current_buckets.size());
}
if (!hash_join)
hash_join = makeInMemoryJoin();
hash_join->addJoinedBlock(current_block, /* check_limits = */ false);
hash_join->addBlockToJoin(current_block, /* check_limits = */ false);
if (!hasMemoryOverflow(hash_join))
return;
@ -654,7 +660,7 @@ void GraceHashJoin::addJoinedBlockImpl(Block block)
current_block = {};
// Must use the latest buckets snapshot in case that it has been rehashed by other threads.
buckets_snapshot = rehashBuckets(current_buckets.size() * 2);
buckets_snapshot = rehashBuckets();
auto right_blocks = hash_join->releaseJoinedBlocks(/* restructure */ false);
hash_join = nullptr;
@ -677,7 +683,7 @@ void GraceHashJoin::addJoinedBlockImpl(Block block)
hash_join = makeInMemoryJoin();
if (current_block.rows() > 0)
hash_join->addJoinedBlock(current_block, /* check_limits = */ false);
hash_join->addBlockToJoin(current_block, /* check_limits = */ false);
}
}

View File

@ -23,11 +23,11 @@ class HashJoin;
*
* The joining algorithm consists of three stages:
*
* 1) During the first stage we accumulate blocks of the right table via @addJoinedBlock.
* 1) During the first stage we accumulate blocks of the right table via @addBlockToJoin.
* Each input block is split into multiple buckets based on the hash of the row join keys.
* The first bucket is added to the in-memory HashJoin, and the remaining buckets are written to disk for further processing.
* When the size of HashJoin exceeds the limits, we double the number of buckets.
* There can be multiple threads calling addJoinedBlock, just like @ConcurrentHashJoin.
* There can be multiple threads calling addBlockToJoin, just like @ConcurrentHashJoin.
*
* 2) At the second stage we process left table blocks via @joinBlock.
* Again, each input block is split into multiple buckets by hash.
@ -65,7 +65,7 @@ public:
void initialize(const Block & sample_block) override;
bool addJoinedBlock(const Block & block, bool check_limits) override;
bool addBlockToJoin(const Block & block, bool check_limits) override;
void checkTypesOfKeys(const Block & block) const override;
void joinBlock(Block & block, std::shared_ptr<ExtraBlock> & not_processed) override;
@ -94,22 +94,23 @@ private:
InMemoryJoinPtr makeInMemoryJoin();
/// Add right table block to the @join. Calls @rehash on overflow.
void addJoinedBlockImpl(Block block);
void addBlockToJoinImpl(Block block);
/// Check that join satisfies limits on rows/bytes in table_join.
bool hasMemoryOverflow(size_t total_rows, size_t total_bytes) const;
bool hasMemoryOverflow(const InMemoryJoinPtr & hash_join_) const;
bool hasMemoryOverflow(const BlocksList & blocks) const;
/// Create new bucket at the end of @destination.
void addBucket(Buckets & destination);
/// Add bucket_count new buckets
/// Throws if a bucket creation fails
void addBuckets(size_t bucket_count);
/// Increase number of buckets to match desired_size.
/// Called when HashJoin in-memory table for one bucket exceeds the limits.
///
/// NB: after @rehashBuckets there may be rows that are written to the buckets that they do not belong to.
/// It is fine; these rows will be written to the corresponding buckets during the third stage.
Buckets rehashBuckets(size_t to_size);
Buckets rehashBuckets();
/// Perform some bookkeeping after all calls to @joinBlock.
void startReadingDelayedBlocks();

View File

@ -79,8 +79,8 @@ namespace JoinStuff
{
assert(flags[nullptr].size() <= size);
need_flags = true;
// For one disjunct clause case, we don't need to reinit each time we call addJoinedBlock.
// and there is no value inserted in this JoinUsedFlags before addJoinedBlock finish.
// For one disjunct clause case, we don't need to reinit each time we call addBlockToJoin.
// and there is no value inserted in this JoinUsedFlags before addBlockToJoin finish.
// So we reinit only when the hash table is rehashed to a larger size.
if (flags.empty() || flags[nullptr].size() < size) [[unlikely]]
{
@ -581,7 +581,7 @@ namespace
};
template <JoinStrictness STRICTNESS, typename KeyGetter, typename Map, bool has_null_map>
template <JoinStrictness STRICTNESS, typename KeyGetter, typename Map>
size_t NO_INLINE insertFromBlockImplTypeCase(
HashJoin & join, Map & map, size_t rows, const ColumnRawPtrs & key_columns,
const Sizes & key_sizes, Block * stored_block, ConstNullMapPtr null_map, UInt8ColumnDataPtr join_mask, Arena & pool, bool & is_inserted)
@ -600,7 +600,7 @@ namespace
for (size_t i = 0; i < rows; ++i)
{
if (has_null_map && (*null_map)[i])
if (null_map && (*null_map)[i])
{
/// nulls are not inserted into hash table,
/// keep them for RIGHT and FULL joins
@ -622,21 +622,6 @@ namespace
return map.getBufferSizeInCells();
}
template <JoinStrictness STRICTNESS, typename KeyGetter, typename Map>
size_t insertFromBlockImplType(
HashJoin & join, Map & map, size_t rows, const ColumnRawPtrs & key_columns,
const Sizes & key_sizes, Block * stored_block, ConstNullMapPtr null_map, UInt8ColumnDataPtr join_mask, Arena & pool, bool & is_inserted)
{
if (null_map)
return insertFromBlockImplTypeCase<STRICTNESS, KeyGetter, Map, true>(
join, map, rows, key_columns, key_sizes, stored_block, null_map, join_mask, pool, is_inserted);
else
return insertFromBlockImplTypeCase<STRICTNESS, KeyGetter, Map, false>(
join, map, rows, key_columns, key_sizes, stored_block, null_map, join_mask, pool, is_inserted);
}
template <JoinStrictness STRICTNESS, typename Maps>
size_t insertFromBlockImpl(
HashJoin & join, HashJoin::Type type, Maps & maps, size_t rows, const ColumnRawPtrs & key_columns,
@ -653,7 +638,7 @@ namespace
#define M(TYPE) \
case HashJoin::Type::TYPE: \
return insertFromBlockImplType<STRICTNESS, typename KeyGetterForType<HashJoin::Type::TYPE, std::remove_reference_t<decltype(*maps.TYPE)>>::Type>(\
return insertFromBlockImplTypeCase<STRICTNESS, typename KeyGetterForType<HashJoin::Type::TYPE, std::remove_reference_t<decltype(*maps.TYPE)>>::Type>(\
join, *maps.TYPE, rows, key_columns, key_sizes, stored_block, null_map, join_mask, pool, is_inserted); \
break;
@ -729,7 +714,7 @@ Block HashJoin::prepareRightBlock(const Block & block) const
return prepareRightBlock(block, savedBlockSample());
}
bool HashJoin::addJoinedBlock(const Block & source_block_, bool check_limits)
bool HashJoin::addBlockToJoin(const Block & source_block_, bool check_limits)
{
if (!data)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Join data was released");
@ -781,7 +766,7 @@ bool HashJoin::addJoinedBlock(const Block & source_block_, bool check_limits)
size_t total_bytes = 0;
{
if (storage_join_lock)
throw DB::Exception(ErrorCodes::LOGICAL_ERROR, "addJoinedBlock called when HashJoin locked to prevent updates");
throw DB::Exception(ErrorCodes::LOGICAL_ERROR, "addBlockToJoin called when HashJoin locked to prevent updates");
data->blocks_allocated_size += block_to_save.allocatedBytes();
data->blocks.emplace_back(std::move(block_to_save));
@ -1260,7 +1245,7 @@ void setUsed(IColumn::Filter & filter [[maybe_unused]], size_t pos [[maybe_unuse
/// Joins right table columns which indexes are present in right_indexes using specified map.
/// Makes filter (1 if row presented in right table) and returns offsets to replicate (for ALL JOINS).
template <JoinKind KIND, JoinStrictness STRICTNESS, typename KeyGetter, typename Map, bool need_filter, bool has_null_map, bool multiple_disjuncts>
template <JoinKind KIND, JoinStrictness STRICTNESS, typename KeyGetter, typename Map, bool need_filter, bool multiple_disjuncts>
NO_INLINE IColumn::Filter joinRightColumns(
std::vector<KeyGetter> && key_getter_vector,
const std::vector<const Map *> & mapv,
@ -1284,20 +1269,13 @@ NO_INLINE IColumn::Filter joinRightColumns(
for (size_t i = 0; i < rows; ++i)
{
bool right_row_found = false;
bool null_element_found = false;
KnownRowsHolder<multiple_disjuncts> known_rows;
for (size_t onexpr_idx = 0; onexpr_idx < added_columns.join_on_keys.size(); ++onexpr_idx)
{
const auto & join_keys = added_columns.join_on_keys[onexpr_idx];
if constexpr (has_null_map)
{
if (join_keys.null_map && (*join_keys.null_map)[i])
{
null_element_found = true;
continue;
}
}
if (join_keys.null_map && (*join_keys.null_map)[i])
continue;
bool row_acceptable = !join_keys.isRowFiltered(i);
using FindResult = typename KeyGetter::FindResult;
@ -1379,20 +1357,6 @@ NO_INLINE IColumn::Filter joinRightColumns(
}
}
if constexpr (has_null_map)
{
if (!right_row_found && null_element_found)
{
addNotFoundRow<join_features.add_missing, join_features.need_replication>(added_columns, current_offset);
if constexpr (join_features.need_replication)
{
(*added_columns.offsets_to_replicate)[i] = current_offset;
}
continue;
}
}
if (!right_row_found)
{
if constexpr (join_features.is_anti_join && join_features.left)
@ -1410,7 +1374,7 @@ NO_INLINE IColumn::Filter joinRightColumns(
return filter;
}
template <JoinKind KIND, JoinStrictness STRICTNESS, typename KeyGetter, typename Map, bool need_filter, bool has_null_map>
template <JoinKind KIND, JoinStrictness STRICTNESS, typename KeyGetter, typename Map, bool need_filter>
IColumn::Filter joinRightColumnsSwitchMultipleDisjuncts(
std::vector<KeyGetter> && key_getter_vector,
const std::vector<const Map *> & mapv,
@ -1418,8 +1382,8 @@ IColumn::Filter joinRightColumnsSwitchMultipleDisjuncts(
JoinStuff::JoinUsedFlags & used_flags [[maybe_unused]])
{
return mapv.size() > 1
? joinRightColumns<KIND, STRICTNESS, KeyGetter, Map, need_filter, has_null_map, true>(std::forward<std::vector<KeyGetter>>(key_getter_vector), mapv, added_columns, used_flags)
: joinRightColumns<KIND, STRICTNESS, KeyGetter, Map, need_filter, has_null_map, false>(std::forward<std::vector<KeyGetter>>(key_getter_vector), mapv, added_columns, used_flags);
? joinRightColumns<KIND, STRICTNESS, KeyGetter, Map, need_filter, true>(std::forward<std::vector<KeyGetter>>(key_getter_vector), mapv, added_columns, used_flags)
: joinRightColumns<KIND, STRICTNESS, KeyGetter, Map, need_filter, false>(std::forward<std::vector<KeyGetter>>(key_getter_vector), mapv, added_columns, used_flags);
}
template <JoinKind KIND, JoinStrictness STRICTNESS, typename KeyGetter, typename Map>
@ -1429,21 +1393,13 @@ IColumn::Filter joinRightColumnsSwitchNullability(
AddedColumns & added_columns,
JoinStuff::JoinUsedFlags & used_flags)
{
bool has_null_map = std::any_of(added_columns.join_on_keys.begin(), added_columns.join_on_keys.end(),
[](const auto & k) { return k.null_map; });
if (added_columns.need_filter)
{
if (has_null_map)
return joinRightColumnsSwitchMultipleDisjuncts<KIND, STRICTNESS, KeyGetter, Map, true, true>(std::forward<std::vector<KeyGetter>>(key_getter_vector), mapv, added_columns, used_flags);
else
return joinRightColumnsSwitchMultipleDisjuncts<KIND, STRICTNESS, KeyGetter, Map, true, false>(std::forward<std::vector<KeyGetter>>(key_getter_vector), mapv, added_columns, used_flags);
return joinRightColumnsSwitchMultipleDisjuncts<KIND, STRICTNESS, KeyGetter, Map, true>(std::forward<std::vector<KeyGetter>>(key_getter_vector), mapv, added_columns, used_flags);
}
else
{
if (has_null_map)
return joinRightColumnsSwitchMultipleDisjuncts<KIND, STRICTNESS, KeyGetter, Map, false, true>(std::forward<std::vector<KeyGetter>>(key_getter_vector), mapv, added_columns, used_flags);
else
return joinRightColumnsSwitchMultipleDisjuncts<KIND, STRICTNESS, KeyGetter, Map, false, false>(std::forward<std::vector<KeyGetter>>(key_getter_vector), mapv, added_columns, used_flags);
return joinRightColumnsSwitchMultipleDisjuncts<KIND, STRICTNESS, KeyGetter, Map, true>(std::forward<std::vector<KeyGetter>>(key_getter_vector), mapv, added_columns, used_flags);
}
}
@ -1868,7 +1824,7 @@ struct AdderNonJoined
/// Based on:
/// - map offsetInternal saved in used_flags for single disjuncts
/// - flags in BlockWithFlags for multiple disjuncts
template<bool multiple_disjuncts>
template <bool multiple_disjuncts>
class NotJoinedHash final : public NotJoinedBlocks::RightColumnsFiller
{
public:

View File

@ -155,11 +155,11 @@ public:
/** Add block of data from right hand of JOIN to the map.
* Returns false, if some limit was exceeded and you should not insert more data.
*/
bool addJoinedBlock(const Block & source_block_, bool check_limits) override;
bool addBlockToJoin(const Block & source_block_, bool check_limits) override;
void checkTypesOfKeys(const Block & block) const override;
/** Join data from the map (that was previously built by calls to addJoinedBlock) to the block with data from "left" table.
/** Join data from the map (that was previously built by calls to addBlockToJoin) to the block with data from "left" table.
* Could be called from different threads in parallel.
*/
void joinBlock(Block & block, ExtraBlockPtr & not_processed) override;
@ -406,7 +406,7 @@ private:
Poco::Logger * log;
/// Should be set via setLock to protect hash table from modification from StorageJoin
/// If set HashJoin instance is not available for modification (addJoinedBlock)
/// If set HashJoin instance is not available for modification (addBlockToJoin)
TableLockHolder storage_join_lock = nullptr;
void dataMapInit(MapsVariant &);

View File

@ -52,7 +52,7 @@ public:
/// Add block of data from right hand of JOIN.
/// @returns false, if some limit was exceeded and you should not insert more data.
virtual bool addJoinedBlock(const Block & block, bool check_limits = true) = 0; /// NOLINT
virtual bool addBlockToJoin(const Block & block, bool check_limits = true) = 0; /// NOLINT
/* Some initialization may be required before joinBlock() call.
* It's better to done in in constructor, but left block exact structure is not known at that moment.
@ -62,7 +62,7 @@ public:
virtual void checkTypesOfKeys(const Block & block) const = 0;
/// Join the block with data from left hand of JOIN to the right hand data (that was previously built by calls to addJoinedBlock).
/// Join the block with data from left hand of JOIN to the right hand data (that was previously built by calls to addBlockToJoin).
/// Could be called from different threads in parallel.
virtual void joinBlock(Block & block, std::shared_ptr<ExtraBlock> & not_processed) = 0;
@ -79,7 +79,7 @@ public:
/// Returns true if no data to join with.
virtual bool alwaysReturnsEmptySet() const = 0;
/// StorageJoin/Dictionary is already filled. No need to call addJoinedBlock.
/// StorageJoin/Dictionary is already filled. No need to call addBlockToJoin.
/// Different query plan is used for such joins.
virtual bool isFilled() const { return pipelineType() == JoinPipelineType::FilledRight; }
virtual JoinPipelineType pipelineType() const { return JoinPipelineType::FillRightFirst; }

View File

@ -881,46 +881,24 @@ void InterpreterCreateQuery::validateTableStructure(const ASTCreateQuery & creat
}
}
String InterpreterCreateQuery::getTableEngineName(DefaultTableEngine default_table_engine)
namespace
{
switch (default_table_engine)
void checkTemporaryTableEngineName(const String& name)
{
case DefaultTableEngine::Log:
return "Log";
case DefaultTableEngine::StripeLog:
return "StripeLog";
case DefaultTableEngine::MergeTree:
return "MergeTree";
case DefaultTableEngine::ReplacingMergeTree:
return "ReplacingMergeTree";
case DefaultTableEngine::ReplicatedMergeTree:
return "ReplicatedMergeTree";
case DefaultTableEngine::ReplicatedReplacingMergeTree:
return "ReplicatedReplacingMergeTree";
case DefaultTableEngine::Memory:
return "Memory";
default:
throw Exception(ErrorCodes::LOGICAL_ERROR, "default_table_engine is set to unknown value");
if (name.starts_with("Replicated") || name == "KeeperMap")
throw Exception(ErrorCodes::INCORRECT_QUERY, "Temporary tables cannot be created with Replicated or KeeperMap table engines");
}
}
void InterpreterCreateQuery::setDefaultTableEngine(ASTStorage & storage, ContextPtr local_context)
{
if (local_context->getSettingsRef().default_table_engine.value == DefaultTableEngine::None)
throw Exception(ErrorCodes::ENGINE_REQUIRED, "Table engine is not specified in CREATE query");
void setDefaultTableEngine(ASTStorage &storage, DefaultTableEngine engine)
{
if (engine == DefaultTableEngine::None)
throw Exception(ErrorCodes::ENGINE_REQUIRED, "Table engine is not specified in CREATE query");
auto engine_ast = std::make_shared<ASTFunction>();
auto default_table_engine = local_context->getSettingsRef().default_table_engine.value;
engine_ast->name = getTableEngineName(default_table_engine);
engine_ast->no_empty_args = true;
storage.set(storage.engine, engine_ast);
auto engine_ast = std::make_shared<ASTFunction>();
engine_ast->name = SettingFieldDefaultTableEngine(engine).toString();
engine_ast->no_empty_args = true;
storage.set(storage.engine, engine_ast);
}
}
void InterpreterCreateQuery::setEngine(ASTCreateQuery & create) const
@ -936,32 +914,23 @@ void InterpreterCreateQuery::setEngine(ASTCreateQuery & create) const
if (create.temporary)
{
/// It's possible if some part of storage definition (such as PARTITION BY) is specified, but ENGINE is not.
/// It makes sense when default_table_engine setting is used, but not for temporary tables.
/// For temporary tables we ignore this setting to allow CREATE TEMPORARY TABLE query without specifying ENGINE
/// Some part of storage definition is specified, but ENGINE is not: just set the one from default_temporary_table_engine setting.
if (!create.cluster.empty())
throw Exception(ErrorCodes::INCORRECT_QUERY, "Temporary tables cannot be created with ON CLUSTER clause");
if (create.storage)
if (!create.storage)
{
if (create.storage->engine)
{
if (create.storage->engine->name.starts_with("Replicated") || create.storage->engine->name == "KeeperMap")
throw Exception(ErrorCodes::INCORRECT_QUERY, "Temporary tables cannot be created with Replicated or KeeperMap table engines");
}
else
throw Exception(ErrorCodes::INCORRECT_QUERY, "Invalid storage definition for temporary table");
}
else
{
auto engine_ast = std::make_shared<ASTFunction>();
engine_ast->name = "Memory";
engine_ast->no_empty_args = true;
auto storage_ast = std::make_shared<ASTStorage>();
storage_ast->set(storage_ast->engine, engine_ast);
create.set(create.storage, storage_ast);
}
if (!create.storage->engine)
{
setDefaultTableEngine(*create.storage, getContext()->getSettingsRef().default_temporary_table_engine.value);
}
checkTemporaryTableEngineName(create.storage->engine->name);
return;
}
@ -969,7 +938,7 @@ void InterpreterCreateQuery::setEngine(ASTCreateQuery & create) const
{
/// Some part of storage definition (such as PARTITION BY) is specified, but ENGINE is not: just set default one.
if (!create.storage->engine)
setDefaultTableEngine(*create.storage, getContext());
setDefaultTableEngine(*create.storage, getContext()->getSettingsRef().default_table_engine.value);
return;
}
@ -1008,7 +977,7 @@ void InterpreterCreateQuery::setEngine(ASTCreateQuery & create) const
}
create.set(create.storage, std::make_shared<ASTStorage>());
setDefaultTableEngine(*create.storage, getContext());
setDefaultTableEngine(*create.storage, getContext()->getSettingsRef().default_table_engine.value);
}
static void generateUUIDForTable(ASTCreateQuery & create)

View File

@ -90,8 +90,6 @@ private:
/// Calculate list of columns, constraints, indices, etc... of table. Rewrite query in canonical way.
TableProperties getTablePropertiesAndNormalizeCreateQuery(ASTCreateQuery & create) const;
void validateTableStructure(const ASTCreateQuery & create, const TableProperties & properties) const;
static String getTableEngineName(DefaultTableEngine default_table_engine);
static void setDefaultTableEngine(ASTStorage & storage, ContextPtr local_context);
void setEngine(ASTCreateQuery & create) const;
AccessRightsElements getRequiredAccess() const;

View File

@ -370,15 +370,15 @@ BlockIO InterpreterSystemQuery::execute()
else
{
auto cache = FileCacheFactory::instance().getByName(query.filesystem_cache_name).cache;
if (query.delete_key.empty())
if (query.key_to_drop.empty())
{
cache->removeAllReleasable();
}
else
{
auto key = FileCacheKey::fromKeyString(query.delete_key);
if (query.delete_offset.has_value())
cache->removeFileSegment(key, query.delete_offset.value());
auto key = FileCacheKey::fromKeyString(query.key_to_drop);
if (query.offset_to_drop.has_value())
cache->removeFileSegment(key, query.offset_to_drop.value());
else
cache->removeKey(key);
}

View File

@ -19,16 +19,16 @@ JoinSwitcher::JoinSwitcher(std::shared_ptr<TableJoin> table_join_, const Block &
limits.max_bytes = table_join->defaultMaxBytes();
}
bool JoinSwitcher::addJoinedBlock(const Block & block, bool)
bool JoinSwitcher::addBlockToJoin(const Block & block, bool)
{
std::lock_guard lock(switch_mutex);
if (switched)
return join->addJoinedBlock(block);
return join->addBlockToJoin(block);
/// HashJoin with external limits check
join->addJoinedBlock(block, false);
join->addBlockToJoin(block, false);
size_t rows = join->getTotalRowCount();
size_t bytes = join->getTotalByteCount();
@ -48,7 +48,7 @@ bool JoinSwitcher::switchJoin()
bool success = true;
for (const Block & saved_block : right_blocks)
success = success && join->addJoinedBlock(saved_block);
success = success && join->addBlockToJoin(saved_block);
switched = true;
return success;

View File

@ -23,7 +23,7 @@ public:
/// Add block of data from right hand of JOIN into current join object.
/// If join-in-memory memory limit exceeded switches to join-on-disk and continue with it.
/// @returns false, if join-on-disk disk limit exceeded
bool addJoinedBlock(const Block & block, bool check_limits) override;
bool addBlockToJoin(const Block & block, bool check_limits) override;
void checkTypesOfKeys(const Block & block) const override
{

View File

@ -669,7 +669,7 @@ Block MergeJoin::modifyRightBlock(const Block & src_block) const
return block;
}
bool MergeJoin::addJoinedBlock(const Block & src_block, bool)
bool MergeJoin::addBlockToJoin(const Block & src_block, bool)
{
Block block = modifyRightBlock(src_block);

View File

@ -23,7 +23,7 @@ public:
MergeJoin(std::shared_ptr<TableJoin> table_join_, const Block & right_sample_block);
const TableJoin & getTableJoin() const override { return *table_join; }
bool addJoinedBlock(const Block & block, bool check_limits) override;
bool addBlockToJoin(const Block & block, bool check_limits) override;
void checkTypesOfKeys(const Block & block) const override;
void joinBlock(Block &, ExtraBlockPtr & not_processed) override;

View File

@ -212,11 +212,11 @@ void ASTSystemQuery::formatImpl(const FormatSettings & settings, FormatState &,
if (!filesystem_cache_name.empty())
{
settings.ostr << (settings.hilite ? hilite_none : "") << " " << filesystem_cache_name;
if (!delete_key.empty())
if (!key_to_drop.empty())
{
settings.ostr << (settings.hilite ? hilite_none : "") << " KEY " << delete_key;
if (delete_offset.has_value())
settings.ostr << (settings.hilite ? hilite_none : "") << " OFFSET " << delete_offset.value();
settings.ostr << (settings.hilite ? hilite_none : "") << " KEY " << key_to_drop;
if (offset_to_drop.has_value())
settings.ostr << (settings.hilite ? hilite_none : "") << " OFFSET " << offset_to_drop.value();
}
}
}

View File

@ -107,8 +107,8 @@ public:
UInt64 seconds{};
String filesystem_cache_name;
std::string delete_key;
std::optional<size_t> delete_offset;
std::string key_to_drop;
std::optional<size_t> offset_to_drop;
String backup_name;

View File

@ -409,9 +409,9 @@ bool ParserSystemQuery::parseImpl(IParser::Pos & pos, ASTPtr & node, Expected &
res->filesystem_cache_name = ast->as<ASTLiteral>()->value.safeGet<String>();
if (ParserKeyword{"KEY"}.ignore(pos, expected) && ParserIdentifier().parse(pos, ast, expected))
{
res->delete_key = ast->as<ASTIdentifier>()->name();
res->key_to_drop = ast->as<ASTIdentifier>()->name();
if (ParserKeyword{"OFFSET"}.ignore(pos, expected) && ParserLiteral().parse(pos, ast, expected))
res->delete_offset = ast->as<ASTLiteral>()->value.safeGet<UInt64>();
res->offset_to_drop = ast->as<ASTLiteral>()->value.safeGet<UInt64>();
}
}
if (!parseQueryWithOnCluster(res, pos, expected))

View File

@ -319,6 +319,8 @@ void AggregatingStep::transformPipeline(QueryPipelineBuilder & pipeline, const B
{
auto column_with_default = col.column->cloneEmpty();
col.type->insertDefaultInto(*column_with_default);
column_with_default->finalize();
auto column = ColumnConst::create(std::move(column_with_default), 0);
const auto * node = &dag->addColumn({ColumnPtr(std::move(column)), col.type, col.name});
node = &dag->materializeNode(*node);

View File

@ -305,7 +305,7 @@ void FillingRightJoinSideTransform::work()
if (for_totals)
join->setTotals(block);
else
stop_reading = !join->addJoinedBlock(block);
stop_reading = !join->addBlockToJoin(block);
set_totals = for_totals;
}

View File

@ -455,22 +455,34 @@ MutableDataPartStoragePtr DataPartStorageOnDiskBase::freeze(
MutableDataPartStoragePtr DataPartStorageOnDiskBase::clonePart(
const std::string & to,
const std::string & dir_path,
const DiskPtr & disk,
const DiskPtr & dst_disk,
Poco::Logger * log) const
{
String path_to_clone = fs::path(to) / dir_path / "";
auto src_disk = volume->getDisk();
if (disk->exists(path_to_clone))
if (dst_disk->exists(path_to_clone))
{
LOG_WARNING(log, "Path {} already exists. Will remove it and clone again.", fullPath(disk, path_to_clone));
disk->removeRecursive(path_to_clone);
throw Exception(ErrorCodes::DIRECTORY_ALREADY_EXISTS,
"Cannot clone part {} from '{}' to '{}': path '{}' already exists",
dir_path, getRelativePath(), path_to_clone, fullPath(dst_disk, path_to_clone));
}
disk->createDirectories(to);
volume->getDisk()->copy(getRelativePath(), disk, to);
volume->getDisk()->removeFileIfExists(fs::path(path_to_clone) / "delete-on-destroy.txt");
try
{
dst_disk->createDirectories(to);
src_disk->copyDirectoryContent(getRelativePath(), dst_disk, path_to_clone);
}
catch (...)
{
/// It's safe to remove it recursively (even with zero-copy-replication)
/// because we've just did full copy through copyDirectoryContent
LOG_WARNING(log, "Removing directory {} after failed attempt to move a data part", path_to_clone);
dst_disk->removeRecursive(path_to_clone);
throw;
}
auto single_disk_volume = std::make_shared<SingleDiskVolume>(disk->getName(), disk, 0);
auto single_disk_volume = std::make_shared<SingleDiskVolume>(dst_disk->getName(), dst_disk, 0);
return create(single_disk_volume, to, dir_path, /*initialize=*/ true);
}

View File

@ -68,7 +68,7 @@ public:
MutableDataPartStoragePtr clonePart(
const std::string & to,
const std::string & dir_path,
const DiskPtr & disk,
const DiskPtr & dst_disk,
Poco::Logger * log) const override;
void rename(

View File

@ -502,8 +502,10 @@ void IMergeTreeDataPart::removeIfNeeded()
throw Exception(ErrorCodes::LOGICAL_ERROR, "relative_path {} of part {} is invalid or not set",
getDataPartStorage().getPartDirectory(), name);
const auto part_parent_directory = directoryPath(part_directory);
bool is_moving_part = part_parent_directory.ends_with("moving/");
fs::path part_directory_path = getDataPartStorage().getRelativePath();
if (part_directory_path.filename().empty())
part_directory_path = part_directory_path.parent_path();
bool is_moving_part = part_directory_path.parent_path().filename() == "moving";
if (!startsWith(file_name, "tmp") && !endsWith(file_name, ".tmp_proj") && !is_moving_part)
{
LOG_ERROR(

View File

@ -1030,7 +1030,7 @@ public:
/// Fetch part only if some replica has it on shared storage like S3
/// Overridden in StorageReplicatedMergeTree
virtual MutableDataPartStoragePtr tryToFetchIfShared(const IMergeTreeDataPart &, const DiskPtr &, const String &) { return nullptr; }
virtual MutableDataPartPtr tryToFetchIfShared(const IMergeTreeDataPart &, const DiskPtr &, const String &) { return nullptr; }
/// Check shared data usage on other replicas for detached/freezed part
/// Remove local files and remote files if needed

View File

@ -233,9 +233,15 @@ MergeTreePartsMover::TemporaryClonedPart MergeTreePartsMover::clonePart(const Me
disk->createDirectories(path_to_clone);
cloned_part_storage = data->tryToFetchIfShared(*part, disk, fs::path(path_to_clone) / part->name);
auto zero_copy_part = data->tryToFetchIfShared(*part, disk, fs::path(path_to_clone) / part->name);
if (!cloned_part_storage)
if (zero_copy_part)
{
/// FIXME for some reason we cannot just use this part, we have to re-create it through MergeTreeDataPartBuilder
zero_copy_part->is_temp = false; /// Do not remove it in dtor
cloned_part_storage = zero_copy_part->getDataPartStoragePtr();
}
else
{
LOG_INFO(log, "Part {} was not fetched, we are the first who move it to another disk, so we will copy it", part->name);
cloned_part_storage = part->getDataPartStorage().clonePart(path_to_clone, part->getDataPartStorage().getPartDirectory(), disk, log);

View File

@ -67,7 +67,9 @@ static void splitAndModifyMutationCommands(
if (!isWidePart(part) || !isFullPartStorage(part->getDataPartStorage()))
{
NameSet mutated_columns, dropped_columns;
NameSet mutated_columns;
NameSet dropped_columns;
for (const auto & command : commands)
{
if (command.type == MutationCommand::Type::MATERIALIZE_INDEX
@ -258,6 +260,10 @@ getColumnsForNewDataPart(
storage_columns.emplace_back(column);
}
NameSet storage_columns_set;
for (const auto & [name, _] : storage_columns)
storage_columns_set.insert(name);
for (const auto & command : all_commands)
{
if (command.type == MutationCommand::UPDATE)
@ -292,15 +298,19 @@ getColumnsForNewDataPart(
SerializationInfoByName new_serialization_infos;
for (const auto & [name, old_info] : serialization_infos)
{
if (removed_columns.contains(name))
continue;
auto it = renamed_columns_from_to.find(name);
auto new_name = it == renamed_columns_from_to.end() ? name : it->second;
/// Column can be removed only in this data part by CLEAR COLUMN query.
if (!storage_columns_set.contains(new_name) || removed_columns.contains(new_name))
continue;
/// In compact part we read all columns and all of them are in @updated_header.
/// But in wide part we must keep serialization infos for columns that are not touched by mutation.
if (!updated_header.has(new_name))
{
new_serialization_infos.emplace(new_name, old_info);
if (isWidePart(source_part))
new_serialization_infos.emplace(new_name, old_info);
continue;
}

View File

@ -1,4 +1,5 @@
#include "NamedCollectionsHelpers.h"
#include <Access/ContextAccess.h>
#include <Common/NamedCollections/NamedCollections.h>
#include <Interpreters/evaluateConstantExpression.h>
#include <Storages/checkAndGetLiteralArgument.h>
@ -15,19 +16,16 @@ namespace ErrorCodes
namespace
{
NamedCollectionPtr tryGetNamedCollectionFromASTs(ASTs asts, bool throw_unknown_collection)
std::optional<std::string> getCollectionName(ASTs asts)
{
if (asts.empty())
return nullptr;
return std::nullopt;
const auto * identifier = asts[0]->as<ASTIdentifier>();
if (!identifier)
return nullptr;
return std::nullopt;
const auto & collection_name = identifier->name();
if (throw_unknown_collection)
return NamedCollectionFactory::instance().get(collection_name);
return NamedCollectionFactory::instance().tryGet(collection_name);
return identifier->name();
}
std::optional<std::pair<std::string, std::variant<Field, ASTPtr>>> getKeyValueFromAST(ASTPtr ast, bool fallback_to_ast_value, ContextPtr context)
@ -74,7 +72,18 @@ MutableNamedCollectionPtr tryGetNamedCollectionWithOverrides(
NamedCollectionUtils::loadIfNot();
auto collection = tryGetNamedCollectionFromASTs(asts, throw_unknown_collection);
auto collection_name = getCollectionName(asts);
if (!collection_name.has_value())
return nullptr;
context->checkAccess(AccessType::NAMED_COLLECTION, *collection_name);
NamedCollectionPtr collection;
if (throw_unknown_collection)
collection = NamedCollectionFactory::instance().get(*collection_name);
else
collection = NamedCollectionFactory::instance().tryGet(*collection_name);
if (!collection)
return nullptr;
@ -106,12 +115,14 @@ MutableNamedCollectionPtr tryGetNamedCollectionWithOverrides(
}
MutableNamedCollectionPtr tryGetNamedCollectionWithOverrides(
const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix)
const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix, ContextPtr context)
{
auto collection_name = config.getString(config_prefix + ".name", "");
if (collection_name.empty())
return nullptr;
context->checkAccess(AccessType::NAMED_COLLECTION, collection_name);
const auto & collection = NamedCollectionFactory::instance().get(collection_name);
auto collection_copy = collection->duplicate();

View File

@ -22,7 +22,7 @@ MutableNamedCollectionPtr tryGetNamedCollectionWithOverrides(
ASTs asts, ContextPtr context, bool throw_unknown_collection = true, std::vector<std::pair<std::string, ASTPtr>> * complex_args = nullptr);
/// Helper function to get named collection for dictionary source.
/// Dictionaries have collection name as name argument of dict configuration and other arguments are overrides.
MutableNamedCollectionPtr tryGetNamedCollectionWithOverrides(const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix);
MutableNamedCollectionPtr tryGetNamedCollectionWithOverrides(const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix, ContextPtr context);
HTTPHeaderEntries getHeadersFromNamedCollection(const NamedCollection & collection);

View File

@ -146,7 +146,7 @@ void StorageJoin::mutate(const MutationCommands & commands, ContextPtr context)
Block block;
while (executor.pull(block))
{
new_data->addJoinedBlock(block, true);
new_data->addBlockToJoin(block, true);
if (persistent)
backup_stream.write(block);
}
@ -257,7 +257,7 @@ void StorageJoin::insertBlock(const Block & block, ContextPtr context)
if (!holder)
throw Exception(ErrorCodes::DEADLOCK_AVOIDED, "StorageJoin: cannot insert data because current query tries to read from this storage");
join->addJoinedBlock(block_to_insert, true);
join->addBlockToJoin(block_to_insert, true);
}
size_t StorageJoin::getSize(ContextPtr context) const

View File

@ -1987,7 +1987,7 @@ bool StorageReplicatedMergeTree::executeFetch(LogEntry & entry, bool need_to_che
}
MutableDataPartStoragePtr StorageReplicatedMergeTree::executeFetchShared(
MergeTreeData::MutableDataPartPtr StorageReplicatedMergeTree::executeFetchShared(
const String & source_replica,
const String & new_part_name,
const DiskPtr & disk,
@ -4476,7 +4476,7 @@ bool StorageReplicatedMergeTree::fetchPart(
}
MutableDataPartStoragePtr StorageReplicatedMergeTree::fetchExistsPart(
MergeTreeData::MutableDataPartPtr StorageReplicatedMergeTree::fetchExistsPart(
const String & part_name,
const StorageMetadataPtr & metadata_snapshot,
const String & source_replica_path,
@ -4582,7 +4582,7 @@ MutableDataPartStoragePtr StorageReplicatedMergeTree::fetchExistsPart(
ProfileEvents::increment(ProfileEvents::ReplicatedPartFetches);
LOG_DEBUG(log, "Fetched part {} from {}:{}", part_name, zookeeper_name, source_replica_path);
return part->getDataPartStoragePtr();
return part;
}
void StorageReplicatedMergeTree::startup()
@ -8901,7 +8901,7 @@ std::pair<bool, NameSet> StorageReplicatedMergeTree::unlockSharedDataByID(
}
MutableDataPartStoragePtr StorageReplicatedMergeTree::tryToFetchIfShared(
MergeTreeData::MutableDataPartPtr StorageReplicatedMergeTree::tryToFetchIfShared(
const IMergeTreeDataPart & part,
const DiskPtr & disk,
const String & path)

View File

@ -244,7 +244,7 @@ public:
bool canExecuteFetch(const ReplicatedMergeTreeLogEntry & entry, String & disable_reason) const;
/// Fetch part only when it stored on shared storage like S3
MutableDataPartStoragePtr executeFetchShared(const String & source_replica, const String & new_part_name, const DiskPtr & disk, const String & path);
MutableDataPartPtr executeFetchShared(const String & source_replica, const String & new_part_name, const DiskPtr & disk, const String & path);
/// Lock part in zookeeper for use shared data in several nodes
void lockSharedData(const IMergeTreeDataPart & part, bool replace_existing_lock, std::optional<HardlinkedFiles> hardlinked_files) const override;
@ -286,7 +286,7 @@ public:
MergeTreeDataFormatVersion data_format_version);
/// Fetch part only if some replica has it on shared storage like S3
MutableDataPartStoragePtr tryToFetchIfShared(const IMergeTreeDataPart & part, const DiskPtr & disk, const String & path) override;
MutableDataPartPtr tryToFetchIfShared(const IMergeTreeDataPart & part, const DiskPtr & disk, const String & path) override;
/// Get best replica having this partition on a same type remote disk
String getSharedDataReplica(const IMergeTreeDataPart & part, DataSourceType data_source_type) const;
@ -717,7 +717,7 @@ private:
* Used for replace local part on the same s3-shared part in hybrid storage.
* Returns false if part is already fetching right now.
*/
MutableDataPartStoragePtr fetchExistsPart(
MutableDataPartPtr fetchExistsPart(
const String & part_name,
const StorageMetadataPtr & metadata_snapshot,
const String & replica_path,

View File

@ -173,6 +173,16 @@ CI_CONFIG = {
"with_coverage": False,
"comment": "SSE2-only build",
},
"binary_riscv64": {
"compiler": "clang-16-riscv64",
"build_type": "",
"sanitizer": "",
"package_type": "binary",
"static_binary_name": "riscv64",
"tidy": "disable",
"with_coverage": False,
"comment": "",
},
},
"builds_report_config": {
"ClickHouse build check": [
@ -194,6 +204,7 @@ CI_CONFIG = {
"binary_freebsd",
"binary_darwin_aarch64",
"binary_ppc64le",
"binary_riscv64",
"binary_amd64_compat",
],
},

View File

@ -36,6 +36,7 @@ try:
from confluent_kafka.avro.cached_schema_registry_client import (
CachedSchemaRegistryClient,
)
from .hdfs_api import HDFSApi # imports requests_kerberos
except Exception as e:
logging.warning(f"Cannot import some modules, some tests may not work: {e}")
@ -51,7 +52,6 @@ from helpers.client import QueryRuntimeException
import docker
from .client import Client
from .hdfs_api import HDFSApi
from .config_cluster import *
@ -3416,13 +3416,14 @@ class ClickHouseInstance:
database=database,
)
time.sleep(sleep_time)
if result is not None:
return result
except QueryRuntimeException as ex:
logging.debug("Retry {} got exception {}".format(i + 1, ex))
time.sleep(sleep_time)
if result is not None:
return result
raise Exception("Query {sql} did not fail".format(sql))
raise Exception("Query {} did not fail".format(sql))
# The same as query_and_get_error but ignores successful query.
def query_and_get_answer_with_error(

View File

@ -32,6 +32,9 @@ class PartitionManager:
{"destination": instance.ip_address, "source_port": 2181, "action": action}
)
def dump_rules(self):
return _NetworkManager.get().dump_rules()
def restore_instance_zk_connections(self, instance, action="DROP"):
self._check_instance(instance)
@ -157,6 +160,10 @@ class _NetworkManager:
cmd.extend(self._iptables_cmd_suffix(**kwargs))
self._exec_run(cmd, privileged=True)
def dump_rules(self):
cmd = ["iptables", "-L", "DOCKER-USER"]
return self._exec_run(cmd, privileged=True)
@staticmethod
def clean_all_user_iptables_rules():
for i in range(1000):
@ -212,8 +219,8 @@ class _NetworkManager:
def __init__(
self,
container_expire_timeout=50,
container_exit_timeout=60,
container_expire_timeout=120,
container_exit_timeout=120,
docker_api_version=os.environ.get("DOCKER_API_VERSION"),
):
self.container_expire_timeout = container_expire_timeout

View File

@ -66,5 +66,7 @@
"test_server_reload/test.py::test_remove_http_port",
"test_server_reload/test.py::test_remove_mysql_port",
"test_server_reload/test.py::test_remove_postgresql_port",
"test_server_reload/test.py::test_remove_tcp_port"
"test_server_reload/test.py::test_remove_tcp_port",
"test_keeper_map/test.py::test_keeper_map_without_zk"
]

View File

@ -12,6 +12,7 @@
</networks>
<profile>default</profile>
<quota>default</quota>
<named_collection_control>1</named_collection_control>
</default>
</users>

View File

@ -8,9 +8,14 @@ import logging
DICTS = ["configs/dictionaries/mysql_dict1.xml", "configs/dictionaries/mysql_dict2.xml"]
CONFIG_FILES = ["configs/remote_servers.xml", "configs/named_collections.xml"]
USER_CONFIGS = ["configs/users.xml"]
cluster = ClickHouseCluster(__file__)
instance = cluster.add_instance(
"instance", main_configs=CONFIG_FILES, with_mysql=True, dictionaries=DICTS
"instance",
main_configs=CONFIG_FILES,
user_configs=USER_CONFIGS,
with_mysql=True,
dictionaries=DICTS,
)
create_table_mysql_template = """

View File

@ -0,0 +1,10 @@
<clickhouse>
<users>
<default>
<password></password>
<profile>default</profile>
<quota>default</quota>
<use_named_collections>1</use_named_collections>
</default>
</users>
</clickhouse>

View File

@ -1,7 +1,7 @@
import pytest
from helpers.cluster import ClickHouseCluster
from helpers.network import PartitionManager
from helpers.network import PartitionManager, _NetworkManager
test_recover_staled_replica_run = 1
@ -38,41 +38,67 @@ def remove_children(client, path):
client.delete(child_path)
def test_keeper_map_without_zk(started_cluster):
def assert_keeper_exception_after_partition(query):
with PartitionManager() as pm:
pm.drop_instance_zk_connections(node)
error = node.query_and_get_error(query)
assert "Coordination::Exception" in error
def print_iptables_rules():
print(f"iptables rules: {_NetworkManager.get().dump_rules()}")
def assert_keeper_exception_after_partition(query):
with PartitionManager() as pm:
pm.drop_instance_zk_connections(node)
try:
error = node.query_and_get_error_with_retry(query, sleep_time=1)
assert "Coordination::Exception" in error
except:
print_iptables_rules()
raise
def run_query(query):
try:
result = node.query_with_retry(query, sleep_time=1)
return result
except:
print_iptables_rules()
raise
def test_keeper_map_without_zk(started_cluster):
assert_keeper_exception_after_partition(
"CREATE TABLE test_keeper_map_without_zk (key UInt64, value UInt64) ENGINE = KeeperMap('/test_without_zk') PRIMARY KEY(key);"
"CREATE TABLE test_keeper_map_without_zk (key UInt64, value UInt64) ENGINE = KeeperMap('/test_keeper_map_without_zk') PRIMARY KEY(key);"
)
node.query(
"CREATE TABLE test_keeper_map_without_zk (key UInt64, value UInt64) ENGINE = KeeperMap('/test_without_zk') PRIMARY KEY(key);"
run_query(
"CREATE TABLE test_keeper_map_without_zk (key UInt64, value UInt64) ENGINE = KeeperMap('/test_keeper_map_without_zk') PRIMARY KEY(key);"
)
assert_keeper_exception_after_partition(
"INSERT INTO test_keeper_map_without_zk VALUES (1, 11)"
)
node.query("INSERT INTO test_keeper_map_without_zk VALUES (1, 11)")
run_query("INSERT INTO test_keeper_map_without_zk VALUES (1, 11)")
assert_keeper_exception_after_partition("SELECT * FROM test_keeper_map_without_zk")
node.query("SELECT * FROM test_keeper_map_without_zk")
assert run_query("SELECT * FROM test_keeper_map_without_zk") == "1\t11\n"
with PartitionManager() as pm:
pm.drop_instance_zk_connections(node)
node.restart_clickhouse(60)
error = node.query_and_get_error("SELECT * FROM test_keeper_map_without_zk")
assert "Failed to activate table because of connection issues" in error
try:
error = node.query_and_get_error_with_retry(
"SELECT * FROM test_keeper_map_without_zk", sleep_time=1
)
assert "Failed to activate table because of connection issues" in error
except:
print_iptables_rules()
raise
node.query("SELECT * FROM test_keeper_map_without_zk")
run_query("SELECT * FROM test_keeper_map_without_zk")
client = get_genuine_zk()
remove_children(client, "/test_keeper_map/test_without_zk")
remove_children(client, "/test_keeper_map/test_keeper_map_without_zk")
node.restart_clickhouse(60)
error = node.query_and_get_error("SELECT * FROM test_keeper_map_without_zk")
error = node.query_and_get_error_with_retry(
"SELECT * FROM test_keeper_map_without_zk"
)
assert "Failed to activate table because of invalid metadata in ZooKeeper" in error
node.query("DETACH TABLE test_keeper_map_without_zk")

View File

@ -0,0 +1,9 @@
<clickhouse>
<users>
<default>
<password></password>
<profile>default</profile>
<named_collection_control>1</named_collection_control>
</default>
</users>
</clickhouse>

View File

@ -9,6 +9,7 @@ node = cluster.add_instance(
main_configs=[
"configs/named_collections.xml",
],
user_configs=["configs/users.xml"],
with_zookeeper=True,
)

View File

@ -72,4 +72,6 @@
</s3_no_retries>
</policies>
</storage_configuration>
<allow_remove_stale_moving_parts>true</allow_remove_stale_moving_parts>
</clickhouse>

View File

@ -183,7 +183,8 @@ def test_move_failover(cluster):
) ENGINE=MergeTree()
ORDER BY id
TTL dt + INTERVAL 4 SECOND TO VOLUME 'external'
SETTINGS storage_policy='s3_cold'
SETTINGS storage_policy='s3_cold', temporary_directories_lifetime=1,
merge_tree_clear_old_temporary_directories_interval_seconds=1
"""
)

View File

@ -49,6 +49,18 @@ def start_cluster():
cluster.shutdown()
def get_oldest_part(node, table_name):
return node.query(
f"SELECT name FROM system.parts WHERE table = '{table_name}' and active = 1 ORDER BY modification_time LIMIT 1"
).strip()
def get_disk_for_part(node, table_name, part):
return node.query(
f"SELECT disk_name FROM system.parts WHERE table == '{table_name}' and active = 1 and name = '{part}' ORDER BY modification_time"
).strip()
def test_system_tables(start_cluster):
expected_disks_data = [
{
@ -694,22 +706,21 @@ def test_jbod_overflow(start_cluster, name, engine):
def test_background_move(start_cluster, name, engine):
try:
node1.query_with_retry(
"""
f"""
CREATE TABLE IF NOT EXISTS {name} (
s1 String
) ENGINE = {engine}
ORDER BY tuple()
SETTINGS storage_policy='moving_jbod_with_external'
""".format(
name=name, engine=engine
)
SETTINGS storage_policy='moving_jbod_with_external', max_replicated_merges_in_queue=0
"""
)
node1.query(f"SYSTEM STOP MERGES {name}")
first_part = None
for i in range(5):
data = [] # 5MB in total
for i in range(5):
for _ in range(5):
data.append(get_random_string(1024 * 1024)) # 1MB row
# small jbod size is 40MB, so lets insert 5MB batch 5 times
node1.query_with_retry(
@ -718,25 +729,26 @@ def test_background_move(start_cluster, name, engine):
)
)
used_disks = get_used_disks_for_table(node1, name)
# we are doing moves in parallel so we need to fetch the name of first part before we add new parts
if i == 0:
first_part = get_oldest_part(node1, name)
assert first_part is not None
retry = 20
i = 0
while not sum(1 for x in used_disks if x == "jbod1") <= 2 and i < retry:
# multiple moves can be assigned in parallel so we can move later parts before the oldest
# we need to wait explicitly until the oldest part is moved
while get_disk_for_part(node1, name, first_part) != "external" and i < retry:
time.sleep(0.5)
used_disks = get_used_disks_for_table(node1, name)
i += 1
assert sum(1 for x in used_disks if x == "jbod1") <= 2
# first (oldest) part was moved to external
assert used_disks[0] == "external"
assert get_disk_for_part(node1, name, first_part) == "external"
node1.query("SYSTEM FLUSH LOGS")
path = node1.query(
"SELECT path_on_disk FROM system.part_log WHERE table = '{}' AND event_type='MovePart' AND part_name = 'all_1_1_0'".format(
name
)
f"SELECT path_on_disk FROM system.part_log WHERE table = '{name}' AND event_type='MovePart' AND part_name = '{first_part}'"
)
# first (oldest) part was moved to external
@ -762,36 +774,28 @@ def test_background_move(start_cluster, name, engine):
def test_start_stop_moves(start_cluster, name, engine):
try:
node1.query_with_retry(
"""
f"""
CREATE TABLE IF NOT EXISTS {name} (
s1 String
) ENGINE = {engine}
ORDER BY tuple()
SETTINGS storage_policy='moving_jbod_with_external'
""".format(
name=name, engine=engine
)
SETTINGS storage_policy='moving_jbod_with_external', max_replicated_merges_in_queue=0
"""
)
node1.query_with_retry("INSERT INTO {} VALUES ('HELLO')".format(name))
node1.query_with_retry("INSERT INTO {} VALUES ('WORLD')".format(name))
node1.query_with_retry(f"INSERT INTO {name} VALUES ('HELLO')")
node1.query_with_retry(f"INSERT INTO {name} VALUES ('WORLD')")
used_disks = get_used_disks_for_table(node1, name)
assert all(d == "jbod1" for d in used_disks), "All writes shoud go to jbods"
first_part = node1.query(
"SELECT name FROM system.parts WHERE table = '{}' and active = 1 ORDER BY modification_time LIMIT 1".format(
name
)
).strip()
first_part = get_oldest_part(node1, name)
node1.query("SYSTEM STOP MOVES")
with pytest.raises(QueryRuntimeException):
node1.query(
"ALTER TABLE {} MOVE PART '{}' TO VOLUME 'external'".format(
name, first_part
)
f"ALTER TABLE {name} MOVE PART '{first_part}' TO VOLUME 'external'"
)
used_disks = get_used_disks_for_table(node1, name)
@ -801,24 +805,18 @@ def test_start_stop_moves(start_cluster, name, engine):
node1.query("SYSTEM START MOVES")
node1.query(
"ALTER TABLE {} MOVE PART '{}' TO VOLUME 'external'".format(
name, first_part
)
)
node1.query(f"ALTER TABLE {name} MOVE PART '{first_part}' TO VOLUME 'external'")
disk = node1.query(
"SELECT disk_name FROM system.parts WHERE table = '{}' and name = '{}' and active = 1".format(
name, first_part
)
f"SELECT disk_name FROM system.parts WHERE table = '{name}' and name = '{first_part}' and active = 1"
).strip()
assert disk == "external"
node1.query_with_retry("TRUNCATE TABLE {}".format(name))
node1.query_with_retry(f"TRUNCATE TABLE {name}")
node1.query("SYSTEM STOP MOVES {}".format(name))
node1.query("SYSTEM STOP MERGES {}".format(name))
node1.query(f"SYSTEM STOP MOVES {name}")
node1.query(f"SYSTEM STOP MERGES {name}")
for i in range(5):
data = [] # 5MB in total
@ -831,6 +829,8 @@ def test_start_stop_moves(start_cluster, name, engine):
)
)
first_part = get_oldest_part(node1, name)
used_disks = get_used_disks_for_table(node1, name)
retry = 5
@ -843,23 +843,20 @@ def test_start_stop_moves(start_cluster, name, engine):
# first (oldest) part doesn't move anywhere
assert used_disks[0] == "jbod1"
node1.query("SYSTEM START MOVES {}".format(name))
node1.query(f"SYSTEM START MOVES {name}")
# wait sometime until background backoff finishes
# multiple moves can be assigned in parallel so we can move later parts before the oldest
# we need to wait explicitly until the oldest part is moved
retry = 60
i = 0
while not sum(1 for x in used_disks if x == "jbod1") <= 2 and i < retry:
while get_disk_for_part(node1, name, first_part) != "external" and i < retry:
time.sleep(1)
used_disks = get_used_disks_for_table(node1, name)
i += 1
node1.query("SYSTEM START MERGES {}".format(name))
assert sum(1 for x in used_disks if x == "jbod1") <= 2
# first (oldest) part moved to external
assert used_disks[0] == "external"
assert get_disk_for_part(node1, name, first_part) == "external"
node1.query(f"SYSTEM START MERGES {name}")
finally:
node1.query_with_retry(f"DROP TABLE IF EXISTS {name} SYNC")

Some files were not shown because too many files have changed in this diff Show More