Merge remote-tracking branch 'origin/master' into pr-custom-key-failover

This commit is contained in:
Igor Nikonov 2023-12-15 21:14:43 +00:00
commit a735820400
116 changed files with 1471 additions and 630 deletions

3
.gitmodules vendored
View File

@ -354,6 +354,9 @@
[submodule "contrib/aklomp-base64"]
path = contrib/aklomp-base64
url = https://github.com/aklomp/base64.git
[submodule "contrib/pocketfft"]
path = contrib/pocketfft
url = https://github.com/mreineck/pocketfft.git
[submodule "contrib/sqids-cpp"]
path = contrib/sqids-cpp
url = https://github.com/sqids/sqids-cpp.git

View File

@ -105,7 +105,6 @@
* Rewrite equality with `is null` check in JOIN ON section. Experimental *Analyzer only*. [#56538](https://github.com/ClickHouse/ClickHouse/pull/56538) ([vdimir](https://github.com/vdimir)).
* Function`concat` now supports arbitrary argument types (instead of only String and FixedString arguments). This makes it behave more similar to MySQL `concat` implementation. For example, `SELECT concat('ab', 42)` now returns `ab42`. [#56540](https://github.com/ClickHouse/ClickHouse/pull/56540) ([Serge Klochkov](https://github.com/slvrtrn)).
* Allow getting cache configuration from 'named_collection' section in config or from SQL created named collections. [#56541](https://github.com/ClickHouse/ClickHouse/pull/56541) ([Kseniia Sumarokova](https://github.com/kssenii)).
* Update `query_masking_rules` when reloading the config ([#56449](https://github.com/ClickHouse/ClickHouse/issues/56449)). [#56573](https://github.com/ClickHouse/ClickHouse/pull/56573) ([Mikhail Koviazin](https://github.com/mkmkme)).
* PostgreSQL database engine: Make the removal of outdated tables less aggressive with unsuccessful postgres connection. [#56609](https://github.com/ClickHouse/ClickHouse/pull/56609) ([jsc0218](https://github.com/jsc0218)).
* It took too much time to connnect to PG when URL is not right, so the relevant query stucks there and get cancelled. [#56648](https://github.com/ClickHouse/ClickHouse/pull/56648) ([jsc0218](https://github.com/jsc0218)).
* Keeper improvement: disable compressed logs by default in Keeper. [#56763](https://github.com/ClickHouse/ClickHouse/pull/56763) ([Antonio Andelic](https://github.com/antonio2368)).

View File

@ -44,6 +44,7 @@ else ()
endif ()
add_contrib (miniselect-cmake miniselect)
add_contrib (pdqsort-cmake pdqsort)
add_contrib (pocketfft-cmake pocketfft)
add_contrib (crc32-vpmsum-cmake crc32-vpmsum)
add_contrib (sparsehash-c11-cmake sparsehash-c11)
add_contrib (abseil-cpp-cmake abseil-cpp)

1
contrib/pocketfft vendored Submodule

@ -0,0 +1 @@
Subproject commit 9efd4da52cf8d28d14531d14e43ad9d913807546

View File

@ -0,0 +1,10 @@
option (ENABLE_POCKETFFT "Enable pocketfft" ${ENABLE_LIBRARIES})
if (NOT ENABLE_POCKETFFT)
message(STATUS "Not using pocketfft")
return()
endif()
add_library(_pocketfft INTERFACE)
target_include_directories(_pocketfft INTERFACE ${ClickHouse_SOURCE_DIR}/contrib/pocketfft)
add_library(ch_contrib::pocketfft ALIAS _pocketfft)

View File

@ -49,6 +49,7 @@ RUN curl https://sh.rustup.rs -sSf | bash -s -- -y && \
chmod 777 -R /rust && \
rustup toolchain install nightly-2023-07-04 && \
rustup default nightly-2023-07-04 && \
rustup toolchain remove stable && \
rustup component add rust-src && \
rustup target add x86_64-unknown-linux-gnu && \
rustup target add aarch64-unknown-linux-gnu && \

View File

@ -77,6 +77,7 @@ remove_keeper_config "create_if_not_exists" "[01]"
# it contains some new settings, but we can safely remove it
rm /etc/clickhouse-server/config.d/merge_tree.xml
rm /etc/clickhouse-server/config.d/enable_wait_for_shutdown_replicated_tables.xml
rm /etc/clickhouse-server/config.d/zero_copy_destructive_operations.xml
rm /etc/clickhouse-server/users.d/nonconst_timezone.xml
rm /etc/clickhouse-server/users.d/s3_cache_new.xml
rm /etc/clickhouse-server/users.d/replicated_ddl_entry.xml
@ -115,6 +116,7 @@ sudo chgrp clickhouse /etc/clickhouse-server/config.d/s3_storage_policy_by_defau
# it contains some new settings, but we can safely remove it
rm /etc/clickhouse-server/config.d/merge_tree.xml
rm /etc/clickhouse-server/config.d/enable_wait_for_shutdown_replicated_tables.xml
rm /etc/clickhouse-server/config.d/zero_copy_destructive_operations.xml
rm /etc/clickhouse-server/users.d/nonconst_timezone.xml
rm /etc/clickhouse-server/users.d/s3_cache_new.xml
rm /etc/clickhouse-server/users.d/replicated_ddl_entry.xml

View File

@ -406,7 +406,7 @@ RESTORE TABLE data AS data_restored FROM Disk('s3_plain', 'cloud_backup');
:::note
But keep in mind that:
- This disk should not be used for `MergeTree` itself, only for `BACKUP`/`RESTORE`
- If your tables are backed by S3 storage, it doesn't use `CopyObject` calls to copy parts to the destination bucket, instead, it downloads and uploads them, which is very inefficient. Prefer to use `BACKUP ... TO S3(<endpoint>)` syntax for this use-case.
- If your tables are backed by S3 storage and types of the disks are different, it doesn't use `CopyObject` calls to copy parts to the destination bucket, instead, it downloads and uploads them, which is very inefficient. Prefer to use `BACKUP ... TO S3(<endpoint>)` syntax for this use-case.
:::
## Alternatives

View File

@ -216,6 +216,7 @@ Arguments:
- `--logger.level` — Log level.
- `--ignore-error` — do not stop processing if a query failed.
- `-c`, `--config-file` — path to configuration file in same format as for ClickHouse server, by default the configuration empty.
- `--no-system-tables` — do not attach system tables.
- `--help` — arguments references for `clickhouse-local`.
- `-V`, `--version` — print version information and exit.

View File

@ -0,0 +1,59 @@
---
slug: /en/sql-reference/functions/time-series-functions
sidebar_position: 172
sidebar_label: Time Series
---
# Time Series Functions
Below functions are used for time series analysis.
## seriesPeriodDetectFFT
Finds the period of the given time series data using FFT
FFT - [Fast Fourier transform](https://en.wikipedia.org/wiki/Fast_Fourier_transform)
**Syntax**
``` sql
seriesPeriodDetectFFT(series);
```
**Arguments**
- `series` - An array of numeric values
**Returned value**
- A real value equal to the period of time series
- Returns NAN when number of data points are less than four.
Type: [Float64](../../sql-reference/data-types/float.md).
**Examples**
Query:
``` sql
SELECT seriesPeriodDetectFFT([1, 4, 6, 1, 4, 6, 1, 4, 6, 1, 4, 6, 1, 4, 6, 1, 4, 6, 1, 4, 6]) AS print_0;
```
Result:
``` text
┌───────────print_0──────┐
│ 3 │
└────────────────────────┘
```
``` sql
SELECT seriesPeriodDetectFFT(arrayMap(x -> abs((x % 6) - 3), range(1000))) AS print_0;
```
Result:
``` text
┌─print_0─┐
│ 6 │
└─────────┘
```

View File

@ -128,17 +128,17 @@ Reading data from `table.csv`, located in `archive1.zip` or/and `archive2.zip`:
SELECT * FROM file('user_files/archives/archive{1..2}.zip :: table.csv');
```
## Globbing {#globs_in_path}
## Globs in path {#globs_in_path}
Paths may use globbing. Files must match the whole path pattern, not only the suffix or prefix.
- `*` — Represents arbitrarily many characters except `/` but including the empty string.
- `?` — Represents an arbitrary single character.
- `{some_string,another_string,yet_another_one}`Represents any of alternative strings `'some_string', 'another_string', 'yet_another_one'`. The strings may contain `/`.
- `{some_string,another_string,yet_another_one}`Substitutes any of strings `'some_string', 'another_string', 'yet_another_one'`. The strings can contain the `/` symbol.
- `{N..M}` — Represents any number `>= N` and `<= M`.
- `**` - Represents all files inside a folder recursively.
Constructions with `{}` are similar to the [remote](remote.md) table function.
Constructions with `{}` are similar to the [remote](remote.md) and [hdfs](hdfs.md) table functions.
**Example**

View File

@ -41,14 +41,14 @@ LIMIT 2
## Globs in path {#globs_in_path}
Multiple path components can have globs. For being processed file should exists and matches to the whole path pattern (not only suffix or prefix).
Paths may use globbing. Files must match the whole path pattern, not only the suffix or prefix.
- `*`Substitutes any number of any characters except `/` including empty string.
- `?`Substitutes any single character.
- `*`Represents arbitrarily many characters except `/` but including the empty string.
- `?`Represents an arbitrary single character.
- `{some_string,another_string,yet_another_one}` — Substitutes any of strings `'some_string', 'another_string', 'yet_another_one'`. The strings can contain the `/` symbol.
- `{N..M}`Substitutes any number in range from N to M including both borders.
- `{N..M}`Represents any number `>= N` and `<= M`.
Constructions with `{}` are similar to the [remote](../../sql-reference/table-functions/remote.md)) table function.
Constructions with `{}` are similar to the [remote](remote.md) and [file](file.md) table functions.
**Example**

View File

@ -45,6 +45,7 @@ $ clickhouse-local --structure "table_structure" --input-format "format_of_incom
- `--logger.level` — уровень логирования.
- `--ignore-error` — не прекращать обработку если запрос выдал ошибку.
- `-c`, `--config-file` — путь к файлу конфигурации. По умолчанию `clickhouse-local` запускается с пустой конфигурацией. Конфигурационный файл имеет тот же формат, что и для сервера ClickHouse, и в нём можно использовать все конфигурационные параметры сервера. Обычно подключение конфигурации не требуется; если требуется установить отдельный параметр, то это можно сделать ключом с именем параметра.
- `--no-system-tables` — запуск без использования системных таблиц.
- `--help` — вывод справочной информации о `clickhouse-local`.
- `-V`, `--version` — вывод текущей версии и выход.

View File

@ -76,14 +76,16 @@ SELECT * FROM file('test.csv', 'CSV', 'column1 UInt32, column2 UInt32, column3 U
## Шаблоны поиска в компонентах пути {#globs-in-path}
При описании пути к файлу могут использоваться шаблоны поиска. Обрабатываются только те файлы, у которых путь и название соответствуют шаблону полностью (а не только префикс или суффикс).
Путь к файлу может содержать шаблоны в режиме доступа только для чтения.
Шаблоны могут содержаться в разных частях пути.
Обрабатываться будут те и только те файлы, которые существуют в файловой системе и удовлетворяют всему шаблону пути.
- `*` — заменяет любое количество любых символов кроме `/`, включая отсутствие символов.
- `?` — заменяет ровно один любой символ.
- `{some_string,another_string,yet_another_one}` — заменяет любую из строк `'some_string', 'another_string', 'yet_another_one'`. Эти строки также могут содержать символ `/`.
- `{N..M}` — заменяет любое число в интервале от `N` до `M` включительно (может содержать ведущие нули).
Конструкция с `{}` аналогична табличной функции [remote](remote.md).
Конструкция с `{}` аналогична табличным функциям [remote](remote.md), [hdfs](hdfs.md).
**Пример**

View File

@ -14,7 +14,7 @@ hdfs(URI, format, structure)
**Входные параметры**
- `URI` — URI файла в HDFS. Путь к файлу поддерживает следующие шаблоны в режиме доступа только для чтения `*`, `?`, `{abc,def}` и `{N..M}`, где `N`, `M` — числа, \``'abc', 'def'` — строки.
- `URI` — URI файла в HDFS.
- `format` — [формат](../../interfaces/formats.md#formats) файла.
- `structure` — структура таблицы. Формат `'column1_name column1_type, column2_name column2_type, ...'`.
@ -41,19 +41,22 @@ LIMIT 2
## Шаблоны поиска в компонентах пути {#globs-in-path}
- `*` — Заменяет любое количество любых символов кроме `/`, включая отсутствие символов.
Путь к файлу может содержать шаблоны в режиме доступа только для чтения.
Шаблоны могут содержаться в разных частях пути.
Обрабатываться будут те и только те файлы, которые существуют в файловой системе и удовлетворяют всему шаблону пути.
- `*` — Заменяет любое количество любых символов (кроме `/`), включая отсутствие символов.
- `?` — Заменяет ровно один любой символ.
- `{some_string,another_string,yet_another_one}` — Заменяет любую из строк `'some_string', 'another_string', 'yet_another_one'`. Эти строки также могут содержать символ `/`.
- `{N..M}` — Заменяет любое число в интервале от `N` до `M` включительно (может содержать ведущие нули).
Конструкция с `{}` аналогична табличной функции [remote](remote.md).
Конструкция с `{}` аналогична табличной функции [remote](remote.md), [file](file.md).
:::danger Предупреждение
Если ваш список файлов содержит интервал с ведущими нулями, используйте конструкцию с фигурными скобками для каждой цифры по отдельности или используйте `?`.
Если ваш список файлов содержит интервал с ведущими нулями, используйте отдельную конструкцию с фигурными скобками для каждой цифры или используйте `?`.
:::
Шаблоны могут содержаться в разных частях пути. Обрабатываться будут ровно те файлы, которые и удовлетворяют всему шаблону пути, и существуют в файловой системе.
## Виртуальные столбцы {#virtualnye-stolbtsy}
- `_path` — Путь к файлу.

View File

@ -45,6 +45,7 @@ clickhouse-local --structure "table_structure" --input-format "format_of_incomin
- `--logger.level` — 日志级别。
- `--ignore-error` — 当查询失败时,不停止处理。
- `-c`, `--config-file` — 与ClickHouse服务器格式相同配置文件的路径默认情况下配置为空。
- `--no-system-tables` — 不附加系统表。
- `--help``clickhouse-local`使用帮助信息。
- `-V`, `--version` — 打印版本信息并退出。

View File

@ -115,6 +115,7 @@ if (BUILD_STANDALONE_KEEPER)
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Disks/LocalDirectorySyncGuard.cpp
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Disks/TemporaryFileOnDisk.cpp
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Disks/loadLocalDiskConfig.cpp
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Disks/DiskType.cpp
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Disks/ObjectStorages/IObjectStorage.cpp
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Disks/ObjectStorages/MetadataStorageFromPlainObjectStorage.cpp

View File

@ -744,7 +744,7 @@ void LocalServer::processConfig()
LOG_DEBUG(log, "Loading metadata from {}", path);
auto startup_system_tasks = loadMetadataSystem(global_context);
attachSystemTablesLocal</* lazy= */ true>(global_context, *createMemoryDatabaseIfNotExists(global_context, DatabaseCatalog::SYSTEM_DATABASE));
attachSystemTablesLocal(global_context, *createMemoryDatabaseIfNotExists(global_context, DatabaseCatalog::SYSTEM_DATABASE));
attachInformationSchema(global_context, *createMemoryDatabaseIfNotExists(global_context, DatabaseCatalog::INFORMATION_SCHEMA));
attachInformationSchema(global_context, *createMemoryDatabaseIfNotExists(global_context, DatabaseCatalog::INFORMATION_SCHEMA_UPPERCASE));
waitLoad(TablesLoaderForegroundPoolId, startup_system_tasks);
@ -761,9 +761,9 @@ void LocalServer::processConfig()
LOG_DEBUG(log, "Loaded metadata.");
}
else
else if (!config().has("no-system-tables"))
{
attachSystemTablesLocal</* lazy= */ true>(global_context, *createMemoryDatabaseIfNotExists(global_context, DatabaseCatalog::SYSTEM_DATABASE));
attachSystemTablesLocal(global_context, *createMemoryDatabaseIfNotExists(global_context, DatabaseCatalog::SYSTEM_DATABASE));
attachInformationSchema(global_context, *createMemoryDatabaseIfNotExists(global_context, DatabaseCatalog::INFORMATION_SCHEMA));
attachInformationSchema(global_context, *createMemoryDatabaseIfNotExists(global_context, DatabaseCatalog::INFORMATION_SCHEMA_UPPERCASE));
}
@ -842,6 +842,7 @@ void LocalServer::addOptions(OptionsDescription & options_description)
("logger.log", po::value<std::string>(), "Log file name")
("logger.level", po::value<std::string>(), "Log level")
("no-system-tables", "do not attach system tables (better startup time)")
("path", po::value<std::string>(), "Storage path")
("only-system-tables", "attach only system tables from specified path")
("top_level_domains_path", po::value<std::string>(), "Path to lists with custom TLDs")
@ -870,6 +871,8 @@ void LocalServer::processOptions(const OptionsDescription &, const CommandLineOp
config().setString("table-file", options["file"].as<std::string>());
if (options.count("structure"))
config().setString("table-structure", options["structure"].as<std::string>());
if (options.count("no-system-tables"))
config().setBool("no-system-tables", true);
if (options.count("only-system-tables"))
config().setBool("only-system-tables", true);
if (options.count("database"))

View File

@ -45,6 +45,7 @@
#include <Common/makeSocketAddress.h>
#include <Common/FailPoint.h>
#include <Server/waitServersToFinish.h>
#include <Interpreters/Cache/FileCacheFactory.h>
#include <Core/ServerUUID.h>
#include <IO/ReadHelpers.h>
#include <IO/ReadBufferFromFile.h>
@ -1450,8 +1451,6 @@ try
global_context->reloadAuxiliaryZooKeepersConfigIfChanged(config);
global_context->reloadQueryMaskingRulesIfChanged(config);
std::lock_guard lock(servers_lock);
updateServers(*config, server_pool, async_metrics, servers, servers_to_start_before_tables);
}
@ -1472,6 +1471,8 @@ try
#endif
NamedCollectionUtils::reloadFromConfig(*config);
FileCacheFactory::instance().updateSettingsFromConfig(*config);
ProfileEvents::increment(ProfileEvents::MainConfigLoads);
/// Must be the last.

View File

@ -1,24 +1,25 @@
extern crate blake3;
extern crate libc;
use std::ffi::{CStr, CString};
use std::ffi::{CString};
use std::slice;
use std::os::raw::c_char;
#[no_mangle]
pub unsafe extern "C" fn blake3_apply_shim(
begin: *const c_char,
_size: u32,
size: u32,
out_char_data: *mut u8,
) -> *mut c_char {
if begin.is_null() {
let err_str = CString::new("input was a null pointer").unwrap();
return err_str.into_raw();
}
let input_res = slice::from_raw_parts(begin as *const u8, size as usize);
let mut hasher = blake3::Hasher::new();
let input_bytes = CStr::from_ptr(begin);
let input_res = input_bytes.to_bytes();
hasher.update(input_res);
let mut reader = hasher.finalize_xof();
reader.fill(std::slice::from_raw_parts_mut(out_char_data, blake3::OUT_LEN));
std::ptr::null_mut()
}

View File

@ -436,6 +436,10 @@ dbms_target_link_libraries(PRIVATE ch_contrib::zstd)
target_link_libraries (clickhouse_common_io PUBLIC ch_contrib::zstd)
target_link_libraries (clickhouse_common_io PUBLIC ch_contrib::xz)
if (TARGET ch_contrib::pocketfft)
target_link_libraries(clickhouse_common_io PUBLIC ch_contrib::pocketfft)
endif ()
if (TARGET ch_contrib::icu)
dbms_target_link_libraries (PRIVATE ch_contrib::icu)
endif ()

View File

@ -1,6 +1,5 @@
#include "SensitiveDataMasker.h"
#include <mutex>
#include <set>
#include <string>
#include <atomic>
@ -95,28 +94,20 @@ public:
SensitiveDataMasker::~SensitiveDataMasker() = default;
std::unique_ptr<SensitiveDataMasker> SensitiveDataMasker::sensitive_data_masker = nullptr;
std::mutex SensitiveDataMasker::instance_mutex;
void SensitiveDataMasker::setInstance(std::unique_ptr<SensitiveDataMasker> sensitive_data_masker_)
{
if (!sensitive_data_masker_)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Logical error: the 'sensitive_data_masker' is not set");
std::lock_guard lock(instance_mutex);
if (sensitive_data_masker_->rulesCount() > 0)
{
sensitive_data_masker = std::move(sensitive_data_masker_);
}
else
{
sensitive_data_masker.reset();
}
}
SensitiveDataMasker * SensitiveDataMasker::getInstance()
{
std::lock_guard lock(instance_mutex);
return sensitive_data_masker.get();
}

View File

@ -1,7 +1,6 @@
#pragma once
#include <memory>
#include <mutex>
#include <vector>
#include <cstdint>
@ -46,7 +45,6 @@ class SensitiveDataMasker
private:
class MaskingRule;
std::vector<std::unique_ptr<MaskingRule>> all_masking_rules;
static std::mutex instance_mutex;
static std::unique_ptr<SensitiveDataMasker> sensitive_data_masker;
public:

View File

@ -62,6 +62,7 @@
#cmakedefine01 FIU_ENABLE
#cmakedefine01 USE_BCRYPT
#cmakedefine01 USE_LIBARCHIVE
#cmakedefine01 USE_POCKETFFT
/// This is needed for .incbin in assembly. For some reason, include paths don't work there in presence of LTO.
/// That's why we use absolute paths.

View File

@ -123,7 +123,7 @@ namespace Format
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Cannot switch from automatic field numbering to manual field specification");
is_plain_numbering = true;
if (index_if_plain >= argument_number)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Argument is too big for formatting");
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Not enough arguments to fill the placeholders in the format string");
index_positions.back() = index_if_plain++;
}
else

View File

@ -90,7 +90,8 @@ void Settings::checkNoSettingNamesAtTopLevel(const Poco::Util::AbstractConfigura
for (const auto & setting : settings.all())
{
const auto & name = setting.getName();
if (config.has(name) && !setting.isObsolete())
bool should_skip_check = name == "max_table_size_to_drop" || name == "max_partition_size_to_drop";
if (config.has(name) && !setting.isObsolete() && !should_skip_check)
{
throw Exception(ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG, "A setting '{}' appeared at top level in config {}."
" But it is user-level setting that should be located in users.xml inside <profiles> section for specific profile."

View File

@ -527,6 +527,8 @@ class IColumn;
M(Int64, max_partitions_to_read, -1, "Limit the max number of partitions that can be accessed in one query. <= 0 means unlimited.", 0) \
M(Bool, check_query_single_value_result, true, "Return check query result as single 1/0 value", 0) \
M(Bool, allow_drop_detached, false, "Allow ALTER TABLE ... DROP DETACHED PART[ITION] ... queries", 0) \
M(UInt64, max_table_size_to_drop, 0, "Only available in ClickHouse Cloud", 0) \
M(UInt64, max_partition_size_to_drop, 0, "Only available in ClickHouse Cloud", 0) \
\
M(UInt64, postgresql_connection_pool_size, 16, "Connection pool size for PostgreSQL table engine and database engine.", 0) \
M(UInt64, postgresql_connection_pool_wait_timeout, 5000, "Connection pool push/pop timeout on empty pool for PostgreSQL table engine and database engine. By default it will block on empty pool.", 0) \
@ -620,6 +622,7 @@ class IColumn;
M(Bool, describe_include_subcolumns, false, "If true, subcolumns of all table columns will be included into result of DESCRIBE query", 0) \
M(Bool, describe_include_virtual_columns, false, "If true, virtual columns of table will be included into result of DESCRIBE query", 0) \
M(Bool, describe_compact_output, false, "If true, include only column names and types into result of DESCRIBE query", 0) \
M(Bool, apply_mutations_on_fly, false, "Only available in ClickHouse Cloud", 0) \
M(Bool, mutations_execute_nondeterministic_on_initiator, false, "If true nondeterministic function are executed on initiator and replaced to literals in UPDATE and DELETE queries", 0) \
M(Bool, mutations_execute_subqueries_on_initiator, false, "If true scalar subqueries are executed on initiator and replaced to literals in UPDATE and DELETE queries", 0) \
M(UInt64, mutations_max_literal_size_to_replace, 16384, "The maximum size of serialized literal in bytes to replace in UPDATE and DELETE queries", 0) \
@ -671,6 +674,8 @@ class IColumn;
M(Bool, database_replicated_always_detach_permanently, false, "Execute DETACH TABLE as DETACH TABLE PERMANENTLY if database engine is Replicated", 0) \
M(Bool, database_replicated_allow_only_replicated_engine, false, "Allow to create only Replicated tables in database with engine Replicated", 0) \
M(Bool, database_replicated_allow_replicated_engine_arguments, true, "Allow to create only Replicated tables in database with engine Replicated with explicit arguments", 0) \
M(Bool, cloud_mode, false, "Only available in ClickHouse Cloud", 0) \
M(UInt64, cloud_mode_engine, 1, "Only available in ClickHouse Cloud", 0) \
M(DistributedDDLOutputMode, distributed_ddl_output_mode, DistributedDDLOutputMode::THROW, "Format of distributed DDL query result, one of: 'none', 'throw', 'null_status_on_timeout', 'never_throw'", 0) \
M(UInt64, distributed_ddl_entry_format_version, 5, "Compatibility version of distributed DDL (ON CLUSTER) queries", 0) \
\
@ -724,6 +729,7 @@ class IColumn;
M(UInt64, merge_tree_min_bytes_per_task_for_remote_reading, 4 * DBMS_DEFAULT_BUFFER_SIZE, "Min bytes to read per task.", 0) \
M(Bool, merge_tree_use_const_size_tasks_for_remote_reading, true, "Whether to use constant size tasks for reading from a remote table.", 0) \
M(Bool, merge_tree_determine_task_size_by_prewhere_columns, true, "Whether to use only prewhere columns size to determine reading task size.", 0) \
M(UInt64, merge_tree_compact_parts_min_granules_to_multibuffer_read, 16, "Only available in ClickHouse Cloud", 0) \
\
M(Bool, async_insert, false, "If true, data from INSERT query is stored in queue and later flushed to table in background. If wait_for_async_insert is false, INSERT query is processed almost instantly, otherwise client will wait until data will be flushed to table", 0) \
M(Bool, wait_for_async_insert, true, "If true wait for processing of asynchronous insertion", 0) \
@ -835,6 +841,10 @@ class IColumn;
M(Bool, print_pretty_type_names, false, "Print pretty type names in DESCRIBE query and toTypeName() function", 0) \
M(Bool, create_table_empty_primary_key_by_default, false, "Allow to create *MergeTree tables with empty primary key when ORDER BY and PRIMARY KEY not specified", 0) \
M(Bool, allow_named_collection_override_by_default, true, "Allow named collections' fields override by default.", 0)\
M(Bool, allow_experimental_shared_merge_tree, false, "Only available in ClickHouse Cloud", 0) \
M(UInt64, cache_warmer_threads, 4, "Only available in ClickHouse Cloud", 0) \
M(Int64, ignore_cold_parts_seconds, 0, "Only available in ClickHouse Cloud", 0) \
M(Int64, prefer_warmed_unmerged_parts_seconds, 0, "Only available in ClickHouse Cloud", 0) \
// End of COMMON_SETTINGS
// Please add settings related to formats into the FORMAT_FACTORY_SETTINGS, move obsolete settings to OBSOLETE_SETTINGS and obsolete format settings to OBSOLETE_FORMAT_SETTINGS.

View File

@ -89,14 +89,15 @@ void DatabaseAtomic::drop(ContextPtr)
fs::remove_all(getMetadataPath());
}
void DatabaseAtomic::attachTableUnlocked(ContextPtr local_context, const String & name, const StoragePtr & table, const String & relative_table_path)
void DatabaseAtomic::attachTable(ContextPtr /* context_ */, const String & name, const StoragePtr & table, const String & relative_table_path)
{
assert(relative_table_path != data_path && !relative_table_path.empty());
DetachedTables not_in_use;
std::lock_guard lock(mutex);
not_in_use = cleanupDetachedTables();
auto table_id = table->getStorageID();
assertDetachedTableNotInUse(table_id.uuid);
DatabaseOrdinary::attachTableUnlocked(local_context, name, table, relative_table_path);
DatabaseOrdinary::attachTableUnlocked(name, table);
table_name_to_path.emplace(std::make_pair(name, relative_table_path));
}
@ -324,7 +325,7 @@ void DatabaseAtomic::commitCreateTable(const ASTCreateQuery & query, const Stora
/// It throws if `table_metadata_path` already exists (it's possible if table was detached)
renameNoReplace(table_metadata_tmp_path, table_metadata_path); /// Commit point (a sort of)
DatabaseWithOwnTablesBase::attachTableUnlocked(query_context, query.getTable(), table, /*relative_table_path=*/ {}); /// Should never throw
attachTableUnlocked(query.getTable(), table); /// Should never throw
table_name_to_path.emplace(query.getTable(), table_data_path);
}
catch (...)

View File

@ -38,6 +38,7 @@ public:
void dropTable(ContextPtr context, const String & table_name, bool sync) override;
void dropTableImpl(ContextPtr context, const String & table_name, bool sync);
void attachTable(ContextPtr context, const String & name, const StoragePtr & table, const String & relative_table_path) override;
StoragePtr detachTable(ContextPtr context, const String & name) override;
String getTableDataPath(const String & table_name) const override;
@ -65,8 +66,6 @@ public:
void setDetachedTableNotInUseForce(const UUID & uuid) override;
protected:
void attachTableUnlocked(ContextPtr local_context, const String & name, const StoragePtr & table, const String & relative_table_path) TSA_REQUIRES(mutex) override;
void commitAlterTable(const StorageID & table_id, const String & table_metadata_tmp_path, const String & table_metadata_path, const String & statement, ContextPtr query_context) override;
void commitCreateTable(const ASTCreateQuery & query, const StoragePtr & table,
const String & table_metadata_tmp_path, const String & table_metadata_path, ContextPtr query_context) override;

View File

@ -168,9 +168,10 @@ bool DatabaseLazy::empty() const
return tables_cache.empty();
}
void DatabaseLazy::attachTableUnlocked(ContextPtr /* context_ */, const String & table_name, const StoragePtr & table, const String &)
void DatabaseLazy::attachTable(ContextPtr /* context_ */, const String & table_name, const StoragePtr & table, const String &)
{
LOG_DEBUG(log, "Attach table {}.", backQuote(table_name));
std::lock_guard lock(mutex);
time_t current_time = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now());
auto [it, inserted] = tables_cache.emplace(std::piecewise_construct,

View File

@ -64,15 +64,14 @@ public:
DatabaseTablesIteratorPtr getTablesIterator(ContextPtr context, const FilterByNameFunction & filter_by_table_name) const override;
void attachTable(ContextPtr context, const String & table_name, const StoragePtr & table, const String & relative_table_path) override;
StoragePtr detachTable(ContextPtr context, const String & table_name) override;
void shutdown() override;
~DatabaseLazy() override;
protected:
void attachTableUnlocked(ContextPtr context, const String & table_name, const StoragePtr & table, const String & relative_table_path) TSA_REQUIRES(mutex) override;
private:
struct CacheExpirationQueueElement
{

View File

@ -33,13 +33,13 @@ DatabaseMemory::DatabaseMemory(const String & name_, ContextPtr context_)
}
void DatabaseMemory::createTable(
ContextPtr local_context,
ContextPtr /*context*/,
const String & table_name,
const StoragePtr & table,
const ASTPtr & query)
{
std::lock_guard lock{mutex};
attachTableUnlocked(local_context, table_name, table, /*relative_table_path=*/ {});
attachTableUnlocked(table_name, table);
/// Clean the query from temporary flags.
ASTPtr query_to_store = query;
@ -56,7 +56,7 @@ void DatabaseMemory::createTable(
}
void DatabaseMemory::dropTable(
ContextPtr local_context,
ContextPtr /*context*/,
const String & table_name,
bool /*sync*/)
{
@ -83,7 +83,7 @@ void DatabaseMemory::dropTable(
catch (...)
{
std::lock_guard lock{mutex};
attachTableUnlocked(local_context, table_name, table, /*relative_table_path=*/ {});
attachTableUnlocked(table_name, table);
throw;
}

View File

@ -7,7 +7,6 @@
#include <Parsers/formatAST.h>
#include <Storages/StorageDictionary.h>
#include <Storages/StorageFactory.h>
#include <Common/quoteString.h>
#include <Common/typeid_cast.h>
#include <Common/CurrentMetrics.h>
#include <Common/escapeForFileName.h>
@ -200,7 +199,7 @@ DatabaseWithOwnTablesBase::DatabaseWithOwnTablesBase(const String & name_, const
bool DatabaseWithOwnTablesBase::isTableExist(const String & table_name, ContextPtr) const
{
std::lock_guard lock(mutex);
return tables.find(table_name) != tables.end() || lazy_tables.find(table_name) != lazy_tables.end();
return tables.find(table_name) != tables.end();
}
StoragePtr DatabaseWithOwnTablesBase::tryGetTable(const String & table_name, ContextPtr) const
@ -212,9 +211,6 @@ StoragePtr DatabaseWithOwnTablesBase::tryGetTable(const String & table_name, Con
DatabaseTablesIteratorPtr DatabaseWithOwnTablesBase::getTablesIterator(ContextPtr, const FilterByNameFunction & filter_by_table_name) const
{
std::lock_guard lock(mutex);
loadLazyTables();
if (!filter_by_table_name)
return std::make_unique<DatabaseTablesSnapshotIterator>(tables, database_name);
@ -261,7 +257,13 @@ StoragePtr DatabaseWithOwnTablesBase::detachTableUnlocked(const String & table_n
return res;
}
void DatabaseWithOwnTablesBase::attachTableUnlocked(ContextPtr, const String & name, const StoragePtr & table, const String &)
void DatabaseWithOwnTablesBase::attachTable(ContextPtr /* context_ */, const String & table_name, const StoragePtr & table, const String &)
{
std::lock_guard lock(mutex);
attachTableUnlocked(table_name, table);
}
void DatabaseWithOwnTablesBase::attachTableUnlocked(const String & table_name, const StoragePtr & table)
{
auto table_id = table->getStorageID();
if (table_id.database_name != database_name)
@ -274,7 +276,7 @@ void DatabaseWithOwnTablesBase::attachTableUnlocked(ContextPtr, const String & n
DatabaseCatalog::instance().addUUIDMapping(table_id.uuid, shared_from_this(), table);
}
if (!tables.emplace(name, table).second)
if (!tables.emplace(table_name, table).second)
{
if (table_id.hasUUID())
DatabaseCatalog::instance().removeUUIDMapping(table_id.uuid);
@ -287,12 +289,6 @@ void DatabaseWithOwnTablesBase::attachTableUnlocked(ContextPtr, const String & n
CurrentMetrics::add(CurrentMetrics::AttachedTable, 1);
}
void DatabaseWithOwnTablesBase::registerLazyTableUnlocked(const String & table_name, LazyTableCreator table_creator, const String & relative_table_path)
{
if (!lazy_tables.emplace(table_name, std::make_pair(relative_table_path, std::move(table_creator))).second)
throw Exception(ErrorCodes::TABLE_ALREADY_EXISTS, "Table {} already registered.", table_name);
}
void DatabaseWithOwnTablesBase::shutdown()
{
/// You can not hold a lock during shutdown.
@ -393,45 +389,10 @@ void DatabaseWithOwnTablesBase::createTableRestoredFromBackup(const ASTPtr & cre
StoragePtr DatabaseWithOwnTablesBase::tryGetTableNoWait(const String & table_name) const
{
std::lock_guard lock(mutex);
auto it = tables.find(table_name);
if (it != tables.end())
return it->second;
const auto lazy_it = lazy_tables.find(table_name);
if (lazy_it != lazy_tables.end())
{
LOG_DEBUG(log, "Attaching lazy table {}", backQuoteIfNeed(table_name));
auto relative_table_path = lazy_it->second.first;
auto storage = lazy_it->second.second();
lazy_tables.erase(lazy_it);
(const_cast<DatabaseWithOwnTablesBase *>(this))->attachTableUnlocked(Context::getGlobalContextInstance(), table_name, storage, relative_table_path);
it = tables.find(table_name);
if (it != tables.end())
return it->second;
}
return {};
}
void DatabaseWithOwnTablesBase::loadLazyTables() const
{
if (lazy_tables.empty())
return;
ContextPtr global_context = Context::getGlobalContextInstance();
while (!lazy_tables.empty())
{
auto lazy_it = lazy_tables.begin();
const auto table_name = lazy_it->first;
LOG_DEBUG(log, "Attaching lazy table {}", backQuoteIfNeed(table_name));
auto relative_table_path = lazy_it->second.first;
auto storage = lazy_it->second.second();
lazy_tables.erase(lazy_it);
(const_cast<DatabaseWithOwnTablesBase *>(this))->attachTableUnlocked(global_context, table_name, storage, relative_table_path);
}
}
}

View File

@ -30,6 +30,8 @@ public:
bool empty() const override;
void attachTable(ContextPtr context, const String & table_name, const StoragePtr & table, const String & relative_table_path) override;
StoragePtr detachTable(ContextPtr context, const String & table_name) override;
DatabaseTablesIteratorPtr getTablesIterator(ContextPtr context, const FilterByNameFunction & filter_by_table_name) const override;
@ -43,19 +45,14 @@ public:
protected:
Tables tables TSA_GUARDED_BY(mutex);
/// Tables that are attached lazily
mutable LazyTables lazy_tables TSA_GUARDED_BY(mutex);
Poco::Logger * log;
DatabaseWithOwnTablesBase(const String & name_, const String & logger, ContextPtr context);
void attachTableUnlocked(ContextPtr context, const String & name, const StoragePtr & table, const String & relative_table_path) TSA_REQUIRES(mutex) override;
void registerLazyTableUnlocked(const String & table_name, LazyTableCreator table_creator, const String & relative_table_path) TSA_REQUIRES(mutex) override;
void attachTableUnlocked(const String & table_name, const StoragePtr & table) TSA_REQUIRES(mutex);
StoragePtr detachTableUnlocked(const String & table_name) TSA_REQUIRES(mutex);
StoragePtr getTableUnlocked(const String & table_name) const TSA_REQUIRES(mutex);
StoragePtr tryGetTableNoWait(const String & table_name) const;
void loadLazyTables() const TSA_REQUIRES(mutex);
};
}

View File

@ -27,12 +27,21 @@ StoragePtr IDatabase::getTable(const String & name, ContextPtr context) const
{
if (auto storage = tryGetTable(name, context))
return storage;
TableNameHints hints(this->shared_from_this(), context);
std::vector<String> names = hints.getHints(name);
if (names.empty())
/// hint is a pair which holds a single database_name and table_name suggestion for the given table name.
auto hint = hints.getHintForTable(name);
if (hint.first.empty())
throw Exception(ErrorCodes::UNKNOWN_TABLE, "Table {}.{} does not exist", backQuoteIfNeed(getDatabaseName()), backQuoteIfNeed(name));
else
throw Exception(ErrorCodes::UNKNOWN_TABLE, "Table {}.{} does not exist. Maybe you meant {}?", backQuoteIfNeed(getDatabaseName()), backQuoteIfNeed(name), backQuoteIfNeed(names[0]));
throw Exception(
ErrorCodes::UNKNOWN_TABLE,
"Table {}.{} does not exist. Maybe you meant {}.{}?",
backQuoteIfNeed(getDatabaseName()),
backQuoteIfNeed(name),
backQuoteIfNeed(hint.first),
backQuoteIfNeed(hint.second));
}
IDatabase::IDatabase(String database_name_) : database_name(std::move(database_name_))
@ -62,20 +71,4 @@ void IDatabase::createTableRestoredFromBackup(const ASTPtr & create_table_query,
backQuoteIfNeed(create_table_query->as<const ASTCreateQuery &>().getTable()));
}
/// Add a table to the database, but do not add it to the metadata. The database may not support this method.
///
/// Note: ATTACH TABLE statement actually uses createTable method.
void IDatabase::attachTable(ContextPtr context, const String & name, const StoragePtr & table, const String & relative_table_path) /// NOLINT
{
std::lock_guard lock(mutex);
attachTableUnlocked(context, name, table, relative_table_path);
}
void IDatabase::registerLazyTable(ContextPtr, const String & table_name, LazyTableCreator table_creator, const String & relative_table_path) /// NOLINT
{
std::lock_guard lock(mutex);
registerLazyTableUnlocked(table_name, std::move(table_creator), relative_table_path);
}
}

View File

@ -125,6 +125,7 @@ public:
using DatabaseTablesIteratorPtr = std::unique_ptr<IDatabaseTablesIterator>;
/** Database engine.
* It is responsible for:
* - initialization of set of known tables and dictionaries;
@ -137,10 +138,6 @@ using DatabaseTablesIteratorPtr = std::unique_ptr<IDatabaseTablesIterator>;
class IDatabase : public std::enable_shared_from_this<IDatabase>
{
public:
using LazyTableCreator = std::function<StoragePtr()>;
/// Map{table_name, Pair{relative_table_path, LazyTableCreator}}
using LazyTables = std::map<String, std::pair<String, LazyTableCreator>>;
IDatabase() = delete;
explicit IDatabase(String database_name_);
@ -272,17 +269,11 @@ public:
/// Add a table to the database, but do not add it to the metadata. The database may not support this method.
///
/// @param relative_table_path - only for Atomic engine
///
/// Note:
/// - ATTACH TABLE statement actually uses createTable method.
/// - Instead of overriding this method you should override attachTableUnlocked()
/// (This method is only for DatabasesOverlay to override)
virtual void attachTable(ContextPtr context, const String & name, const StoragePtr & table, const String & relative_table_path = {}); /// NOLINT
/// Register tables lazily (attach will be done only when the table will be used) instead of attaching it.
/// This is needed to improve startup time of clickhouse-local.
virtual void registerLazyTable(ContextPtr context, const String & table_name, LazyTableCreator table_creator, const String & relative_table_path = {});
/// Note: ATTACH TABLE statement actually uses createTable method.
virtual void attachTable(ContextPtr /* context */, const String & /*name*/, const StoragePtr & /*table*/, [[maybe_unused]] const String & relative_table_path = {}) /// NOLINT
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "There is no ATTACH TABLE query for Database{}", getEngineName());
}
/// Forget about the table without deleting it, and return it. The database may not support this method.
virtual StoragePtr detachTable(ContextPtr /* context */, const String & /*name*/)
@ -439,16 +430,6 @@ protected:
return nullptr;
}
virtual void attachTableUnlocked(ContextPtr /*context*/, const String & /*name*/, const StoragePtr & /*table*/, const String & /*relative_table_path*/ = {}) TSA_REQUIRES(mutex) /// NOLINT
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "There is no ATTACH TABLE query for Database{}", getEngineName());
}
virtual void registerLazyTableUnlocked(const String & /* table_name */, LazyTableCreator /* table_creator */, const String & /* relative_table_path */) TSA_REQUIRES(mutex) /// NOLINT
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "There lazy table initialization support for Database{}", getEngineName());
}
mutable std::mutex mutex;
String database_name TSA_GUARDED_BY(mutex);
String comment TSA_GUARDED_BY(mutex);

View File

@ -101,10 +101,10 @@ void DatabaseMaterializedMySQL::dropTable(ContextPtr context_, const String & na
DatabaseAtomic::dropTable(context_, name, sync);
}
void DatabaseMaterializedMySQL::attachTableUnlocked(ContextPtr context_, const String & name, const StoragePtr & table, const String & relative_table_path)
void DatabaseMaterializedMySQL::attachTable(ContextPtr context_, const String & name, const StoragePtr & table, const String & relative_table_path)
{
checkIsInternalQuery(context_, "ATTACH TABLE");
DatabaseAtomic::attachTableUnlocked(context_, name, table, relative_table_path);
DatabaseAtomic::attachTable(context_, name, table, relative_table_path);
}
StoragePtr DatabaseMaterializedMySQL::detachTable(ContextPtr context_, const String & name)

View File

@ -48,8 +48,6 @@ protected:
LoadTaskPtr startup_mysql_database_task;
void attachTableUnlocked(ContextPtr context_, const String & name, const StoragePtr & table, const String & relative_table_path) TSA_REQUIRES(mutex) override;
public:
String getEngineName() const override { return "MaterializedMySQL"; }
@ -60,6 +58,8 @@ public:
void dropTable(ContextPtr context_, const String & name, bool sync) override;
void attachTable(ContextPtr context_, const String & name, const StoragePtr & table, const String & relative_table_path) override;
StoragePtr detachTable(ContextPtr context_, const String & name) override;
void renameTable(ContextPtr context_, const String & name, IDatabase & to_database, const String & to_name, bool exchange, bool dictionary) override;

View File

@ -361,8 +361,10 @@ void DatabaseMySQL::cleanOutdatedTables()
}
}
void DatabaseMySQL::attachTableUnlocked(ContextPtr /* context_ */, const String & table_name, const StoragePtr & storage, const String &)
void DatabaseMySQL::attachTable(ContextPtr /* context_ */, const String & table_name, const StoragePtr & storage, const String &)
{
std::lock_guard lock{mutex};
if (!local_tables_cache.contains(table_name))
throw Exception(ErrorCodes::UNKNOWN_TABLE, "Cannot attach table {}.{} because it does not exist.",
backQuoteIfNeed(database_name), backQuoteIfNeed(table_name));

View File

@ -84,9 +84,9 @@ public:
void dropTable(ContextPtr context, const String & table_name, bool sync) override;
protected:
void attachTableUnlocked(ContextPtr context, const String & table_name, const StoragePtr & storage, const String & relative_table_path) TSA_REQUIRES(mutex) override;
void attachTable(ContextPtr context, const String & table_name, const StoragePtr & storage, const String & relative_table_path) override;
protected:
ASTPtr getCreateTableQueryImpl(const String & name, ContextPtr context, bool throw_on_error) const override;
private:

View File

@ -216,8 +216,10 @@ StoragePtr DatabasePostgreSQL::fetchTable(const String & table_name, ContextPtr
}
void DatabasePostgreSQL::attachTableUnlocked(ContextPtr /* context_ */, const String & table_name, const StoragePtr & storage, const String &)
void DatabasePostgreSQL::attachTable(ContextPtr /* context_ */, const String & table_name, const StoragePtr & storage, const String &)
{
std::lock_guard lock{mutex};
if (!checkPostgresTable(table_name))
throw Exception(ErrorCodes::UNKNOWN_TABLE,
"Cannot attach PostgreSQL table {} because it does not exist in PostgreSQL (database: {})",

View File

@ -54,14 +54,13 @@ public:
void createTable(ContextPtr, const String & table_name, const StoragePtr & storage, const ASTPtr & create_query) override;
void dropTable(ContextPtr, const String & table_name, bool sync) override;
void attachTable(ContextPtr context, const String & table_name, const StoragePtr & storage, const String & relative_table_path) override;
StoragePtr detachTable(ContextPtr context, const String & table_name) override;
void drop(ContextPtr /*context*/) override;
void shutdown() override;
protected:
void attachTableUnlocked(ContextPtr context, const String & table_name, const StoragePtr & storage, const String & relative_table_path) TSA_REQUIRES(mutex) override;
ASTPtr getCreateTableQueryImpl(const String & table_name, ContextPtr context, bool throw_on_error) const override;
private:

View File

@ -98,7 +98,7 @@ std::unique_ptr<WriteBufferFromFileBase> CachedObjectStorage::writeObject( /// N
auto implementation_buffer = object_storage->writeObject(object, mode, attributes, buf_size, modified_write_settings);
bool cache_on_write = modified_write_settings.enable_filesystem_cache_on_write_operations
&& FileCacheFactory::instance().getByName(cache_config_name).settings.cache_on_write_operations
&& FileCacheFactory::instance().getByName(cache_config_name)->getSettings().cache_on_write_operations
&& fs::path(object.remote_path).extension() != ".tmp";
/// Need to remove even if cache_on_write == false.

View File

@ -64,7 +64,7 @@ void registerDiskCache(DiskFactory & factory, bool /* global_skip_access_check *
}
}
auto cache = FileCacheFactory::instance().getOrCreate(name, file_cache_settings);
auto cache = FileCacheFactory::instance().getOrCreate(name, file_cache_settings, predefined_configuration ? "" : config_prefix);
auto disk = disk_it->second;
if (!dynamic_cast<const DiskObjectStorage *>(disk.get()))
throw Exception(ErrorCodes::BAD_ARGUMENTS,

View File

@ -46,6 +46,17 @@ DiskTransactionPtr DiskObjectStorage::createObjectStorageTransaction()
send_metadata ? metadata_helper.get() : nullptr);
}
DiskTransactionPtr DiskObjectStorage::createObjectStorageTransactionToAnotherDisk(DiskObjectStorage& to_disk)
{
return std::make_shared<MultipleDisksObjectStorageTransaction>(
*object_storage,
*metadata_storage,
*to_disk.getObjectStorage(),
*to_disk.getMetadataStorage(),
send_metadata ? metadata_helper.get() : nullptr);
}
DiskObjectStorage::DiskObjectStorage(
const String & name_,
const String & object_key_prefix_,
@ -179,10 +190,11 @@ void DiskObjectStorage::copyFile( /// NOLINT
const std::function<void()> & cancellation_hook
)
{
if (this == &to_disk)
if (getDataSourceDescription() == to_disk.getDataSourceDescription())
{
/// It may use s3-server-side copy
auto transaction = createObjectStorageTransaction();
auto & to_disk_object_storage = dynamic_cast<DiskObjectStorage &>(to_disk);
auto transaction = createObjectStorageTransactionToAnotherDisk(to_disk_object_storage);
transaction->copyFile(from_file_path, to_file_path);
transaction->commit();
}

View File

@ -222,6 +222,7 @@ private:
/// Create actual disk object storage transaction for operations
/// execution.
DiskTransactionPtr createObjectStorageTransaction();
DiskTransactionPtr createObjectStorageTransactionToAnotherDisk(DiskObjectStorage& to_disk);
String getReadResourceName() const;
String getWriteResourceName() const;

View File

@ -38,6 +38,29 @@ DiskObjectStorageTransaction::DiskObjectStorageTransaction(
, metadata_helper(metadata_helper_)
{}
DiskObjectStorageTransaction::DiskObjectStorageTransaction(
IObjectStorage & object_storage_,
IMetadataStorage & metadata_storage_,
DiskObjectStorageRemoteMetadataRestoreHelper * metadata_helper_,
MetadataTransactionPtr metadata_transaction_)
: object_storage(object_storage_)
, metadata_storage(metadata_storage_)
, metadata_transaction(metadata_transaction_)
, metadata_helper(metadata_helper_)
{}
MultipleDisksObjectStorageTransaction::MultipleDisksObjectStorageTransaction(
IObjectStorage & object_storage_,
IMetadataStorage & metadata_storage_,
IObjectStorage& destination_object_storage_,
IMetadataStorage& destination_metadata_storage_,
DiskObjectStorageRemoteMetadataRestoreHelper * metadata_helper_)
: DiskObjectStorageTransaction(object_storage_, metadata_storage_, metadata_helper_, destination_metadata_storage_.createTransaction())
, destination_object_storage(destination_object_storage_)
, destination_metadata_storage(destination_metadata_storage_)
{}
namespace
{
/// Operation which affects only metadata. Simplest way to
@ -485,10 +508,12 @@ struct CopyFileObjectStorageOperation final : public IDiskObjectStorageOperation
std::string to_path;
StoredObjects created_objects;
IObjectStorage& destination_object_storage;
CopyFileObjectStorageOperation(
IObjectStorage & object_storage_,
IMetadataStorage & metadata_storage_,
IObjectStorage & destination_object_storage_,
const ReadSettings & read_settings_,
const WriteSettings & write_settings_,
const std::string & from_path_,
@ -498,6 +523,7 @@ struct CopyFileObjectStorageOperation final : public IDiskObjectStorageOperation
, write_settings(write_settings_)
, from_path(from_path_)
, to_path(to_path_)
, destination_object_storage(destination_object_storage_)
{}
std::string getInfoForLog() const override
@ -515,7 +541,7 @@ struct CopyFileObjectStorageOperation final : public IDiskObjectStorageOperation
auto object_key = object_storage.generateObjectKeyForPath(to_path);
auto object_to = StoredObject(object_key.serialize());
object_storage.copyObject(object_from, object_to, read_settings, write_settings);
object_storage.copyObjectToAnotherObjectStorage(object_from, object_to,read_settings,write_settings, destination_object_storage);
tx->addBlobToMetadata(to_path, object_key, object_from.bytes_size);
@ -526,7 +552,7 @@ struct CopyFileObjectStorageOperation final : public IDiskObjectStorageOperation
void undo() override
{
for (const auto & object : created_objects)
object_storage.removeObject(object);
destination_object_storage.removeObject(object);
}
void finalize() override
@ -859,7 +885,13 @@ void DiskObjectStorageTransaction::createFile(const std::string & path)
void DiskObjectStorageTransaction::copyFile(const std::string & from_file_path, const std::string & to_file_path, const ReadSettings & read_settings, const WriteSettings & write_settings)
{
operations_to_execute.emplace_back(
std::make_unique<CopyFileObjectStorageOperation>(object_storage, metadata_storage, read_settings, write_settings, from_file_path, to_file_path));
std::make_unique<CopyFileObjectStorageOperation>(object_storage, metadata_storage, object_storage, read_settings, write_settings, from_file_path, to_file_path));
}
void MultipleDisksObjectStorageTransaction::copyFile(const std::string & from_file_path, const std::string & to_file_path, const ReadSettings & read_settings, const WriteSettings & write_settings)
{
operations_to_execute.emplace_back(
std::make_unique<CopyFileObjectStorageOperation>(object_storage, metadata_storage, destination_object_storage, read_settings, write_settings, from_file_path, to_file_path));
}
void DiskObjectStorageTransaction::commit()

View File

@ -50,9 +50,9 @@ using DiskObjectStorageOperations = std::vector<DiskObjectStorageOperation>;
///
/// If something wrong happen on step 1 or 2 reverts all applied operations.
/// If finalize failed -- nothing is reverted, garbage is left in blob storage.
struct DiskObjectStorageTransaction final : public IDiskTransaction, std::enable_shared_from_this<DiskObjectStorageTransaction>
struct DiskObjectStorageTransaction : public IDiskTransaction, std::enable_shared_from_this<DiskObjectStorageTransaction>
{
private:
protected:
IObjectStorage & object_storage;
IMetadataStorage & metadata_storage;
@ -63,6 +63,12 @@ private:
DiskObjectStorageOperations operations_to_execute;
DiskObjectStorageTransaction(
IObjectStorage & object_storage_,
IMetadataStorage & metadata_storage_,
DiskObjectStorageRemoteMetadataRestoreHelper * metadata_helper_,
MetadataTransactionPtr metadata_transaction_);
public:
DiskObjectStorageTransaction(
IObjectStorage & object_storage_,
@ -118,6 +124,21 @@ public:
void createHardLink(const std::string & src_path, const std::string & dst_path) override;
};
struct MultipleDisksObjectStorageTransaction final : public DiskObjectStorageTransaction, std::enable_shared_from_this<MultipleDisksObjectStorageTransaction>
{
IObjectStorage& destination_object_storage;
IMetadataStorage& destination_metadata_storage;
MultipleDisksObjectStorageTransaction(
IObjectStorage & object_storage_,
IMetadataStorage & metadata_storage_,
IObjectStorage& destination_object_storage,
IMetadataStorage& destination_metadata_storage,
DiskObjectStorageRemoteMetadataRestoreHelper * metadata_helper_);
void copyFile(const std::string & from_file_path, const std::string & to_file_path, const ReadSettings & read_settings, const WriteSettings &) override;
};
using DiskObjectStorageTransactionPtr = std::shared_ptr<DiskObjectStorageTransaction>;
}

View File

@ -458,12 +458,13 @@ void S3ObjectStorage::copyObjectToAnotherObjectStorage( // NOLINT
/// Shortcut for S3
if (auto * dest_s3 = dynamic_cast<S3ObjectStorage * >(&object_storage_to); dest_s3 != nullptr)
{
auto client_ = client.get();
auto client_ = dest_s3->client.get();
auto settings_ptr = s3_settings.get();
auto size = S3::getObjectSize(*client_, bucket, object_from.remote_path, {}, settings_ptr->request_settings, /* for_disk_s3= */ true);
auto scheduler = threadPoolCallbackRunner<void>(getThreadPoolWriter(), "S3ObjStor_copy");
try {
copyS3File(
client.get(),
client_,
bucket,
object_from.remote_path,
0,
@ -476,8 +477,19 @@ void S3ObjectStorage::copyObjectToAnotherObjectStorage( // NOLINT
object_to_attributes,
scheduler,
/* for_disk_s3= */ true);
return;
}
else
catch (S3Exception & exc)
{
/// If authentication/permissions error occurs then fallthrough to copy with buffer.
if (exc.getS3ErrorCode() != Aws::S3::S3Errors::ACCESS_DENIED)
throw;
LOG_WARNING(&Poco::Logger::get("S3ObjectStorage"),
"S3-server-side copy object from the disk {} to the disk {} can not be performed: {}\n",
getName(), dest_s3->getName(), exc.what());
}
}
IObjectStorage::copyObjectToAnotherObjectStorage(object_from, object_to, read_settings, write_settings, object_storage_to, object_to_attributes);
}

View File

@ -99,6 +99,10 @@ if (TARGET ch_contrib::rapidjson)
list (APPEND PRIVATE_LIBS ch_contrib::rapidjson)
endif()
if (TARGET ch_contrib::pocketfft)
list (APPEND PRIVATE_LIBS ch_contrib::pocketfft)
endif()
if (TARGET ch_contrib::crc32-vpmsum)
list (APPEND PUBLIC_LIBS ch_contrib::crc32-vpmsum)
endif()

View File

@ -0,0 +1,227 @@
#include "config.h"
#if USE_POCKETFFT
# ifdef __clang__
# pragma clang diagnostic push
# pragma clang diagnostic ignored "-Wshadow"
# pragma clang diagnostic ignored "-Wextra-semi-stmt"
# pragma clang diagnostic ignored "-Wzero-as-null-pointer-constant"
# endif
# include <pocketfft_hdronly.h>
# ifdef __clang__
# pragma clang diagnostic pop
# endif
# include <cmath>
# include <Columns/ColumnArray.h>
# include <Columns/ColumnsNumber.h>
# include <DataTypes/DataTypeArray.h>
# include <DataTypes/DataTypesNumber.h>
# include <Functions/FunctionFactory.h>
# include <Functions/FunctionHelpers.h>
# include <Functions/IFunction.h>
namespace DB
{
namespace ErrorCodes
{
extern const int ILLEGAL_COLUMN;
}
/*Detect Period in time series data using FFT.
* FFT - Fast Fourier transform (https://en.wikipedia.org/wiki/Fast_Fourier_transform)
* 1. Convert time series data to frequency domain using FFT.
* 2. Remove the 0th(the Dc component) and n/2th the Nyquist frequency
* 3. Find the peak value (highest) for dominant frequency component.
* 4. Inverse of the dominant frequency component is the period.
*/
class FunctionSeriesPeriodDetectFFT : public IFunction
{
public:
static constexpr auto name = "seriesPeriodDetectFFT";
static FunctionPtr create(ContextPtr) { return std::make_shared<FunctionSeriesPeriodDetectFFT>(); }
std::string getName() const override { return name; }
size_t getNumberOfArguments() const override { return 1; }
bool useDefaultImplementationForConstants() const override { return true; }
bool isSuitableForShortCircuitArgumentsExecution(const DataTypesWithConstInfo & /*arguments*/) const override { return true; }
DataTypePtr getReturnTypeImpl(const ColumnsWithTypeAndName & arguments) const override
{
FunctionArgumentDescriptors args{{"time_series", &isArray<IDataType>, nullptr, "Array"}};
validateFunctionArgumentTypes(*this, arguments, args);
return std::make_shared<DataTypeFloat64>();
}
ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr &, size_t input_rows_count) const override
{
ColumnPtr array_ptr = arguments[0].column;
const ColumnArray * array = checkAndGetColumn<ColumnArray>(array_ptr.get());
const IColumn & src_data = array->getData();
const ColumnArray::Offsets & offsets = array->getOffsets();
auto res = ColumnFloat64::create(input_rows_count);
auto & res_data = res->getData();
ColumnArray::Offset prev_src_offset = 0;
Float64 period;
for (size_t i = 0; i < input_rows_count; ++i)
{
ColumnArray::Offset curr_offset = offsets[i];
if (executeNumbers<UInt8>(src_data, period, prev_src_offset, curr_offset)
|| executeNumbers<UInt16>(src_data, period, prev_src_offset, curr_offset)
|| executeNumbers<UInt32>(src_data, period, prev_src_offset, curr_offset)
|| executeNumbers<UInt64>(src_data, period, prev_src_offset, curr_offset)
|| executeNumbers<Int8>(src_data, period, prev_src_offset, curr_offset)
|| executeNumbers<Int16>(src_data, period, prev_src_offset, curr_offset)
|| executeNumbers<Int32>(src_data, period, prev_src_offset, curr_offset)
|| executeNumbers<Int64>(src_data, period, prev_src_offset, curr_offset)
|| executeNumbers<Float32>(src_data, period, prev_src_offset, curr_offset)
|| executeNumbers<Float64>(src_data, period, prev_src_offset, curr_offset))
{
res_data[i] = period;
prev_src_offset = curr_offset;
}
else
throw Exception(
ErrorCodes::ILLEGAL_COLUMN,
"Illegal column {} of first argument of function {}",
arguments[0].column->getName(),
getName());
}
return res;
}
template <typename T>
bool executeNumbers(const IColumn & src_data, Float64 & period, ColumnArray::Offset & start, ColumnArray::Offset & end) const
{
const ColumnVector<T> * src_data_concrete = checkAndGetColumn<ColumnVector<T>>(&src_data);
if (!src_data_concrete)
return false;
const PaddedPODArray<T> & src_vec = src_data_concrete->getData();
chassert(start <= end);
size_t len = end - start;
if (len < 4)
{
period = NAN; // At least four data points are required to detect period
return true;
}
std::vector<Float64> src((src_vec.begin() + start), (src_vec.begin() + end));
std::vector<std::complex<double>> out((len / 2) + 1);
pocketfft::shape_t shape{len};
pocketfft::shape_t axes;
axes.reserve(shape.size());
for (size_t i = 0; i < shape.size(); ++i)
axes.push_back(i);
pocketfft::stride_t stride_src{sizeof(double)};
pocketfft::stride_t stride_out{sizeof(std::complex<double>)};
pocketfft::r2c(shape, stride_src, stride_out, axes, pocketfft::FORWARD, src.data(), out.data(), static_cast<double>(1));
size_t spec_len = (len - 1) / 2; //removing the nyquist element when len is even
double max_mag = 0;
size_t idx = 1;
for (size_t i = 1; i < spec_len; ++i)
{
double magnitude = sqrt(out[i].real() * out[i].real() + out[i].imag() * out[i].imag());
if (magnitude > max_mag)
{
max_mag = magnitude;
idx = i;
}
}
// In case all FFT values are zero, it means the input signal is flat.
// It implies the period of the series should be 0.
if (max_mag == 0)
{
period = 0;
return true;
}
std::vector<double> xfreq(spec_len);
double step = 0.5 / (spec_len - 1);
for (size_t i = 0; i < spec_len; ++i)
xfreq[i] = i * step;
auto freq = xfreq[idx];
period = std::round(1 / freq);
return true;
}
};
REGISTER_FUNCTION(SeriesPeriodDetectFFT)
{
factory.registerFunction<FunctionSeriesPeriodDetectFFT>(FunctionDocumentation{
.description = R"(
Finds the period of the given time series data using FFT
FFT - Fast Fourier transform (https://en.wikipedia.org/wiki/Fast_Fourier_transform)
**Syntax**
``` sql
seriesPeriodDetectFFT(series);
```
**Arguments**
- `series` - An array of numeric values
**Returned value**
- A real value equal to the period of time series
- Returns NAN when number of data points are less than four.
Type: [Float64](../../sql-reference/data-types/float.md).
**Examples**
Query:
``` sql
SELECT seriesPeriodDetectFFT([1, 4, 6, 1, 4, 6, 1, 4, 6, 1, 4, 6, 1, 4, 6, 1, 4, 6, 1, 4, 6]) AS print_0;
```
Result:
``` text
print_0
3
```
``` sql
SELECT seriesPeriodDetectFFT(arrayMap(x -> abs((x % 6) - 3), range(1000))) AS print_0;
```
Result:
``` text
print_0
6
```
)",
.categories{"Time series analysis"}});
}
}
#endif

View File

@ -76,10 +76,9 @@ FileCache::FileCache(const std::string & cache_name, const FileCacheSettings & s
: max_file_segment_size(settings.max_file_segment_size)
, bypass_cache_threshold(settings.enable_bypass_cache_with_threshold ? settings.bypass_cache_threshold : 0)
, boundary_alignment(settings.boundary_alignment)
, background_download_threads(settings.background_download_threads)
, metadata_download_threads(settings.load_metadata_threads)
, load_metadata_threads(settings.load_metadata_threads)
, log(&Poco::Logger::get("FileCache(" + cache_name + ")"))
, metadata(settings.base_path, settings.background_download_queue_size_limit)
, metadata(settings.base_path, settings.background_download_queue_size_limit, settings.background_download_threads)
{
if (settings.cache_policy == "LRU")
main_priority = std::make_unique<LRUFileCachePriority>(settings.max_size, settings.max_elements);
@ -159,12 +158,8 @@ void FileCache::initialize()
throw;
}
metadata.startup();
is_initialized = true;
for (size_t i = 0; i < background_download_threads; ++i)
download_threads.emplace_back([this] { metadata.downloadThreadFunc(); });
cleanup_thread = std::make_unique<ThreadFromGlobalPool>(std::function{ [this]{ metadata.cleanupThreadFunc(); }});
}
CacheGuard::Lock FileCache::lockCache() const
@ -299,7 +294,7 @@ FileSegments FileCache::splitRangeIntoFileSegments(
size_t size,
FileSegment::State state,
size_t file_segments_limit,
const CreateFileSegmentSettings & settings)
const CreateFileSegmentSettings & create_settings)
{
assert(size > 0);
@ -316,7 +311,7 @@ FileSegments FileCache::splitRangeIntoFileSegments(
remaining_size -= current_file_segment_size;
auto file_segment_metadata_it = addFileSegment(
locked_key, current_pos, current_file_segment_size, state, settings, nullptr);
locked_key, current_pos, current_file_segment_size, state, create_settings, nullptr);
file_segments.push_back(file_segment_metadata_it->second->file_segment);
current_pos += current_file_segment_size;
@ -331,7 +326,7 @@ void FileCache::fillHolesWithEmptyFileSegments(
const FileSegment::Range & range,
size_t file_segments_limit,
bool fill_with_detached_file_segments,
const CreateFileSegmentSettings & settings)
const CreateFileSegmentSettings & create_settings)
{
/// There are segments [segment1, ..., segmentN]
/// (non-overlapping, non-empty, ascending-ordered) which (maybe partially)
@ -388,7 +383,7 @@ void FileCache::fillHolesWithEmptyFileSegments(
if (fill_with_detached_file_segments)
{
auto file_segment = std::make_shared<FileSegment>(
locked_key.getKey(), current_pos, hole_size, FileSegment::State::DETACHED, settings);
locked_key.getKey(), current_pos, hole_size, FileSegment::State::DETACHED, create_settings);
file_segments.insert(it, file_segment);
++processed_count;
@ -399,7 +394,7 @@ void FileCache::fillHolesWithEmptyFileSegments(
FileSegments hole;
for (const auto & r : ranges)
{
auto metadata_it = addFileSegment(locked_key, r.left, r.size(), FileSegment::State::EMPTY, settings, nullptr);
auto metadata_it = addFileSegment(locked_key, r.left, r.size(), FileSegment::State::EMPTY, create_settings, nullptr);
hole.push_back(metadata_it->second->file_segment);
++processed_count;
@ -444,7 +439,7 @@ void FileCache::fillHolesWithEmptyFileSegments(
if (fill_with_detached_file_segments)
{
auto file_segment = std::make_shared<FileSegment>(
locked_key.getKey(), current_pos, hole_size, FileSegment::State::DETACHED, settings);
locked_key.getKey(), current_pos, hole_size, FileSegment::State::DETACHED, create_settings);
file_segments.insert(file_segments.end(), file_segment);
}
@ -454,7 +449,7 @@ void FileCache::fillHolesWithEmptyFileSegments(
FileSegments hole;
for (const auto & r : ranges)
{
auto metadata_it = addFileSegment(locked_key, r.left, r.size(), FileSegment::State::EMPTY, settings, nullptr);
auto metadata_it = addFileSegment(locked_key, r.left, r.size(), FileSegment::State::EMPTY, create_settings, nullptr);
hole.push_back(metadata_it->second->file_segment);
++processed_count;
@ -473,7 +468,7 @@ FileSegmentsHolderPtr FileCache::set(
const Key & key,
size_t offset,
size_t size,
const CreateFileSegmentSettings & settings)
const CreateFileSegmentSettings & create_settings)
{
assertInitialized();
@ -484,17 +479,17 @@ FileSegmentsHolderPtr FileCache::set(
if (!file_segments.empty())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Having intersection with already existing cache");
if (settings.unbounded)
if (create_settings.unbounded)
{
/// If the file is unbounded, we can create a single file_segment_metadata for it.
auto file_segment_metadata_it = addFileSegment(
*locked_key, offset, size, FileSegment::State::EMPTY, settings, nullptr);
*locked_key, offset, size, FileSegment::State::EMPTY, create_settings, nullptr);
file_segments = {file_segment_metadata_it->second->file_segment};
}
else
{
file_segments = splitRangeIntoFileSegments(
*locked_key, offset, size, FileSegment::State::EMPTY, /* file_segments_limit */0, settings);
*locked_key, offset, size, FileSegment::State::EMPTY, /* file_segments_limit */0, create_settings);
}
return std::make_unique<FileSegmentsHolder>(std::move(file_segments));
@ -506,7 +501,7 @@ FileCache::getOrSet(
size_t offset,
size_t size,
size_t file_size,
const CreateFileSegmentSettings & settings,
const CreateFileSegmentSettings & create_settings,
size_t file_segments_limit)
{
ProfileEventTimeIncrement<Microseconds> watch(ProfileEvents::FilesystemCacheGetOrSetMicroseconds);
@ -612,7 +607,7 @@ FileCache::getOrSet(
if (file_segments.empty())
{
file_segments = splitRangeIntoFileSegments(*locked_key, range.left, range.size(), FileSegment::State::EMPTY, file_segments_limit, settings);
file_segments = splitRangeIntoFileSegments(*locked_key, range.left, range.size(), FileSegment::State::EMPTY, file_segments_limit, create_settings);
}
else
{
@ -620,7 +615,7 @@ FileCache::getOrSet(
chassert(file_segments.back()->range().left <= range.right);
fillHolesWithEmptyFileSegments(
*locked_key, file_segments, range, file_segments_limit, /* fill_with_detached */false, settings);
*locked_key, file_segments, range, file_segments_limit, /* fill_with_detached */false, create_settings);
if (!file_segments.front()->range().contains(offset))
{
@ -675,7 +670,7 @@ KeyMetadata::iterator FileCache::addFileSegment(
size_t offset,
size_t size,
FileSegment::State state,
const CreateFileSegmentSettings & settings,
const CreateFileSegmentSettings & create_settings,
const CacheGuard::Lock * lock)
{
/// Create a file_segment_metadata and put it in `files` map by [key][offset].
@ -729,7 +724,7 @@ KeyMetadata::iterator FileCache::addFileSegment(
result_state = state;
}
auto file_segment = std::make_shared<FileSegment>(key, offset, size, result_state, settings, background_download_threads > 0, this, locked_key.getKeyMetadata());
auto file_segment = std::make_shared<FileSegment>(key, offset, size, result_state, create_settings, metadata.isBackgroundDownloadEnabled(), this, locked_key.getKeyMetadata());
auto file_segment_metadata = std::make_shared<FileSegmentMetadata>(std::move(file_segment));
auto [file_segment_metadata_it, inserted] = locked_key.emplace(offset, file_segment_metadata);
@ -933,9 +928,9 @@ void FileCache::loadMetadataImpl()
std::mutex set_exception_mutex;
std::atomic<bool> stop_loading = false;
LOG_INFO(log, "Loading filesystem cache with {} threads", metadata_download_threads);
LOG_INFO(log, "Loading filesystem cache with {} threads", load_metadata_threads);
for (size_t i = 0; i < metadata_download_threads; ++i)
for (size_t i = 0; i < load_metadata_threads; ++i)
{
try
{
@ -1137,15 +1132,8 @@ FileCache::~FileCache()
void FileCache::deactivateBackgroundOperations()
{
metadata.cancelDownload();
metadata.cancelCleanup();
for (auto & thread : download_threads)
if (thread.joinable())
thread.join();
if (cleanup_thread && cleanup_thread->joinable())
cleanup_thread->join();
shutdown.store(true);
metadata.shutdown();
}
std::vector<FileSegment::Info> FileCache::getFileSegmentInfos()
@ -1220,6 +1208,43 @@ void FileCache::assertCacheCorrectness()
});
}
void FileCache::applySettingsIfPossible(const FileCacheSettings & new_settings, FileCacheSettings & actual_settings)
{
if (!is_initialized || shutdown || new_settings == actual_settings)
return;
std::lock_guard lock(apply_settings_mutex);
if (metadata.setBackgroundDownloadQueueSizeLimit(new_settings.background_download_queue_size_limit))
{
LOG_INFO(log, "Changed background_download_queue_size from {} to {}",
actual_settings.background_download_queue_size_limit,
new_settings.background_download_queue_size_limit);
actual_settings.background_download_queue_size_limit = new_settings.background_download_queue_size_limit;
}
bool updated;
try
{
updated = metadata.setBackgroundDownloadThreads(new_settings.background_download_threads);
}
catch (...)
{
actual_settings.background_download_threads = metadata.getBackgroundDownloadThreads();
throw;
}
if (updated)
{
LOG_INFO(log, "Changed background_download_threads from {} to {}",
actual_settings.background_download_threads,
new_settings.background_download_threads);
actual_settings.background_download_threads = new_settings.background_download_threads;
}
}
FileCache::QueryContextHolder::QueryContextHolder(
const String & query_id_,
FileCache * cache_,
@ -1242,13 +1267,13 @@ FileCache::QueryContextHolder::~QueryContextHolder()
}
FileCache::QueryContextHolderPtr FileCache::getQueryContextHolder(
const String & query_id, const ReadSettings & settings)
const String & query_id, const ReadSettings & read_settings)
{
if (!query_limit || settings.filesystem_cache_max_download_size == 0)
if (!query_limit || read_settings.filesystem_cache_max_download_size == 0)
return {};
auto lock = lockCache();
auto context = query_limit->getOrSetQueryContext(query_id, settings, lock);
auto context = query_limit->getOrSetQueryContext(query_id, read_settings, lock);
return std::make_unique<QueryContextHolder>(query_id, this, std::move(context));
}

View File

@ -16,6 +16,7 @@
#include <Interpreters/Cache/Metadata.h>
#include <Interpreters/Cache/QueryLimit.h>
#include <Interpreters/Cache/FileCache_fwd_internal.h>
#include <Interpreters/Cache/FileCacheSettings.h>
#include <filesystem>
@ -150,14 +151,15 @@ public:
std::vector<FileSegment::Info> sync();
void applySettingsIfPossible(const FileCacheSettings & new_settings, FileCacheSettings & actual_settings);
private:
using KeyAndOffset = FileCacheKeyAndOffset;
const size_t max_file_segment_size;
const size_t bypass_cache_threshold = 0;
const size_t bypass_cache_threshold;
const size_t boundary_alignment;
const size_t background_download_threads; /// 0 means background download is disabled.
const size_t metadata_download_threads;
size_t load_metadata_threads;
Poco::Logger * log;
@ -165,6 +167,9 @@ private:
std::atomic<bool> is_initialized = false;
mutable std::mutex init_mutex;
std::unique_ptr<StatusFile> status_file;
std::atomic<bool> shutdown = false;
std::mutex apply_settings_mutex;
CacheMetadata metadata;
@ -195,12 +200,6 @@ private:
* then allowed loaded cache size is std::min(n - k, max_query_cache_size).
*/
FileCacheQueryLimitPtr query_limit;
/**
* A background cleanup task.
* Clears removed cache entries from metadata.
*/
std::vector<ThreadFromGlobalPool> download_threads;
std::unique_ptr<ThreadFromGlobalPool> cleanup_thread;
void assertInitialized() const;
void assertCacheCorrectness();

View File

@ -9,6 +9,22 @@ namespace ErrorCodes
extern const int BAD_ARGUMENTS;
}
FileCacheFactory::FileCacheData::FileCacheData(
FileCachePtr cache_,
const FileCacheSettings & settings_,
const std::string & config_path_)
: cache(cache_)
, config_path(config_path_)
, settings(settings_)
{
}
FileCacheSettings FileCacheFactory::FileCacheData::getSettings() const
{
std::lock_guard lock(settings_mutex);
return settings;
}
FileCacheFactory & FileCacheFactory::instance()
{
static FileCacheFactory ret;
@ -22,7 +38,9 @@ FileCacheFactory::CacheByName FileCacheFactory::getAll()
}
FileCachePtr FileCacheFactory::getOrCreate(
const std::string & cache_name, const FileCacheSettings & file_cache_settings)
const std::string & cache_name,
const FileCacheSettings & file_cache_settings,
const std::string & config_path)
{
std::lock_guard lock(mutex);
@ -31,13 +49,16 @@ FileCachePtr FileCacheFactory::getOrCreate(
{
auto cache = std::make_shared<FileCache>(cache_name, file_cache_settings);
it = caches_by_name.emplace(
cache_name, std::make_unique<FileCacheData>(cache, file_cache_settings)).first;
cache_name, std::make_unique<FileCacheData>(cache, file_cache_settings, config_path)).first;
}
return it->second->cache;
}
FileCachePtr FileCacheFactory::create(const std::string & cache_name, const FileCacheSettings & file_cache_settings)
FileCachePtr FileCacheFactory::create(
const std::string & cache_name,
const FileCacheSettings & file_cache_settings,
const std::string & config_path)
{
std::lock_guard lock(mutex);
@ -47,12 +68,12 @@ FileCachePtr FileCacheFactory::create(const std::string & cache_name, const File
auto cache = std::make_shared<FileCache>(cache_name, file_cache_settings);
it = caches_by_name.emplace(
cache_name, std::make_unique<FileCacheData>(cache, file_cache_settings)).first;
cache_name, std::make_unique<FileCacheData>(cache, file_cache_settings, config_path)).first;
return it->second->cache;
}
FileCacheFactory::FileCacheData FileCacheFactory::getByName(const std::string & cache_name)
FileCacheFactory::FileCacheDataPtr FileCacheFactory::getByName(const std::string & cache_name)
{
std::lock_guard lock(mutex);
@ -60,7 +81,41 @@ FileCacheFactory::FileCacheData FileCacheFactory::getByName(const std::string &
if (it == caches_by_name.end())
throw Exception(ErrorCodes::BAD_ARGUMENTS, "There is no cache by name: {}", cache_name);
return *it->second;
return it->second;
}
void FileCacheFactory::updateSettingsFromConfig(const Poco::Util::AbstractConfiguration & config)
{
CacheByName caches_by_name_copy;
{
std::lock_guard lock(mutex);
caches_by_name_copy = caches_by_name;
}
for (const auto & [_, cache_info] : caches_by_name_copy)
{
if (cache_info->config_path.empty())
continue;
FileCacheSettings new_settings;
new_settings.loadFromConfig(config, cache_info->config_path);
FileCacheSettings old_settings;
{
std::lock_guard lock(cache_info->settings_mutex);
if (new_settings == cache_info->settings)
continue;
old_settings = cache_info->settings;
}
cache_info->cache->applySettingsIfPossible(new_settings, old_settings);
{
std::lock_guard lock(cache_info->settings_mutex);
cache_info->settings = old_settings;
}
}
}
}

View File

@ -6,7 +6,6 @@
#include <boost/noncopyable.hpp>
#include <unordered_map>
#include <mutex>
#include <list>
namespace DB
{
@ -17,26 +16,42 @@ namespace DB
class FileCacheFactory final : private boost::noncopyable
{
public:
struct FileCacheData
class FileCacheData
{
FileCachePtr cache;
FileCacheSettings settings;
friend class FileCacheFactory;
public:
FileCacheData(FileCachePtr cache_, const FileCacheSettings & settings_, const std::string & config_path_);
FileCacheData() = default;
FileCacheData(FileCachePtr cache_, const FileCacheSettings & settings_) : cache(cache_), settings(settings_) {}
FileCacheSettings getSettings() const;
const FileCachePtr cache;
const std::string config_path;
private:
FileCacheSettings settings;
mutable std::mutex settings_mutex;
};
using FileCacheDataPtr = std::shared_ptr<FileCacheData>;
using CacheByName = std::unordered_map<std::string, FileCacheDataPtr>;
static FileCacheFactory & instance();
FileCachePtr getOrCreate(const std::string & cache_name, const FileCacheSettings & file_cache_settings);
FileCachePtr getOrCreate(
const std::string & cache_name,
const FileCacheSettings & file_cache_settings,
const std::string & config_path);
FileCachePtr create(const std::string & cache_name, const FileCacheSettings & file_cache_settings);
FileCachePtr create(
const std::string & cache_name,
const FileCacheSettings & file_cache_settings,
const std::string & config_path);
CacheByName getAll();
FileCacheData getByName(const std::string & cache_name);
FileCacheDataPtr getByName(const std::string & cache_name);
void updateSettingsFromConfig(const Poco::Util::AbstractConfiguration & config);
private:
std::mutex mutex;

View File

@ -38,6 +38,8 @@ struct FileCacheSettings
void loadFromConfig(const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix);
void loadFromCollection(const NamedCollection & collection);
bool operator ==(const FileCacheSettings &) const = default;
private:
using FuncHas = std::function<bool(std::string_view)>;
using FuncGetUInt = std::function<size_t(std::string_view)>;

View File

@ -134,11 +134,12 @@ std::string KeyMetadata::getFileSegmentPath(const FileSegment & file_segment) co
/ CacheMetadata::getFileNameForFileSegment(file_segment.offset(), file_segment.getKind());
}
CacheMetadata::CacheMetadata(const std::string & path_, size_t background_download_queue_size_limit_)
CacheMetadata::CacheMetadata(const std::string & path_, size_t background_download_queue_size_limit_, size_t background_download_threads_)
: path(path_)
, cleanup_queue(std::make_shared<CleanupQueue>())
, download_queue(std::make_shared<DownloadQueue>(background_download_queue_size_limit_))
, log(&Poco::Logger::get("CacheMetadata"))
, download_threads_num(background_download_threads_)
{
}
@ -458,11 +459,6 @@ void CacheMetadata::cleanupThreadFunc()
}
}
void CacheMetadata::cancelCleanup()
{
cleanup_queue->cancel();
}
class DownloadQueue
{
friend struct CacheMetadata;
@ -473,7 +469,7 @@ public:
{
{
std::lock_guard lock(mutex);
if (cancelled || (queue_size_limit && queue.size() == queue_size_limit))
if (cancelled || (queue_size_limit && queue.size() >= queue_size_limit))
return false;
queue.push(DownloadInfo{file_segment->key(), file_segment->offset(), file_segment});
}
@ -483,6 +479,8 @@ public:
return true;
}
bool setQueueLimit(size_t size) { return queue_size_limit.exchange(size) != size; }
private:
void cancel()
{
@ -493,8 +491,8 @@ private:
cv.notify_all();
}
const size_t queue_size_limit;
std::mutex mutex;
std::atomic<size_t> queue_size_limit;
mutable std::mutex mutex;
std::condition_variable cv;
bool cancelled = false;
@ -515,7 +513,7 @@ private:
std::queue<DownloadInfo> queue;
};
void CacheMetadata::downloadThreadFunc()
void CacheMetadata::downloadThreadFunc(const bool & stop_flag)
{
std::optional<Memory<>> memory;
while (true)
@ -526,13 +524,13 @@ void CacheMetadata::downloadThreadFunc()
{
std::unique_lock lock(download_queue->mutex);
if (download_queue->cancelled)
if (download_queue->cancelled || stop_flag)
return;
if (download_queue->queue.empty())
{
download_queue->cv.wait(lock, [&](){ return download_queue->cancelled || !download_queue->queue.empty(); });
if (download_queue->cancelled)
download_queue->cv.wait(lock, [&](){ return download_queue->cancelled || !download_queue->queue.empty() || stop_flag; });
if (download_queue->cancelled || stop_flag)
return;
}
@ -607,6 +605,11 @@ void CacheMetadata::downloadThreadFunc()
}
}
bool CacheMetadata::setBackgroundDownloadQueueSizeLimit(size_t size)
{
return download_queue->setQueueLimit(size);
}
void CacheMetadata::downloadImpl(FileSegment & file_segment, std::optional<Memory<>> & memory)
{
LOG_TEST(
@ -670,9 +673,86 @@ void CacheMetadata::downloadImpl(FileSegment & file_segment, std::optional<Memor
LOG_TEST(log, "Downloaded file segment: {}", file_segment.getInfoForLog());
}
void CacheMetadata::cancelDownload()
void CacheMetadata::startup()
{
download_threads.reserve(download_threads_num);
for (size_t i = 0; i < download_threads_num; ++i)
{
download_threads.emplace_back(std::make_shared<DownloadThread>());
download_threads.back()->thread = std::make_unique<ThreadFromGlobalPool>([this, thread = download_threads.back()] { downloadThreadFunc(thread->stop_flag); });
}
cleanup_thread = std::make_unique<ThreadFromGlobalPool>(std::function{ [this]{ cleanupThreadFunc(); }});
}
void CacheMetadata::shutdown()
{
download_queue->cancel();
cleanup_queue->cancel();
for (auto & download_thread : download_threads)
{
if (download_thread->thread && download_thread->thread->joinable())
download_thread->thread->join();
}
if (cleanup_thread && cleanup_thread->joinable())
cleanup_thread->join();
}
bool CacheMetadata::isBackgroundDownloadEnabled()
{
return download_threads_num;
}
bool CacheMetadata::setBackgroundDownloadThreads(size_t threads_num)
{
if (threads_num == download_threads_num)
return false;
if (threads_num > download_threads_num)
{
SCOPE_EXIT({ download_threads_num = download_threads.size(); });
size_t add_threads = threads_num - download_threads_num;
for (size_t i = 0; i < add_threads; ++i)
{
download_threads.emplace_back(std::make_shared<DownloadThread>());
try
{
download_threads.back()->thread = std::make_unique<ThreadFromGlobalPool>(
[this, thread = download_threads.back()] { downloadThreadFunc(thread->stop_flag); });
}
catch (...)
{
download_threads.pop_back();
throw;
}
}
}
else if (threads_num < download_threads_num)
{
size_t remove_threads = download_threads_num - threads_num;
{
std::lock_guard lock(download_queue->mutex);
for (size_t i = 0; i < remove_threads; ++i)
download_threads[download_threads.size() - 1 - i]->stop_flag = true;
}
download_queue->cv.notify_all();
SCOPE_EXIT({ download_threads_num = download_threads.size(); });
for (size_t i = 0; i < remove_threads; ++i)
{
chassert(download_threads.back()->stop_flag);
auto & thread = download_threads.back()->thread;
if (thread && thread->joinable())
thread->join();
download_threads.pop_back();
}
}
return true;
}
LockedKey::LockedKey(std::shared_ptr<KeyMetadata> key_metadata_)

View File

@ -5,6 +5,7 @@
#include <Interpreters/Cache/FileCacheKey.h>
#include <Interpreters/Cache/FileSegment.h>
#include <Interpreters/Cache/FileCache_fwd_internal.h>
#include <Common/ThreadPool.h>
#include <shared_mutex>
namespace DB
@ -102,7 +103,9 @@ public:
using Key = FileCacheKey;
using IterateFunc = std::function<void(LockedKey &)>;
explicit CacheMetadata(const std::string & path_, size_t background_download_queue_size_limit_);
explicit CacheMetadata(const std::string & path_, size_t background_download_queue_size_limit_, size_t background_download_threads_);
void startup();
const String & getBaseDirectory() const { return path; }
@ -138,21 +141,13 @@ public:
void removeKey(const Key & key, bool if_exists, bool if_releasable);
void removeAllKeys(bool if_releasable);
void cancelCleanup();
void shutdown();
/// Firstly, this cleanup does not delete cache files,
/// but only empty keys from cache_metadata_map and key (prefix) directories from fs.
/// Secondly, it deletes those only if arose as a result of
/// (1) eviction in FileCache::tryReserve();
/// (2) removal of cancelled non-downloaded file segments after FileSegment::complete().
/// which does not include removal of cache files because of FileCache::removeKey/removeAllKeys,
/// triggered by removal of source files from objects storage.
/// E.g. number of elements submitted to background cleanup should remain low.
void cleanupThreadFunc();
bool setBackgroundDownloadThreads(size_t threads_num);
size_t getBackgroundDownloadThreads() const { return download_threads.size(); }
bool setBackgroundDownloadQueueSizeLimit(size_t size);
void downloadThreadFunc();
void cancelDownload();
bool isBackgroundDownloadEnabled();
private:
const std::string path; /// Cache base path
@ -172,6 +167,16 @@ private:
static constexpr size_t buckets_num = 1024;
std::vector<MetadataBucket> metadata_buckets{buckets_num};
struct DownloadThread
{
std::unique_ptr<ThreadFromGlobalPool> thread;
bool stop_flag{false};
};
std::vector<std::shared_ptr<DownloadThread>> download_threads;
std::atomic<size_t> download_threads_num;
std::unique_ptr<ThreadFromGlobalPool> cleanup_thread;
MetadataBucket & getMetadataBucket(const Key & key);
void downloadImpl(FileSegment & file_segment, std::optional<Memory<>> & memory);
MetadataBucket::iterator removeEmptyKey(
@ -179,6 +184,18 @@ private:
MetadataBucket::iterator it,
LockedKey &,
const CacheMetadataGuard::Lock &);
void downloadThreadFunc(const bool & stop_flag);
/// Firstly, this cleanup does not delete cache files,
/// but only empty keys from cache_metadata_map and key (prefix) directories from fs.
/// Secondly, it deletes those only if arose as a result of
/// (1) eviction in FileCache::tryReserve();
/// (2) removal of cancelled non-downloaded file segments after FileSegment::complete().
/// which does not include removal of cache files because of FileCache::removeKey/removeAllKeys,
/// triggered by removal of source files from objects storage.
/// E.g. number of elements submitted to background cleanup should remain low.
void cleanupThreadFunc();
};

View File

@ -213,8 +213,6 @@ struct ContextSharedPart : boost::noncopyable
mutable zkutil::ZooKeeperPtr zookeeper TSA_GUARDED_BY(zookeeper_mutex); /// Client for ZooKeeper.
ConfigurationPtr zookeeper_config TSA_GUARDED_BY(zookeeper_mutex); /// Stores zookeeper configs
ConfigurationPtr sensitive_data_masker_config;
#if USE_NURAFT
mutable std::mutex keeper_dispatcher_mutex;
mutable std::shared_ptr<KeeperDispatcher> keeper_dispatcher TSA_GUARDED_BY(keeper_dispatcher_mutex);
@ -1094,7 +1092,7 @@ void Context::setTemporaryStorageInCache(const String & cache_disk_name, size_t
if (shared->root_temp_data_on_disk)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Temporary storage is already set");
auto file_cache = FileCacheFactory::instance().getByName(disk_ptr->getCacheName()).cache;
auto file_cache = FileCacheFactory::instance().getByName(disk_ptr->getCacheName())->cache;
if (!file_cache)
throw Exception(ErrorCodes::NO_ELEMENTS_IN_CONFIG, "Cache '{}' is not found", disk_ptr->getCacheName());
@ -3335,16 +3333,6 @@ bool Context::hasAuxiliaryZooKeeper(const String & name) const
return getConfigRef().has("auxiliary_zookeepers." + name);
}
void Context::reloadQueryMaskingRulesIfChanged(const ConfigurationPtr & config) const
{
const auto old_config = shared->sensitive_data_masker_config;
if (old_config && isSameConfiguration(*config, *old_config, "query_masking_rules"))
return;
SensitiveDataMasker::setInstance(std::make_unique<SensitiveDataMasker>(*config, "query_masking_rules"));
shared->sensitive_data_masker_config = config;
}
InterserverCredentialsPtr Context::getInterserverCredentials() const
{
return shared->interserver_io_credentials.get();

View File

@ -955,8 +955,6 @@ public:
// Reload Zookeeper
void reloadZooKeeperIfChanged(const ConfigurationPtr & config) const;
void reloadQueryMaskingRulesIfChanged(const ConfigurationPtr & config) const;
void setSystemZooKeeperLogAfterInitializationIfNeeded();
/// --- Caches ------------------------------------------------------------------------------------------

View File

@ -30,29 +30,6 @@ namespace fs = std::filesystem;
namespace DB
{
class TableNameHints : public IHints<>
{
public:
TableNameHints(ConstDatabasePtr database_, ContextPtr context_)
: context(context_),
database(database_)
{
}
Names getAllRegisteredNames() const override
{
Names result;
if (database)
{
for (auto table_it = database->getTablesIterator(context); table_it->isValid(); table_it->next())
result.emplace_back(table_it->name());
}
return result;
}
private:
ContextPtr context;
ConstDatabasePtr database;
};
class IDatabase;
class Exception;
class ColumnsDescription;
@ -392,6 +369,68 @@ private:
static constexpr time_t DBMS_DEFAULT_DISK_RELOAD_PERIOD_SEC = 5;
};
class TableNameHints : public IHints<>
{
public:
TableNameHints(ConstDatabasePtr database_, ContextPtr context_)
: context(context_),
database(database_)
{
}
/// getHintForTable tries to get a hint for the provided table_name in the provided
/// database. If the results are empty, it goes for extended hints for the table
/// with getExtendedHintForTable which looks for the table name in every database that's
/// available in the database catalog. It finally returns a single hint which is the database
/// name and table_name pair which is similar to the table_name provided. Perhaps something to
/// consider is should we return more than one pair of hint?
std::pair<String, String> getHintForTable(const String & table_name) const
{
auto results = this->getHints(table_name, getAllRegisteredNames());
if (results.empty())
return getExtendedHintForTable(table_name);
return std::make_pair(database->getDatabaseName(), results[0]);
}
/// getExtendedHintsForTable tries to get hint for the given table_name across all
/// the databases that are available in the database catalog.
std::pair<String, String> getExtendedHintForTable(const String & table_name) const
{
/// load all available databases from the DatabaseCatalog instance
auto & database_catalog = DatabaseCatalog::instance();
auto all_databases = database_catalog.getDatabases();
for (const auto & [db_name, db] : all_databases)
{
/// this case should be covered already by getHintForTable
if (db_name == database->getDatabaseName())
continue;
TableNameHints hints(db, context);
auto results = hints.getHints(table_name);
/// if the results are not empty, return the first instance of the table_name
/// and the corresponding database_name that was found.
if (!results.empty())
return std::make_pair(db_name, results[0]);
}
return {};
}
Names getAllRegisteredNames() const override
{
Names result;
if (database)
for (auto table_it = database->getTablesIterator(context); table_it->isValid(); table_it->next())
result.emplace_back(table_it->name());
return result;
}
private:
ContextPtr context;
ConstDatabasePtr database;
};
/// This class is useful when creating a table or database.
/// Usually we create IStorage/IDatabase object first and then add it to IDatabase/DatabaseCatalog.

View File

@ -42,8 +42,8 @@ BlockIO InterpreterDescribeCacheQuery::execute()
MutableColumns res_columns = sample_block.cloneEmptyColumns();
auto cache_data = FileCacheFactory::instance().getByName(ast.cache_name);
const auto & settings = cache_data.settings;
const auto & cache = cache_data.cache;
auto settings = cache_data->getSettings();
const auto & cache = cache_data->cache;
size_t i = 0;
res_columns[i++]->insert(settings.max_size);

View File

@ -380,7 +380,7 @@ BlockIO InterpreterSystemQuery::execute()
}
else
{
auto cache = FileCacheFactory::instance().getByName(query.filesystem_cache_name).cache;
auto cache = FileCacheFactory::instance().getByName(query.filesystem_cache_name)->cache;
if (query.key_to_drop.empty())
{
cache->removeAllReleasable();
@ -434,7 +434,7 @@ BlockIO InterpreterSystemQuery::execute()
}
else
{
auto cache = FileCacheFactory::instance().getByName(query.filesystem_cache_name).cache;
auto cache = FileCacheFactory::instance().getByName(query.filesystem_cache_name)->cache;
auto file_segments = cache->sync();
fill_data(query.filesystem_cache_name, cache, file_segments);
}

View File

@ -45,7 +45,6 @@ public:
void resetFormatterImpl() override
{
LOG_DEBUG(&Poco::Logger::get("RowOutputFormatWithExceptionHandlerAdaptor"), "resetFormatterImpl");
Base::resetFormatterImpl();
if (validating_ostr)
validating_ostr = std::make_unique<WriteBufferValidUTF8>(*Base::getWriteBufferPtr());

View File

@ -75,6 +75,13 @@ namespace ErrorCodes
}
namespace
{
struct HDFSFileInfoDeleter
{
/// Can have only one entry (see hdfsGetPathInfo())
void operator()(hdfsFileInfo * info) { hdfsFreeFileInfo(info, 1); }
};
using HDFSFileInfoPtr = std::unique_ptr<hdfsFileInfo, HDFSFileInfoDeleter>;
/* Recursive directory listing with matched paths as a result.
* Have the same method in StorageFile.
*/
@ -90,13 +97,12 @@ namespace
if (first_glob_pos == std::string::npos)
{
const String path = fs::path(path_for_ls + for_match.substr(1)).lexically_normal();
HDFSFileInfo ls;
ls.file_info = hdfsGetPathInfo(fs.get(), path.c_str());
if (ls.file_info != nullptr) // NOLINT
HDFSFileInfoPtr hdfs_info(hdfsGetPathInfo(fs.get(), path.c_str()));
if (hdfs_info) // NOLINT
{
result.push_back(StorageHDFS::PathWithInfo{
String(path),
StorageHDFS::PathInfo{ls.file_info->mLastMod, static_cast<size_t>(ls.file_info->mSize)}});
StorageHDFS::PathInfo{hdfs_info->mLastMod, static_cast<size_t>(hdfs_info->mSize)}});
}
return result;
}
@ -184,13 +190,6 @@ namespace
}
return res;
}
struct HDFSFileInfoDeleter
{
/// Can have only one entry (see hdfsGetPathInfo())
void operator()(hdfsFileInfo * info) { hdfsFreeFileInfo(info, 1); }
};
using HDFSFileInfoPtr = std::unique_ptr<hdfsFileInfo, HDFSFileInfoDeleter>;
}
StorageHDFS::StorageHDFS(
@ -610,7 +609,7 @@ bool HDFSSource::initialize()
/// We should not return single chunk with all number of rows,
/// because there is a chance that this chunk will be materialized later
/// (it can cause memory problems even with default values in columns or when virtual columns are requested).
/// Instead, we use special ConstChunkGenerator that will generate chunks
/// Instead, we use a special ConstChunkGenerator that will generate chunks
/// with max_block_size rows until total number of rows is reached.
auto source = std::make_shared<ConstChunkGenerator>(block_for_format, *num_rows_from_cache, max_block_size);
builder.init(Pipe(source));
@ -637,7 +636,7 @@ bool HDFSSource::initialize()
}
/// Add ExtractColumnsTransform to extract requested columns/subcolumns
/// from chunk read by IInputFormat.
/// from the chunk read by IInputFormat.
builder.addSimpleTransform([&](const Block & header)
{
return std::make_shared<ExtractColumnsTransform>(header, requested_columns);

View File

@ -39,6 +39,38 @@ bool ActiveDataPartSet::add(const MergeTreePartInfo & part_info, const String &
return outcome == AddPartOutcome::Added;
}
void ActiveDataPartSet::checkIntersectingParts(const MergeTreePartInfo & part_info) const
{
auto it = part_info_to_name.lower_bound(part_info);
/// Let's go left.
while (it != part_info_to_name.begin())
{
--it;
if (!part_info.contains(it->first))
{
if (!part_info.isDisjoint(it->first))
throw Exception(ErrorCodes::LOGICAL_ERROR, "Part {} intersects previous part {}. It is a bug or a result of manual intervention in the ZooKeeper data.", part_info.getPartNameForLogs(), it->first.getPartNameForLogs());
++it;
break;
}
}
/// Let's go to the right.
while (it != part_info_to_name.end() && part_info.contains(it->first))
{
assert(part_info != it->first);
++it;
}
if (it != part_info_to_name.end() && !part_info.isDisjoint(it->first))
throw Exception(ErrorCodes::LOGICAL_ERROR, "Part {} intersects next part {}. It is a bug or a result of manual intervention in the ZooKeeper data.", part_info.getPartNameForLogs(), it->first.getPartNameForLogs());
}
void ActiveDataPartSet::checkIntersectingParts(const String & name) const
{
auto part_info = MergeTreePartInfo::fromPartName(name, format_version);
checkIntersectingParts(part_info);
}
bool ActiveDataPartSet::add(const String & name, Strings * out_replaced_parts)
{

View File

@ -106,11 +106,16 @@ public:
MergeTreeDataFormatVersion getFormatVersion() const { return format_version; }
void checkIntersectingParts(const MergeTreePartInfo & part_info) const;
void checkIntersectingParts(const String & name) const;
private:
AddPartOutcome addImpl(const MergeTreePartInfo & part_info, const String & name, Strings * out_replaced_parts = nullptr, String * out_reason = nullptr);
MergeTreeDataFormatVersion format_version;
std::map<MergeTreePartInfo, String> part_info_to_name;
using PartInfoToName = std::map<MergeTreePartInfo, String>;
PartInfoToName part_info_to_name;
std::vector<std::map<MergeTreePartInfo, String>::const_iterator> getPartsCoveredByImpl(const MergeTreePartInfo & part_info) const;

View File

@ -60,10 +60,11 @@ bool BackgroundJobsAssignee::scheduleMergeMutateTask(ExecutableTaskPtr merge_tas
}
void BackgroundJobsAssignee::scheduleFetchTask(ExecutableTaskPtr fetch_task)
bool BackgroundJobsAssignee::scheduleFetchTask(ExecutableTaskPtr fetch_task)
{
bool res = getContext()->getFetchesExecutor()->trySchedule(fetch_task);
res ? trigger() : postpone();
return res;
}
@ -75,10 +76,11 @@ bool BackgroundJobsAssignee::scheduleMoveTask(ExecutableTaskPtr move_task)
}
void BackgroundJobsAssignee::scheduleCommonTask(ExecutableTaskPtr common_task, bool need_trigger)
bool BackgroundJobsAssignee::scheduleCommonTask(ExecutableTaskPtr common_task, bool need_trigger)
{
bool res = getContext()->getCommonExecutor()->trySchedule(common_task) && need_trigger;
res ? trigger() : postpone();
bool schedule_res = getContext()->getCommonExecutor()->trySchedule(common_task);
schedule_res && need_trigger ? trigger() : postpone();
return schedule_res;
}

View File

@ -66,9 +66,9 @@ public:
void finish();
bool scheduleMergeMutateTask(ExecutableTaskPtr merge_task);
void scheduleFetchTask(ExecutableTaskPtr fetch_task);
bool scheduleFetchTask(ExecutableTaskPtr fetch_task);
bool scheduleMoveTask(ExecutableTaskPtr move_task);
void scheduleCommonTask(ExecutableTaskPtr common_task, bool need_trigger);
bool scheduleCommonTask(ExecutableTaskPtr common_task, bool need_trigger);
/// Just call finish
~BackgroundJobsAssignee();

View File

@ -184,6 +184,23 @@ void DataPartStorageOnDiskFull::createHardLinkFrom(const IDataPartStorage & sour
});
}
void DataPartStorageOnDiskFull::copyFileFrom(const IDataPartStorage & source, const std::string & from, const std::string & to)
{
const auto * source_on_disk = typeid_cast<const DataPartStorageOnDiskFull *>(&source);
if (!source_on_disk)
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Cannot create copy file from different storage. Expected DataPartStorageOnDiskFull, got {}",
typeid(source).name());
/// Copying files between different disks is
/// not supported in disk transactions.
source_on_disk->getDisk()->copyFile(
fs::path(source_on_disk->getRelativePath()) / from,
*volume->getDisk(),
fs::path(root_path) / part_dir / to);
}
void DataPartStorageOnDiskFull::createProjection(const std::string & name)
{
executeWriteOperation([&](auto & disk) { disk.createDirectory(fs::path(root_path) / part_dir / name); });

View File

@ -48,6 +48,7 @@ public:
void removeFileIfExists(const String & name) override;
void createHardLinkFrom(const IDataPartStorage & source, const std::string & from, const std::string & to) override;
void copyFileFrom(const IDataPartStorage & source, const std::string & from, const std::string & to) override;
void beginTransaction() override;
void commitTransaction() override;

View File

@ -12,6 +12,7 @@
#include <optional>
#include <Common/ZooKeeper/ZooKeeper.h>
#include <Disks/IDiskTransaction.h>
#include <Storages/MergeTree/MergeTreeDataPartChecksum.h>
namespace DB
{
@ -304,6 +305,7 @@ public:
virtual SyncGuardPtr getDirectorySyncGuard() const { return nullptr; }
virtual void createHardLinkFrom(const IDataPartStorage & source, const std::string & from, const std::string & to) = 0;
virtual void copyFileFrom(const IDataPartStorage & source, const std::string & from, const std::string & to) = 0;
/// Rename part.
/// Ideally, new_root_path should be the same as current root (but it is not true).

View File

@ -86,13 +86,13 @@ protected:
/// Actual serialization of columns in part.
Serializations serializations;
UncompressedCache * uncompressed_cache;
MarkCache * mark_cache;
UncompressedCache * const uncompressed_cache;
MarkCache * const mark_cache;
MergeTreeReaderSettings settings;
StorageSnapshotPtr storage_snapshot;
MarkRanges all_mark_ranges;
const StorageSnapshotPtr storage_snapshot;
const MarkRanges all_mark_ranges;
/// Position and level (of nesting).
using ColumnNameLevel = std::optional<std::pair<String, size_t>>;

View File

@ -165,7 +165,7 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::prepare()
std::optional<MergeTreeDataPartBuilder> builder;
if (global_ctx->parent_part)
{
auto data_part_storage = global_ctx->parent_part->getDataPartStorage().getProjection(local_tmp_part_basename);
auto data_part_storage = global_ctx->parent_part->getDataPartStorage().getProjection(local_tmp_part_basename, /* use parent transaction */ false);
builder.emplace(*global_ctx->data, global_ctx->future_part->name, data_part_storage);
builder->withParentPart(global_ctx->parent_part);
}
@ -185,11 +185,11 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::prepare()
if (data_part_storage->exists())
throw Exception(ErrorCodes::DIRECTORY_ALREADY_EXISTS, "Directory {} already exists", data_part_storage->getFullPath());
if (!global_ctx->parent_part)
{
data_part_storage->beginTransaction();
/// Background temp dirs cleaner will not touch tmp projection directory because
/// it's located inside part's directory
if (!global_ctx->parent_part)
global_ctx->temporary_directory_lock = global_ctx->data->getTemporaryPartDirectoryHolder(local_tmp_part_basename);
}
global_ctx->all_column_names = global_ctx->metadata_snapshot->getColumns().getNamesOfPhysical();
global_ctx->storage_columns = global_ctx->metadata_snapshot->getColumns().getAllPhysical();

View File

@ -113,6 +113,13 @@ public:
return global_ctx->promise.get_future();
}
MergeTreeData::MutableDataPartPtr getUnfinishedPart()
{
if (global_ctx)
return global_ctx->new_data_part;
return nullptr;
}
bool execute();
private:

View File

@ -2080,6 +2080,9 @@ size_t MergeTreeData::clearOldTemporaryDirectories(const String & root_path, siz
if (disk->isBroken())
continue;
if (!disk->exists(root_path))
continue;
for (auto it = disk->iterateDirectory(root_path); it->isValid(); it->next())
{
const std::string & basename = it->name();
@ -7284,6 +7287,8 @@ PartitionCommandsResultInfo MergeTreeData::freezePartitionsByMatcher(
const String & with_name,
ContextPtr local_context)
{
auto settings = getSettings();
String clickhouse_path = fs::canonical(local_context->getPath());
String default_shadow_path = fs::path(clickhouse_path) / "shadow/";
fs::create_directories(default_shadow_path);
@ -7294,6 +7299,20 @@ PartitionCommandsResultInfo MergeTreeData::freezePartitionsByMatcher(
/// Acquire a snapshot of active data parts to prevent removing while doing backup.
const auto data_parts = getVisibleDataPartsVector(local_context);
bool has_zero_copy_part = false;
for (const auto & part : data_parts)
{
if (part->isStoredOnRemoteDiskWithZeroCopySupport())
{
has_zero_copy_part = true;
break;
}
}
if (supportsReplication() && settings->disable_freeze_partition_for_zero_copy_replication
&& settings->allow_remote_fs_zero_copy_replication && has_zero_copy_part)
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "FREEZE PARTITION queries are disabled.");
String backup_name = (!with_name.empty() ? escapeForFileName(with_name) : toString(increment));
String backup_path = fs::path(shadow_path) / backup_name / "";

View File

@ -36,6 +36,12 @@ struct Settings;
M(Float, ratio_of_defaults_for_sparse_serialization, 0.9375f, "Minimal ratio of number of default values to number of all values in column to store it in sparse serializations. If >= 1, columns will be always written in full serialization.", 0) \
M(Bool, replace_long_file_name_to_hash, false, "If the file name for column is too long (more than 'max_file_name_length' bytes) replace it to SipHash128", 0) \
M(UInt64, max_file_name_length, 127, "The maximal length of the file name to keep it as is without hashing", 0) \
M(UInt64, min_bytes_for_full_part_storage, 0, "Only available in ClickHouse Cloud", 0) \
M(UInt64, min_rows_for_full_part_storage, 0, "Only available in ClickHouse Cloud", 0) \
M(UInt64, compact_parts_max_bytes_to_buffer, 128 * 1024 * 1024, "Only available in ClickHouse Cloud", 0) \
M(UInt64, compact_parts_max_granules_to_buffer, 128, "Only available in ClickHouse Cloud", 0) \
M(UInt64, compact_parts_merge_max_bytes_to_prefetch_part, 16 * 1024 * 1024, "Only available in ClickHouse Cloud", 0) \
\
/** Merge settings. */ \
M(UInt64, merge_max_block_size, 8192, "How many rows in blocks should be formed for merge operations. By default has the same value as `index_granularity`.", 0) \
M(UInt64, merge_max_block_size_bytes, 10 * 1024 * 1024, "How many bytes in blocks should be formed for merge operations. By default has the same value as `index_granularity_bytes`.", 0) \
@ -124,6 +130,9 @@ struct Settings;
M(Milliseconds, wait_for_unique_parts_send_before_shutdown_ms, 0, "Before shutdown table will wait for required amount time for unique parts (exist only on current replica) to be fetched by other replicas (0 means disabled).", 0) \
M(Float, fault_probability_before_part_commit, 0, "For testing. Do not change it.", 0) \
M(Float, fault_probability_after_part_commit, 0, "For testing. Do not change it.", 0) \
M(Bool, shared_merge_tree_disable_merges_and_mutations_assignment, false, "Only available in ClickHouse Cloud", 0) \
M(Float, shared_merge_tree_partitions_hint_ratio_to_reload_merge_pred_for_mutations, 0.5, "Only available in ClickHouse Cloud", 0) \
M(UInt64, shared_merge_tree_parts_load_batch_size, 32, "Only available in ClickHouse Cloud", 0) \
\
/** Check delay of replicas settings. */ \
M(UInt64, min_relative_delay_to_measure, 120, "Calculate relative replica delay only if absolute delay is not less that this value.", 0) \
@ -131,6 +140,10 @@ struct Settings;
M(UInt64, max_cleanup_delay_period, 300, "Maximum period to clean old queue logs, blocks hashes and parts.", 0) \
M(UInt64, cleanup_delay_period_random_add, 10, "Add uniformly distributed value from 0 to x seconds to cleanup_delay_period to avoid thundering herd effect and subsequent DoS of ZooKeeper in case of very large number of tables.", 0) \
M(UInt64, cleanup_thread_preferred_points_per_iteration, 150, "Preferred batch size for background cleanup (points are abstract but 1 point is approximately equivalent to 1 inserted block).", 0) \
M(UInt64, cleanup_threads, 128, "Only available in ClickHouse Cloud", 0) \
M(UInt64, kill_delay_period, 30, "Only available in ClickHouse Cloud", 0) \
M(UInt64, kill_delay_period_random_add, 10, "Only available in ClickHouse Cloud", 0) \
M(UInt64, kill_threads, 128, "Only available in ClickHouse Cloud", 0) \
M(UInt64, min_relative_delay_to_close, 300, "Minimal delay from other replicas to close, stop serving requests and not return Ok during status check.", 0) \
M(UInt64, min_absolute_delay_to_close, 0, "Minimal absolute delay to close, stop serving requests and not return Ok during status check.", 0) \
M(UInt64, enable_vertical_merge_algorithm, 1, "Enable usage of Vertical merge algorithm.", 0) \
@ -170,6 +183,10 @@ struct Settings;
M(UInt64, zero_copy_merge_mutation_min_parts_size_sleep_before_lock, 1ULL * 1024 * 1024 * 1024, "If zero copy replication is enabled sleep random amount of time before trying to lock depending on parts size for merge or mutation", 0) \
M(Bool, allow_floating_point_partition_key, false, "Allow floating point as partition key", 0) \
M(UInt64, sleep_before_loading_outdated_parts_ms, 0, "For testing. Do not change it.", 0) \
M(Bool, always_use_copy_instead_of_hardlinks, false, "Always copy data instead of hardlinking during mutations/replaces/detaches and so on.", 0) \
M(Bool, disable_freeze_partition_for_zero_copy_replication, true, "Disable FREEZE PARTITION query for zero copy replication.", 0) \
M(Bool, disable_detach_partition_for_zero_copy_replication, true, "Disable DETACH PARTITION query for zero copy replication.", 0) \
M(Bool, disable_fetch_partition_for_zero_copy_replication, true, "Disable FETCH PARTITION query for zero copy replication.", 0) \
\
/** Experimental/work in progress feature. Unsafe for production. */ \
M(UInt64, part_moves_between_shards_enable, 0, "Experimental/Incomplete feature to move parts between shards. Does not take into account sharding expressions.", 0) \
@ -177,6 +194,7 @@ struct Settings;
M(Bool, allow_remote_fs_zero_copy_replication, false, "Don't use this setting in production, because it is not ready.", 0) \
M(String, remote_fs_zero_copy_zookeeper_path, "/clickhouse/zero_copy", "ZooKeeper path for zero-copy table-independent info.", 0) \
M(Bool, remote_fs_zero_copy_path_compatible_mode, false, "Run zero-copy in compatible mode during conversion process.", 0) \
M(Bool, cache_populated_by_fetch, false, "Only available in ClickHouse Cloud", 0) \
M(Bool, allow_experimental_block_number_column, false, "Enable persisting column _block_number for each row.", 0) \
\
/** Compress marks and primary key. */ \

View File

@ -1357,6 +1357,7 @@ private:
NameSet removed_stats;
/// A stat file need to be renamed iff the column is renamed.
NameToNameMap renamed_stats;
for (const auto & command : ctx->for_file_renames)
{
if (command.type == MutationCommand::DROP_INDEX)
@ -1652,6 +1653,8 @@ private:
ctx->new_data_part->version.setCreationTID(tid, nullptr);
ctx->new_data_part->storeVersionMetadata();
auto settings = ctx->source_part->storage.getSettings();
NameSet hardlinked_files;
/// NOTE: Renames must be done in order
@ -1690,10 +1693,19 @@ private:
String destination = it->name();
if (it->isFile())
{
if (settings->always_use_copy_instead_of_hardlinks)
{
ctx->new_data_part->getDataPartStorage().copyFileFrom(
ctx->source_part->getDataPartStorage(), it->name(), destination);
}
else
{
ctx->new_data_part->getDataPartStorage().createHardLinkFrom(
ctx->source_part->getDataPartStorage(), file_name, destination);
hardlinked_files.insert(file_name);
ctx->source_part->getDataPartStorage(), it->name(), destination);
hardlinked_files.insert(it->name());
}
}
else if (!endsWith(it->name(), ".tmp_proj")) // ignore projection tmp merge dir
{
@ -1704,8 +1716,16 @@ private:
auto projection_data_part_storage_dst = ctx->new_data_part->getDataPartStorage().getProjection(destination);
for (auto p_it = projection_data_part_storage_src->iterate(); p_it->isValid(); p_it->next())
{
if (settings->always_use_copy_instead_of_hardlinks)
{
projection_data_part_storage_dst->copyFileFrom(
*projection_data_part_storage_src, p_it->name(), p_it->name());
}
else
{
auto file_name_with_projection_prefix = fs::path(projection_data_part_storage_src->getPartDirectory()) / p_it->name();
projection_data_part_storage_dst->createHardLinkFrom(
*projection_data_part_storage_src, p_it->name(), p_it->name());
@ -1713,6 +1733,7 @@ private:
}
}
}
}
/// Tracking of hardlinked files required for zero-copy replication.
/// We don't remove them when we delete last copy of source part because
@ -1973,19 +1994,20 @@ bool MutateTask::prepare()
IDataPartStorage::ClonePartParams clone_params
{
.txn = ctx->txn, .hardlinked_files = &ctx->hardlinked_files,
.files_to_copy_instead_of_hardlinks = std::move(files_to_copy_instead_of_hardlinks), .keep_metadata_version = true
.copy_instead_of_hardlink = settings_ptr->always_use_copy_instead_of_hardlinks,
.files_to_copy_instead_of_hardlinks = std::move(files_to_copy_instead_of_hardlinks),
.keep_metadata_version = true,
};
auto [part, lock] = ctx->data->cloneAndLoadDataPartOnSameDisk(
ctx->source_part,
prefix,
ctx->future_part->part_info,
ctx->metadata_snapshot,
clone_params,
ctx->context->getReadSettings(),
ctx->context->getWriteSettings());
part->getDataPartStorage().beginTransaction();
MergeTreeData::MutableDataPartPtr part;
scope_guard lock;
{
std::tie(part, lock) = ctx->data->cloneAndLoadDataPartOnSameDisk(
ctx->source_part, prefix, ctx->future_part->part_info, ctx->metadata_snapshot, clone_params, ctx->context->getReadSettings(), ctx->context->getWriteSettings());
part->getDataPartStorage().beginTransaction();
ctx->temporary_directory_lock = std::move(lock);
}
promise.set_value(std::move(part));
return false;
}

View File

@ -335,7 +335,7 @@ IMergeTreeDataPart::Checksums checkDataPart(
&Poco::Logger::get("checkDataPart"),
"Will drop cache for data part {} and will check it once again", data_part->name);
auto & cache = *FileCacheFactory::instance().getByName(*cache_name).cache;
auto & cache = *FileCacheFactory::instance().getByName(*cache_name)->cache;
for (auto it = data_part_storage.iterate(); it->isValid(); it->next())
{
auto file_name = it->name();

View File

@ -138,15 +138,35 @@ MaterializedPostgreSQLConsumer::StorageData::Buffer::Buffer(
columns_ast.children.emplace_back(std::make_shared<ASTIdentifier>(name));
}
MaterializedPostgreSQLConsumer::StorageData::Buffer & MaterializedPostgreSQLConsumer::StorageData::getBuffer()
MaterializedPostgreSQLConsumer::StorageData::Buffer & MaterializedPostgreSQLConsumer::StorageData::getLastBuffer()
{
if (!buffer)
if (buffers.empty())
{
throw Exception(ErrorCodes::LOGICAL_ERROR, "Data buffer not initialized for {}",
throw Exception(ErrorCodes::LOGICAL_ERROR, "No data buffer for {}",
storage->getStorageID().getNameForLogs());
}
return *buffer;
return *buffers.back();
}
MaterializedPostgreSQLConsumer::StorageData::BufferPtr MaterializedPostgreSQLConsumer::StorageData::popBuffer()
{
if (buffers.empty())
return nullptr;
auto buffer = std::move(buffers.front());
buffers.pop_front();
return buffer;
}
void MaterializedPostgreSQLConsumer::StorageData::addBuffer(BufferPtr buffer)
{
buffers.push_back(std::move(buffer));
}
void MaterializedPostgreSQLConsumer::StorageData::returnBuffer(BufferPtr buffer)
{
buffers.push_front(std::move(buffer));
}
void MaterializedPostgreSQLConsumer::StorageData::Buffer::assertInsertIsPossible(size_t col_idx) const
@ -163,7 +183,7 @@ void MaterializedPostgreSQLConsumer::StorageData::Buffer::assertInsertIsPossible
void MaterializedPostgreSQLConsumer::insertValue(StorageData & storage_data, const std::string & value, size_t column_idx)
{
auto & buffer = storage_data.getBuffer();
auto & buffer = storage_data.getLastBuffer();
buffer.assertInsertIsPossible(column_idx);
const auto & column_type_and_name = buffer.sample_block.getByPosition(column_idx);
@ -203,7 +223,7 @@ void MaterializedPostgreSQLConsumer::insertValue(StorageData & storage_data, con
void MaterializedPostgreSQLConsumer::insertDefaultValue(StorageData & storage_data, size_t column_idx)
{
auto & buffer = storage_data.getBuffer();
auto & buffer = storage_data.getLastBuffer();
buffer.assertInsertIsPossible(column_idx);
const auto & column_type_and_name = buffer.sample_block.getByPosition(column_idx);
@ -346,7 +366,7 @@ void MaterializedPostgreSQLConsumer::readTupleData(
}
}
auto & columns = storage_data.getBuffer().columns;
auto & columns = storage_data.getLastBuffer().columns;
switch (type)
{
case PostgreSQLQuery::INSERT:
@ -637,7 +657,7 @@ void MaterializedPostgreSQLConsumer::processReplicationMessage(const char * repl
}
}
storage_data.setBuffer(std::make_unique<StorageData::Buffer>(std::move(columns), description));
storage_data.addBuffer(std::make_unique<StorageData::Buffer>(std::move(columns), description));
tables_to_sync.insert(table_name);
break;
}
@ -660,9 +680,10 @@ void MaterializedPostgreSQLConsumer::syncTables()
{
auto table_name = *tables_to_sync.begin();
auto & storage_data = storages.find(table_name)->second;
auto & buffer = storage_data.getBuffer();
Block result_rows = buffer.sample_block.cloneWithColumns(std::move(buffer.columns));
while (auto buffer = storage_data.popBuffer())
{
Block result_rows = buffer->sample_block.cloneWithColumns(std::move(buffer->columns));
try
{
if (result_rows.rows())
@ -674,7 +695,7 @@ void MaterializedPostgreSQLConsumer::syncTables()
auto insert = std::make_shared<ASTInsertQuery>();
insert->table_id = storage->getStorageID();
insert->columns = std::make_shared<ASTExpressionList>(buffer.columns_ast);
insert->columns = std::make_shared<ASTExpressionList>(buffer->columns_ast);
InterpreterInsertQuery interpreter(insert, insert_context, true);
auto io = interpreter.execute();
@ -692,11 +713,12 @@ void MaterializedPostgreSQLConsumer::syncTables()
catch (...)
{
/// Retry this buffer later.
buffer.columns = result_rows.mutateColumns();
buffer->columns = result_rows.mutateColumns();
storage_data.returnBuffer(std::move(buffer));
throw;
}
}
storage_data.setBuffer(nullptr);
tables_to_sync.erase(tables_to_sync.begin());
}

View File

@ -46,7 +46,7 @@ private:
const Names column_names;
const ArrayInfo array_info;
struct Buffer
struct Buffer : private boost::noncopyable
{
Block sample_block;
MutableColumns columns;
@ -56,13 +56,18 @@ private:
void assertInsertIsPossible(size_t col_idx) const;
};
using BufferPtr = std::unique_ptr<Buffer>;
Buffer & getBuffer();
Buffer & getLastBuffer();
void setBuffer(std::unique_ptr<Buffer> buffer_) { buffer = std::move(buffer_); }
BufferPtr popBuffer();
void addBuffer(BufferPtr buffer);
void returnBuffer(BufferPtr buffer);
private:
std::unique_ptr<Buffer> buffer;
std::deque<BufferPtr> buffers;
};
using Storages = std::unordered_map<String, StorageData>;

View File

@ -8,6 +8,7 @@
#include <base/sort.h>
#include <Backups/BackupEntriesCollector.h>
#include <Databases/IDatabase.h>
#include "Common/Exception.h"
#include <Common/MemoryTracker.h>
#include <Common/escapeForFileName.h>
#include <Common/ProfileEventsScope.h>
@ -2143,7 +2144,12 @@ void StorageMergeTree::movePartitionToTable(const StoragePtr & dest_table, const
Int64 temp_index = insert_increment.get();
MergeTreePartInfo dst_part_info(partition_id, temp_index, temp_index, src_part->info.level);
IDataPartStorage::ClonePartParams clone_params{.txn = local_context->getCurrentTransaction()};
IDataPartStorage::ClonePartParams clone_params
{
.txn = local_context->getCurrentTransaction(),
.copy_instead_of_hardlink = getSettings()->always_use_copy_instead_of_hardlinks,
};
auto [dst_part, part_lock] = dest_table_storage->cloneAndLoadDataPartOnSameDisk(
src_part,
TMP_PREFIX,
@ -2153,6 +2159,7 @@ void StorageMergeTree::movePartitionToTable(const StoragePtr & dest_table, const
local_context->getReadSettings(),
local_context->getWriteSettings()
);
dst_parts.emplace_back(std::move(dst_part));
dst_parts_locks.emplace_back(std::move(part_lock));
}

View File

@ -190,6 +190,7 @@ namespace ErrorCodes
extern const int TOO_LARGE_DISTRIBUTED_DEPTH;
extern const int TABLE_IS_DROPPED;
extern const int CANNOT_BACKUP_TABLE;
extern const int SUPPORT_IS_DISABLED;
}
namespace ActionLocks
@ -2688,7 +2689,7 @@ bool StorageReplicatedMergeTree::executeReplaceRange(LogEntry & entry)
IDataPartStorage::ClonePartParams clone_params
{
.copy_instead_of_hardlink = (our_zero_copy_enabled || source_zero_copy_enabled) && part_desc->src_table_part->isStoredOnRemoteDiskWithZeroCopySupport(),
.copy_instead_of_hardlink = storage_settings_ptr->always_use_copy_instead_of_hardlinks || ((our_zero_copy_enabled || source_zero_copy_enabled) && part_desc->src_table_part->isStoredOnRemoteDiskWithZeroCopySupport()),
.metadata_version_to_write = metadata_snapshot->getMetadataVersion()
};
auto [res_part, temporary_part_lock] = cloneAndLoadDataPartOnSameDisk(
@ -4777,7 +4778,12 @@ bool StorageReplicatedMergeTree::fetchPart(
get_part = [&, part_to_clone]()
{
chassert(!is_zero_copy_part(part_to_clone));
IDataPartStorage::ClonePartParams clone_params{ .keep_metadata_version = true };
IDataPartStorage::ClonePartParams clone_params
{
.copy_instead_of_hardlink = getSettings()->always_use_copy_instead_of_hardlinks,
.keep_metadata_version = true,
};
auto [cloned_part, lock] = cloneAndLoadDataPartOnSameDisk(
part_to_clone,
"tmp_clone_",
@ -4786,6 +4792,7 @@ bool StorageReplicatedMergeTree::fetchPart(
clone_params,
getContext()->getReadSettings(),
getContext()->getWriteSettings());
part_directory_lock = std::move(lock);
return cloned_part;
};
@ -6363,6 +6370,18 @@ void StorageReplicatedMergeTree::dropPartition(const ASTPtr & partition, bool de
if (!is_leader)
throw Exception(ErrorCodes::NOT_A_LEADER, "DROP PARTITION cannot be done on this replica because it is not a leader");
auto settings = getSettings();
if (detach && settings->disable_detach_partition_for_zero_copy_replication
&& settings->allow_remote_fs_zero_copy_replication)
{
for (const auto & disk : getDisks())
{
if (disk->supportZeroCopyReplication())
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "DETACH PARTITION queries are disabled.");
}
}
zkutil::ZooKeeperPtr zookeeper = getZooKeeperAndAssertNotReadonly();
const auto * partition_ast = partition->as<ASTPartition>();
@ -7022,6 +7041,8 @@ void StorageReplicatedMergeTree::fetchPartition(
bool fetch_part,
ContextPtr query_context)
{
auto settings = getSettings();
Macros::MacroExpansionInfo info;
info.expand_special_macros_only = false;
info.table_id = getStorageID();
@ -7032,6 +7053,16 @@ void StorageReplicatedMergeTree::fetchPartition(
if (from.empty())
throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, "ZooKeeper path should not be empty");
if (settings->disable_fetch_partition_for_zero_copy_replication
&& settings->allow_remote_fs_zero_copy_replication)
{
for (const auto & disk : getDisks())
{
if (disk->supportZeroCopyReplication())
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "FETCH PARTITION queries are disabled.");
}
}
zkutil::ZooKeeperPtr zookeeper;
if (from_zookeeper_name != default_zookeeper_name)
zookeeper = getContext()->getAuxiliaryZooKeeper(from_zookeeper_name);
@ -7884,7 +7915,7 @@ void StorageReplicatedMergeTree::replacePartitionFrom(
|| dynamic_cast<const MergeTreeData *>(source_table.get())->getSettings()->allow_remote_fs_zero_copy_replication;
IDataPartStorage::ClonePartParams clone_params
{
.copy_instead_of_hardlink = zero_copy_enabled && src_part->isStoredOnRemoteDiskWithZeroCopySupport(),
.copy_instead_of_hardlink = storage_settings_ptr->always_use_copy_instead_of_hardlinks || (zero_copy_enabled && src_part->isStoredOnRemoteDiskWithZeroCopySupport()),
.metadata_version_to_write = metadata_snapshot->getMetadataVersion()
};
auto [dst_part, part_lock] = cloneAndLoadDataPartOnSameDisk(
@ -8132,7 +8163,7 @@ void StorageReplicatedMergeTree::movePartitionToTable(const StoragePtr & dest_ta
IDataPartStorage::ClonePartParams clone_params
{
.copy_instead_of_hardlink = zero_copy_enabled && src_part->isStoredOnRemoteDiskWithZeroCopySupport(),
.copy_instead_of_hardlink = storage_settings_ptr->always_use_copy_instead_of_hardlinks || (zero_copy_enabled && src_part->isStoredOnRemoteDiskWithZeroCopySupport()),
.metadata_version_to_write = dest_metadata_snapshot->getMetadataVersion()
};
auto [dst_part, dst_part_lock] = dest_table_storage->cloneAndLoadDataPartOnSameDisk(

View File

@ -78,7 +78,7 @@ Pipe StorageSystemDisks::read(
String cache_path;
if (disk_ptr->supportsCache())
cache_path = FileCacheFactory::instance().getByName(disk_ptr->getCacheName()).settings.base_path;
cache_path = FileCacheFactory::instance().getByName(disk_ptr->getCacheName())->getSettings().base_path;
col_cache_path->insert(cache_path);
}

View File

@ -63,7 +63,7 @@ Pipe StorageSystemRemoteDataPaths::read(
FileCachePtr cache;
if (disk->supportsCache())
cache = FileCacheFactory::instance().getByName(disk->getCacheName()).cache;
cache = FileCacheFactory::instance().getByName(disk->getCacheName())->cache;
for (const auto & [local_path, storage_objects] : remote_paths_by_local_path)
{

View File

@ -1,4 +1,4 @@
#ifdef OS_LINUX /// Because of 'sigqueue' functions and RT signals.
#ifdef OS_LINUX /// Because of 'rt_tgsigqueueinfo' functions and RT signals.
#include <csignal>
#include <poll.h>
@ -28,6 +28,12 @@
#include <Processors/Sources/SourceFromSingleChunk.h>
#include <QueryPipeline/Pipe.h>
#include <base/getThreadId.h>
#include <sys/syscall.h>
int rt_tgsigqueueinfo(pid_t tgid, pid_t tid, int sig, siginfo_t *info)
{
return static_cast<int>(syscall(__NR_rt_tgsigqueueinfo, tgid, tid, sig, info));
}
namespace DB
@ -48,7 +54,7 @@ namespace
{
// Initialized in StorageSystemStackTrace's ctor and used in signalHandler.
std::atomic<pid_t> expected_pid;
std::atomic<pid_t> server_pid;
const int sig = SIGRTMIN;
std::atomic<int> sequence_num = 0; /// For messages sent via pipe.
@ -80,7 +86,7 @@ void signalHandler(int, siginfo_t * info, void * context)
/// In case malicious user is sending signals manually (for unknown reason).
/// If we don't check - it may break our synchronization.
if (info->si_pid != expected_pid)
if (info->si_pid != server_pid)
return;
/// Signal received too late.
@ -287,20 +293,22 @@ protected:
Stopwatch watch;
SCOPE_EXIT({ signals_sent_ms += watch.elapsedMilliseconds(); });
sigval sig_value{};
siginfo_t sig_info{};
sig_info.si_code = SI_QUEUE; /// sigqueue()
sig_info.si_pid = server_pid;
sig_info.si_value.sival_int = sequence_num.load(std::memory_order_acquire);
sig_value.sival_int = sequence_num.load(std::memory_order_acquire);
if (0 != ::sigqueue(static_cast<int>(tid), sig, sig_value))
if (0 != ::rt_tgsigqueueinfo(server_pid, static_cast<pid_t>(tid), sig, &sig_info))
{
/// The thread may has been already finished.
if (ESRCH == errno)
continue;
throwFromErrno("Cannot send signal with sigqueue", ErrorCodes::CANNOT_SIGQUEUE);
throwFromErrno("Cannot queue a signal", ErrorCodes::CANNOT_SIGQUEUE);
}
/// Just in case we will wait for pipe with timeout. In case signal didn't get processed.
if (wait(pipe_read_timeout_ms) && sig_value.sival_int == data_ready_num.load(std::memory_order_acquire))
if (wait(pipe_read_timeout_ms) && sig_info.si_value.sival_int == data_ready_num.load(std::memory_order_acquire))
{
size_t stack_trace_size = stack_trace.getSize();
size_t stack_trace_offset = stack_trace.getOffset();
@ -396,7 +404,7 @@ StorageSystemStackTrace::StorageSystemStackTrace(const StorageID & table_id_)
notification_pipe.open();
/// Setup signal handler.
expected_pid = getpid();
server_pid = getpid();
struct sigaction sa{};
sa.sa_sigaction = signalHandler;
sa.sa_flags = SA_SIGINFO;

View File

@ -108,75 +108,72 @@
namespace DB
{
template <bool lazy>
void attachSystemTablesLocal(ContextPtr context, IDatabase & system_database)
{
attachLazyOrNot<lazy, StorageSystemOne>(context, system_database, "one");
attachLazyOrNot<lazy, StorageSystemNumbers>(context, system_database, "numbers", false);
attachLazyOrNot<lazy, StorageSystemNumbers>(context, system_database, "numbers_mt", true);
attachLazyOrNot<lazy, StorageSystemZeros>(context, system_database, "zeros", false);
attachLazyOrNot<lazy, StorageSystemZeros>(context, system_database, "zeros_mt", true);
attachLazyOrNot<lazy, StorageSystemDatabases>(context, system_database, "databases");
attachLazyOrNot<lazy, StorageSystemTables>(context, system_database, "tables");
attachLazyOrNot<lazy, StorageSystemColumns>(context, system_database, "columns");
attachLazyOrNot<lazy, StorageSystemFunctions>(context, system_database, "functions");
attachLazyOrNot<lazy, StorageSystemEvents>(context, system_database, "events");
attachLazyOrNot<lazy, StorageSystemSettings>(context, system_database, "settings");
attachLazyOrNot<lazy, StorageSystemServerSettings>(context, system_database, "server_settings");
attachLazyOrNot<lazy, StorageSystemSettingsChanges>(context, system_database, "settings_changes");
attachLazyOrNot<lazy, SystemMergeTreeSettings<false>>(context, system_database, "merge_tree_settings");
attachLazyOrNot<lazy, SystemMergeTreeSettings<true>>(context, system_database, "replicated_merge_tree_settings");
attachLazyOrNot<lazy, StorageSystemBuildOptions>(context, system_database, "build_options");
attachLazyOrNot<lazy, StorageSystemFormats>(context, system_database, "formats");
attachLazyOrNot<lazy, StorageSystemTableFunctions>(context, system_database, "table_functions");
attachLazyOrNot<lazy, StorageSystemAggregateFunctionCombinators>(context, system_database, "aggregate_function_combinators");
attachLazyOrNot<lazy, StorageSystemDataTypeFamilies>(context, system_database, "data_type_families");
attachLazyOrNot<lazy, StorageSystemCollations>(context, system_database, "collations");
attachLazyOrNot<lazy, StorageSystemTableEngines>(context, system_database, "table_engines");
attachLazyOrNot<lazy, StorageSystemContributors>(context, system_database, "contributors");
attachLazyOrNot<lazy, StorageSystemUsers>(context, system_database, "users");
attachLazyOrNot<lazy, StorageSystemRoles>(context, system_database, "roles");
attachLazyOrNot<lazy, StorageSystemGrants>(context, system_database, "grants");
attachLazyOrNot<lazy, StorageSystemRoleGrants>(context, system_database, "role_grants");
attachLazyOrNot<lazy, StorageSystemCurrentRoles>(context, system_database, "current_roles");
attachLazyOrNot<lazy, StorageSystemEnabledRoles>(context, system_database, "enabled_roles");
attachLazyOrNot<lazy, StorageSystemSettingsProfiles>(context, system_database, "settings_profiles");
attachLazyOrNot<lazy, StorageSystemSettingsProfileElements>(context, system_database, "settings_profile_elements");
attachLazyOrNot<lazy, StorageSystemRowPolicies>(context, system_database, "row_policies");
attachLazyOrNot<lazy, StorageSystemQuotas>(context, system_database, "quotas");
attachLazyOrNot<lazy, StorageSystemQuotaLimits>(context, system_database, "quota_limits");
attachLazyOrNot<lazy, StorageSystemQuotaUsage>(context, system_database, "quota_usage");
attachLazyOrNot<lazy, StorageSystemQuotasUsage>(context, system_database, "quotas_usage");
attachLazyOrNot<lazy, StorageSystemUserDirectories>(context, system_database, "user_directories");
attachLazyOrNot<lazy, StorageSystemPrivileges>(context, system_database, "privileges");
attachLazyOrNot<lazy, StorageSystemErrors>(context, system_database, "errors");
attachLazyOrNot<lazy, StorageSystemWarnings>(context, system_database, "warnings");
attachLazyOrNot<lazy, StorageSystemDataSkippingIndices>(context, system_database, "data_skipping_indices");
attachLazyOrNot<lazy, StorageSystemLicenses>(context, system_database, "licenses");
attachLazyOrNot<lazy, StorageSystemTimeZones>(context, system_database, "time_zones");
attachLazyOrNot<lazy, StorageSystemBackups>(context, system_database, "backups");
attachLazyOrNot<lazy, StorageSystemSchemaInferenceCache>(context, system_database, "schema_inference_cache");
attachLazyOrNot<lazy, StorageSystemDroppedTables>(context, system_database, "dropped_tables");
attachLazyOrNot<lazy, StorageSystemScheduler>(context, system_database, "scheduler");
attach<StorageSystemOne>(context, system_database, "one");
attach<StorageSystemNumbers>(context, system_database, "numbers", false);
attach<StorageSystemNumbers>(context, system_database, "numbers_mt", true);
attach<StorageSystemZeros>(context, system_database, "zeros", false);
attach<StorageSystemZeros>(context, system_database, "zeros_mt", true);
attach<StorageSystemDatabases>(context, system_database, "databases");
attach<StorageSystemTables>(context, system_database, "tables");
attach<StorageSystemColumns>(context, system_database, "columns");
attach<StorageSystemFunctions>(context, system_database, "functions");
attach<StorageSystemEvents>(context, system_database, "events");
attach<StorageSystemSettings>(context, system_database, "settings");
attach<StorageSystemServerSettings>(context, system_database, "server_settings");
attach<StorageSystemSettingsChanges>(context, system_database, "settings_changes");
attach<SystemMergeTreeSettings<false>>(context, system_database, "merge_tree_settings");
attach<SystemMergeTreeSettings<true>>(context, system_database, "replicated_merge_tree_settings");
attach<StorageSystemBuildOptions>(context, system_database, "build_options");
attach<StorageSystemFormats>(context, system_database, "formats");
attach<StorageSystemTableFunctions>(context, system_database, "table_functions");
attach<StorageSystemAggregateFunctionCombinators>(context, system_database, "aggregate_function_combinators");
attach<StorageSystemDataTypeFamilies>(context, system_database, "data_type_families");
attach<StorageSystemCollations>(context, system_database, "collations");
attach<StorageSystemTableEngines>(context, system_database, "table_engines");
attach<StorageSystemContributors>(context, system_database, "contributors");
attach<StorageSystemUsers>(context, system_database, "users");
attach<StorageSystemRoles>(context, system_database, "roles");
attach<StorageSystemGrants>(context, system_database, "grants");
attach<StorageSystemRoleGrants>(context, system_database, "role_grants");
attach<StorageSystemCurrentRoles>(context, system_database, "current_roles");
attach<StorageSystemEnabledRoles>(context, system_database, "enabled_roles");
attach<StorageSystemSettingsProfiles>(context, system_database, "settings_profiles");
attach<StorageSystemSettingsProfileElements>(context, system_database, "settings_profile_elements");
attach<StorageSystemRowPolicies>(context, system_database, "row_policies");
attach<StorageSystemQuotas>(context, system_database, "quotas");
attach<StorageSystemQuotaLimits>(context, system_database, "quota_limits");
attach<StorageSystemQuotaUsage>(context, system_database, "quota_usage");
attach<StorageSystemQuotasUsage>(context, system_database, "quotas_usage");
attach<StorageSystemUserDirectories>(context, system_database, "user_directories");
attach<StorageSystemPrivileges>(context, system_database, "privileges");
attach<StorageSystemErrors>(context, system_database, "errors");
attach<StorageSystemWarnings>(context, system_database, "warnings");
attach<StorageSystemDataSkippingIndices>(context, system_database, "data_skipping_indices");
attach<StorageSystemLicenses>(context, system_database, "licenses");
attach<StorageSystemTimeZones>(context, system_database, "time_zones");
attach<StorageSystemBackups>(context, system_database, "backups");
attach<StorageSystemSchemaInferenceCache>(context, system_database, "schema_inference_cache");
attach<StorageSystemDroppedTables>(context, system_database, "dropped_tables");
attach<StorageSystemScheduler>(context, system_database, "scheduler");
#if defined(__ELF__) && !defined(OS_FREEBSD)
attachLazyOrNot<lazy, StorageSystemSymbols>(context, system_database, "symbols");
attach<StorageSystemSymbols>(context, system_database, "symbols");
#endif
#if USE_RDKAFKA
attachLazyOrNot<lazy, StorageSystemKafkaConsumers>(context, system_database, "kafka_consumers");
attach<StorageSystemKafkaConsumers>(context, system_database, "kafka_consumers");
#endif
#ifdef OS_LINUX
attachLazyOrNot<lazy, StorageSystemStackTrace>(context, system_database, "stack_trace");
attach<StorageSystemStackTrace>(context, system_database, "stack_trace");
#endif
#if USE_ROCKSDB
attachLazyOrNot<lazy, StorageSystemRocksDB>(context, system_database, "rocksdb");
attach<StorageSystemRocksDB>(context, system_database, "rocksdb");
#endif
}
template void attachSystemTablesLocal<false>(ContextPtr context, IDatabase & system_database);
template void attachSystemTablesLocal<true>(ContextPtr context, IDatabase & system_database);
void attachSystemTablesServer(ContextPtr context, IDatabase & system_database, bool has_zookeeper)
{
attachSystemTablesLocal</* lazy= */ false>(context, system_database);
attachSystemTablesLocal(context, system_database);
attach<StorageSystemParts>(context, system_database, "parts");
attach<StorageSystemProjectionParts>(context, system_database, "projection_parts");

View File

@ -9,12 +9,8 @@ namespace DB
class AsynchronousMetrics;
class IDatabase;
template <bool lazy>
void attachSystemTablesLocal(ContextPtr context, IDatabase & system_database);
void attachSystemTablesServer(ContextPtr context, IDatabase & system_database, bool has_zookeeper);
void attachSystemTablesLocal(ContextPtr context, IDatabase & system_database);
void attachSystemTablesAsync(ContextPtr context, IDatabase & system_database, AsynchronousMetrics & async_metrics);
extern template void attachSystemTablesLocal<false>(ContextPtr context, IDatabase & system_database);
extern template void attachSystemTablesLocal<true>(ContextPtr context, IDatabase & system_database);
}

View File

@ -1,42 +1,15 @@
#pragma once
#include <Databases/IDatabase.h>
#include <Databases/DatabasesCommon.h>
#include <Interpreters/DatabaseCatalog.h>
namespace DB
{
template<typename StorageT, typename... StorageArgs>
void attachLazy(ContextPtr context, IDatabase & system_database, const String & table_name, StorageArgs && ... args)
{
if (system_database.getUUID() == UUIDHelpers::Nil)
{
/// Attach to Ordinary database.
auto table_id = StorageID(DatabaseCatalog::SYSTEM_DATABASE, table_name);
system_database.registerLazyTable(context, table_name, [table_id, ... captured_args = std::forward<StorageArgs>(args)] mutable
{
return std::make_shared<StorageT>(table_id, std::forward<StorageArgs>(captured_args)...);
});
}
else
{
/// Attach to Atomic database.
/// NOTE: UUIDs are not persistent, but it's ok since no data are stored on disk for these storages
/// and path is actually not used
auto table_id = StorageID(DatabaseCatalog::SYSTEM_DATABASE, table_name, UUIDHelpers::generateV4());
DatabaseCatalog::instance().addUUIDMapping(table_id.uuid);
String path = "store/" + DatabaseCatalog::getPathForUUID(table_id.uuid);
system_database.registerLazyTable(context, table_name, [table_id, ... captured_args = std::forward<StorageArgs>(args)] mutable
{
return std::make_shared<StorageT>(table_id, std::forward<StorageArgs>(captured_args)...);
}, path);
}
}
template<typename StorageT, typename... StorageArgs>
void attach(ContextPtr context, IDatabase & system_database, const String & table_name, StorageArgs && ... args)
{
assert(system_database.getDatabaseName() == DatabaseCatalog::SYSTEM_DATABASE);
if (system_database.getUUID() == UUIDHelpers::Nil)
{
/// Attach to Ordinary database.
@ -55,15 +28,4 @@ void attach(ContextPtr context, IDatabase & system_database, const String & tabl
}
}
template<bool lazy, typename StorageT, typename... StorageArgs>
void attachLazyOrNot(ContextPtr context, IDatabase & system_database, const String & table_name, StorageArgs && ... args)
{
assert(system_database.getDatabaseName() == DatabaseCatalog::SYSTEM_DATABASE);
if constexpr (lazy)
attachLazy<StorageT>(context, system_database, table_name, std::forward<StorageArgs>(args)...);
else
attach<StorageT>(context, system_database, table_name, std::forward<StorageArgs>(args)...);
}
}

View File

@ -169,5 +169,8 @@ endif()
if (TARGET ch_contrib::libarchive)
set(USE_LIBARCHIVE 1)
endif()
if (TARGET ch_contrib::pocketfft)
set(USE_POCKETFFT 1)
endif()
set(SOURCE_DIR ${PROJECT_SOURCE_DIR})

View File

@ -35,12 +35,7 @@ from typing import List, Optional
from env_helper import TEMP_PATH
from get_robot_token import get_best_robot_token
from git_helper import git_runner, is_shallow
from github_helper import (
GitHub,
PullRequest,
PullRequests,
Repository,
)
from github_helper import GitHub, PullRequest, PullRequests, Repository
from ssh import SSHKey
@ -423,7 +418,9 @@ class Backport:
logging.info("Fetching from %s", self._fetch_from)
fetch_from_repo = self.gh.get_repo(self._fetch_from)
git_runner(
f"git fetch {fetch_from_repo.ssh_url if self.is_remote_ssh else fetch_from_repo.clone_url} {fetch_from_repo.default_branch} --no-tags"
"git fetch "
f"{fetch_from_repo.ssh_url if self.is_remote_ssh else fetch_from_repo.clone_url} "
f"{fetch_from_repo.default_branch} --no-tags"
)
logging.info("Active releases: %s", ", ".join(self.release_branches))
@ -443,7 +440,7 @@ class Backport:
logging.info("Resetting %s to %s/%s", branch, self.remote, branch)
git_runner(f"git branch -f {branch} {self.remote}/{branch}")
def receive_prs_for_backport(self):
def receive_prs_for_backport(self, reserve_search_days: int) -> None:
# The commits in the oldest open release branch
oldest_branch_commits = git_runner(
"git log --no-merges --format=%H --reverse "
@ -453,16 +450,18 @@ class Backport:
since_commit = oldest_branch_commits.split("\n", 1)[0]
since_date = date.fromisoformat(
git_runner.run(f"git log -1 --format=format:%cs {since_commit}")
)
) - timedelta(days=reserve_search_days)
# To not have a possible TZ issues
tomorrow = date.today() + timedelta(days=1)
logging.info("Receive PRs suppose to be backported")
self.prs_for_backport = self.gh.get_pulls_from_search(
query_args = dict(
query=f"type:pr repo:{self._fetch_from} -label:{self.backport_created_label}",
label=",".join(self.labels_to_backport + [self.must_create_backport_label]),
merged=[since_date, tomorrow],
)
logging.info("Query to find the backport PRs:\n %s", query_args)
self.prs_for_backport = self.gh.get_pulls_from_search(**query_args)
logging.info(
"PRs to be backported:\n %s",
"\n ".join([pr.html_url for pr in self.prs_for_backport]),
@ -584,13 +583,18 @@ def parse_args():
choices=(Labels.MUST_BACKPORT, Labels.MUST_BACKPORT_CLOUD),
help="label to filter PRs to backport",
)
parser.add_argument(
"--backport-created-label",
default=Labels.BACKPORTS_CREATED,
choices=(Labels.BACKPORTS_CREATED, Labels.BACKPORTS_CREATED_CLOUD),
help="label to mark PRs as backported",
)
parser.add_argument(
"--reserve-search-days",
default=0,
type=int,
help="safity reserve for the PRs search days, necessary for cloud",
)
parser.add_argument(
"--debug-helpers",
@ -655,7 +659,7 @@ def main():
bp.gh.cache_path = temp_path / "gh_cache"
bp.receive_release_prs()
bp.update_local_release_branches()
bp.receive_prs_for_backport()
bp.receive_prs_for_backport(args.reserve_search_days)
bp.process_backports()
if bp.error is not None:
logging.error("Finished successfully, but errors occured!")

View File

@ -19,6 +19,16 @@
<cache_policy>LRU</cache_policy>
<slru_size_ratio>0.3</slru_size_ratio>
</s3_cache>
<s3_cache_02933>
<type>cache</type>
<disk>s3_disk</disk>
<path>s3_cache_02933/</path>
<max_size>128Mi</max_size>
<cache_on_write_operations>1</cache_on_write_operations>
<delayed_cleanup_interval_ms>100</delayed_cleanup_interval_ms>
<background_download_threads>0</background_download_threads>
<background_download_queue_size_limit>0</background_download_queue_size_limit>
</s3_cache_02933>
<!-- local disks -->
<local_disk>
<type>local_blob_storage</type>

View File

@ -0,0 +1,7 @@
<clickhouse>
<merge_tree>
<disable_freeze_partition_for_zero_copy_replication>0</disable_freeze_partition_for_zero_copy_replication>
<disable_detach_partition_for_zero_copy_replication>0</disable_detach_partition_for_zero_copy_replication>
<disable_fetch_partition_for_zero_copy_replication>0</disable_fetch_partition_for_zero_copy_replication>
</merge_tree>
</clickhouse>

View File

@ -63,6 +63,7 @@ ln -sf $SRC_PATH/config.d/enable_wait_for_shutdown_replicated_tables.xml $DEST_S
ln -sf $SRC_PATH/config.d/backups.xml $DEST_SERVER_PATH/config.d/
ln -sf $SRC_PATH/config.d/filesystem_caches_path.xml $DEST_SERVER_PATH/config.d/
ln -sf $SRC_PATH/config.d/validate_tcp_client_information.xml $DEST_SERVER_PATH/config.d/
ln -sf $SRC_PATH/config.d/zero_copy_destructive_operations.xml $DEST_SERVER_PATH/config.d/
# Not supported with fasttest.
if [ "${DEST_SERVER_PATH}" = "/etc/clickhouse-server" ]

View File

@ -58,5 +58,6 @@
<allowed_disk>disk_s3</allowed_disk>
<allowed_disk>disk_s3_plain</allowed_disk>
<allowed_disk>disk_s3_cache</allowed_disk>
<allowed_disk>disk_s3_other_bucket</allowed_disk>
</backups>
</clickhouse>

View File

@ -184,6 +184,32 @@ def test_backup_to_disk(storage_policy, to_disk):
check_backup_and_restore(storage_policy, backup_destination)
@pytest.mark.parametrize(
"storage_policy, to_disk",
[
pytest.param(
"policy_s3",
"disk_s3_other_bucket",
id="from_s3_to_s3",
),
pytest.param(
"policy_s3_other_bucket",
"disk_s3",
id="from_s3_to_s3_other_bucket",
),
],
)
def test_backup_from_s3_to_s3_disk_native_copy(storage_policy, to_disk):
backup_name = new_backup_name()
backup_destination = f"Disk('{to_disk}', '{backup_name}')"
(backup_events, restore_events) = check_backup_and_restore(
storage_policy, backup_destination
)
assert backup_events["S3CopyObject"] > 0
assert restore_events["S3CopyObject"] > 0
def test_backup_to_s3():
storage_policy = "default"
backup_name = new_backup_name()

View File

@ -156,6 +156,10 @@
<merge_tree>
<min_bytes_for_wide_part>0</min_bytes_for_wide_part>
<ratio_of_defaults_for_sparse_serialization>1.0</ratio_of_defaults_for_sparse_serialization>
<min_bytes_for_full_part_storage>0</min_bytes_for_full_part_storage>
<disable_freeze_partition_for_zero_copy_replication>0</disable_freeze_partition_for_zero_copy_replication>
<disable_detach_partition_for_zero_copy_replication>0</disable_detach_partition_for_zero_copy_replication>
<disable_fetch_partition_for_zero_copy_replication>0</disable_fetch_partition_for_zero_copy_replication>
</merge_tree>
<database_catalog_unused_dir_hide_timeout_sec>0</database_catalog_unused_dir_hide_timeout_sec>

View File

@ -431,7 +431,11 @@ def test_many_concurrent_queries(started_cluster):
# random update / delete query
cursor.execute(query_pool[query_id].format(random_table_name))
print("table {} query {} ok".format(random_table_name, query_id))
print(
"Executing for table {} query: {}".format(
random_table_name, query_pool[query_id]
)
)
# allow some thread to do inserts (not to violate key constraints)
if thread_id < 5:

View File

@ -1,19 +0,0 @@
<clickhouse>
<query_log>
<database>system</database>
<table>query_log</table>
<partition_by>toYYYYMM(event_date)</partition_by>
<flush_interval_milliseconds>7500</flush_interval_milliseconds>
<max_size_rows>1048576</max_size_rows>
<reserved_size_rows>8192</reserved_size_rows>
<buffer_size_rows_flush_threshold>524288</buffer_size_rows_flush_threshold>
<flush_on_crash>false</flush_on_crash>
</query_log>
<query_masking_rules>
<rule>
<regexp>TOPSECRET.TOPSECRET</regexp>
<replace>[hidden]</replace>
</rule>
</query_masking_rules>
</clickhouse>

Some files were not shown because too many files have changed in this diff Show More