Merge branch 'master' into more-s3-events

Antonio Andelic 2023-03-23 18:23:53 +01:00 committed by GitHub
commit 3ae09c6ce8
68 changed files with 983 additions and 539 deletions

View File

@ -353,12 +353,14 @@ if (COMPILER_CLANG)
set(CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -Wl,-U,_inside_main")
# The LLVM MachO linker (ld64.lld) generates by default unwind info in 'compact' format which the internal unwinder doesn't support
# and the server will not come up ('invalid compact unwind encoding'). Disable it.
# You will see a warning during the build: "ld64.lld: warning: Option `-no_compact_unwind' is undocumented. Should lld implement it?".
# Yes, ld64.lld does not document the option, likely for compatibility with Apple's system ld, after which ld64.lld is modeled
# and which also does not document it.
set(CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -Wl,-no_compact_unwind")
# The LLVM MachO linker (ld64.lld, used in native builds) generates by default unwind info in 'compact' format which the internal
# unwinder doesn't support and the server will not come up ('invalid compact unwind encoding'). Disable it. You will see a warning
# during the build: "ld64.lld: warning: Option `-no_compact_unwind' is undocumented. Should lld implement it?". Yes, ld64.lld does
# not document the option, likely for compatibility with Apple's system ld, after which ld64.lld is modeled and which also does not
# document it.
if (NOT CMAKE_CROSSCOMPILING)
set(CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -Wl,-no_compact_unwind")
endif ()
endif()
# Display absolute paths in error messages. Otherwise KDevelop fails to navigate to correct file and opens a new file instead.

View File

@ -31,6 +31,40 @@
#define BIG_CONSTANT(x) (x##LLU)
#endif // !defined(_MSC_VER)
//
//-----------------------------------------------------------------------------
// Block read - on little-endian machines this is a single load,
// while on big-endian or unknown machines the byte accesses should
// still get optimized into the most efficient instruction.
static inline uint32_t getblock ( const uint32_t * p )
{
#if defined(__BYTE_ORDER__) && (__BYTE_ORDER__ == __ORDER_LITTLE_ENDIAN__)
return *p;
#else
const uint8_t *c = (const uint8_t *)p;
return (uint32_t)c[0] |
(uint32_t)c[1] << 8 |
(uint32_t)c[2] << 16 |
(uint32_t)c[3] << 24;
#endif
}
static inline uint64_t getblock ( const uint64_t * p )
{
#if defined(__BYTE_ORDER__) && (__BYTE_ORDER__ == __ORDER_LITTLE_ENDIAN__)
return *p;
#else
const uint8_t *c = (const uint8_t *)p;
return (uint64_t)c[0] |
(uint64_t)c[1] << 8 |
(uint64_t)c[2] << 16 |
(uint64_t)c[3] << 24 |
(uint64_t)c[4] << 32 |
(uint64_t)c[5] << 40 |
(uint64_t)c[6] << 48 |
(uint64_t)c[7] << 56;
#endif
}
//-----------------------------------------------------------------------------
@ -52,7 +86,7 @@ uint32_t MurmurHash2 ( const void * key, size_t len, uint32_t seed )
while(len >= 4)
{
uint32_t k = *(uint32_t*)data;
uint32_t k = getblock((const uint32_t *)data);
k *= m;
k ^= k >> r;
@ -105,7 +139,7 @@ uint64_t MurmurHash64A ( const void * key, size_t len, uint64_t seed )
while(data != end)
{
uint64_t k = *data++;
uint64_t k = getblock(data++);
k *= m;
k ^= k >> r;
@ -151,12 +185,12 @@ uint64_t MurmurHash64B ( const void * key, size_t len, uint64_t seed )
while(len >= 8)
{
uint32_t k1 = *data++;
uint32_t k1 = getblock(data++);
k1 *= m; k1 ^= k1 >> r; k1 *= m;
h1 *= m; h1 ^= k1;
len -= 4;
uint32_t k2 = *data++;
uint32_t k2 = getblock(data++);
k2 *= m; k2 ^= k2 >> r; k2 *= m;
h2 *= m; h2 ^= k2;
len -= 4;
@ -164,7 +198,7 @@ uint64_t MurmurHash64B ( const void * key, size_t len, uint64_t seed )
if(len >= 4)
{
uint32_t k1 = *data++;
uint32_t k1 = getblock(data++);
k1 *= m; k1 ^= k1 >> r; k1 *= m;
h1 *= m; h1 ^= k1;
len -= 4;
@ -215,7 +249,7 @@ uint32_t MurmurHash2A ( const void * key, size_t len, uint32_t seed )
while(len >= 4)
{
uint32_t k = *(uint32_t*)data;
uint32_t k = getblock((const uint32_t *)data);
mmix(h,k);
@ -278,7 +312,7 @@ public:
while(len >= 4)
{
uint32_t k = *(uint32_t*)data;
uint32_t k = getblock((const uint32_t *)data);
mmix(m_hash,k);
@ -427,7 +461,7 @@ uint32_t MurmurHashAligned2 ( const void * key, size_t len, uint32_t seed )
while(len >= 4)
{
d = *(uint32_t *)data;
d = getblock((const uint32_t *)data);
t = (t >> sr) | (d << sl);
uint32_t k = t;
@ -492,7 +526,7 @@ uint32_t MurmurHashAligned2 ( const void * key, size_t len, uint32_t seed )
{
while(len >= 4)
{
uint32_t k = *(uint32_t *)data;
uint32_t k = getblock((const uint32_t *)data);
MIX(h,k,m);

View File

@ -55,14 +55,32 @@ inline uint64_t rotl64 ( uint64_t x, int8_t r )
FORCE_INLINE uint32_t getblock32 ( const uint32_t * p, int i )
{
uint32_t res;
memcpy(&res, p + i, sizeof(res));
return res;
#if defined(__BYTE_ORDER__) && (__BYTE_ORDER__ == __ORDER_LITTLE_ENDIAN__)
return p[i];
#else
const uint8_t *c = (const uint8_t *)&p[i];
return (uint32_t)c[0] |
(uint32_t)c[1] << 8 |
(uint32_t)c[2] << 16 |
(uint32_t)c[3] << 24;
#endif
}
FORCE_INLINE uint64_t getblock64 ( const uint64_t * p, int i )
{
#if defined(__BYTE_ORDER__) && (__BYTE_ORDER__ == __ORDER_LITTLE_ENDIAN__)
return p[i];
#else
const uint8_t *c = (const uint8_t *)&p[i];
return (uint64_t)c[0] |
(uint64_t)c[1] << 8 |
(uint64_t)c[2] << 16 |
(uint64_t)c[3] << 24 |
(uint64_t)c[4] << 32 |
(uint64_t)c[5] << 40 |
(uint64_t)c[6] << 48 |
(uint64_t)c[7] << 56;
#endif
}
//-----------------------------------------------------------------------------
@ -329,9 +347,13 @@ void MurmurHash3_x64_128 ( const void * key, const size_t len,
h1 += h2;
h2 += h1;
#if defined(__BYTE_ORDER__) && (__BYTE_ORDER__ == __ORDER_LITTLE_ENDIAN__)
((uint64_t*)out)[0] = h1;
((uint64_t*)out)[1] = h2;
#else
((uint64_t*)out)[0] = h2;
((uint64_t*)out)[1] = h1;
#endif
}
//-----------------------------------------------------------------------------

View File

@ -128,7 +128,7 @@ function run_tests()
set +e
if [[ -n "$USE_PARALLEL_REPLICAS" ]] && [[ "$USE_PARALLEL_REPLICAS" -eq 1 ]]; then
clickhouse-test --client="clickhouse-client --use_hedged_requests=0 --allow_experimental_parallel_reading_from_replicas=1 \
clickhouse-test --client="clickhouse-client --use_hedged_requests=0 --allow_experimental_parallel_reading_from_replicas=1 --parallel_replicas_for_non_replicated_merge_tree=1 \
--max_parallel_replicas=100 --cluster_for_parallel_replicas='parallel_replicas'" \
-j 2 --testname --shard --zookeeper --check-zookeeper-session --no-stateless --no-parallel-replicas --hung-check --print-time "${ADDITIONAL_OPTIONS[@]}" \
"$SKIP_TESTS_OPTION" 2>&1 | ts '%Y-%m-%d %H:%M:%S' | tee test_output/test_result.txt

View File

@ -390,6 +390,22 @@ SELECT count() FROM table WHERE s < 'z'
SELECT count() FROM table WHERE u64 * i32 == 10 AND u64 * length(s) >= 1234
```
Data skipping indexes can also be created on composite columns:
```sql
-- on columns of type Map:
INDEX map_key_index mapKeys(map_column) TYPE bloom_filter
INDEX map_value_index mapValues(map_column) TYPE bloom_filter
-- on columns of type Tuple:
INDEX tuple_1_index tuple_column.1 TYPE bloom_filter
INDEX tuple_2_index tuple_column.2 TYPE bloom_filter
-- on columns of type Nested:
INDEX nested_1_index col.nested_col1 TYPE bloom_filter
INDEX nested_2_index col.nested_col2 TYPE bloom_filter
```
### Available Types of Indices {#available-types-of-indices}
#### MinMax
@ -432,20 +448,6 @@ Syntax: `tokenbf_v1(size_of_bloom_filter_in_bytes, number_of_hash_functions, ran
- An experimental index to support approximate nearest neighbor (ANN) search. See [here](annindexes.md) for details.
- An experimental inverted index to support full-text search. See [here](invertedindexes.md) for details.
## Example of index creation for Map data type
```
INDEX map_key_index mapKeys(map_column) TYPE bloom_filter GRANULARITY 1
INDEX map_key_index mapValues(map_column) TYPE bloom_filter GRANULARITY 1
```
``` sql
INDEX sample_index (u64 * length(s)) TYPE minmax GRANULARITY 4
INDEX sample_index2 (u64 * length(str), i32 + f64 * 100, date, str) TYPE set(100) GRANULARITY 4
INDEX sample_index3 (lower(str), str) TYPE ngrambf_v1(3, 256, 2, 0) GRANULARITY 4
```
### Functions Support {#functions-support}
Conditions in the `WHERE` clause contain calls of functions that operate on columns. If the column is part of an index, ClickHouse tries to use this index when evaluating these functions. ClickHouse supports different subsets of functions for using indexes.
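To illustrate (a minimal, hypothetical sketch: the table `logs`, its columns, and the index name are not taken from this commit), a condition that calls a supported function on an indexed column lets ClickHouse consult the data skipping index:
```sql
-- Hypothetical example: ngrambf_v1 supports functions such as LIKE,
-- so the index can be used to skip granules that cannot match.
CREATE TABLE logs
(
    ts DateTime,
    message String,
    INDEX message_idx message TYPE ngrambf_v1(3, 256, 2, 0) GRANULARITY 4
)
ENGINE = MergeTree
ORDER BY ts;

SELECT count() FROM logs WHERE message LIKE '%timeout%';
```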

View File

@ -15,7 +15,7 @@ Usage examples:
## Usage in ClickHouse Server {#usage-in-clickhouse-server}
``` sql
ENGINE = GenerateRandom([random_seed] [,max_string_length] [,max_array_length])
ENGINE = GenerateRandom([random_seed [,max_string_length [,max_array_length]]])
```
The `max_array_length` and `max_string_length` parameters specify the maximum length of all
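For context, a hedged usage sketch (the table and column names here are illustrative and not part of this commit), showing the optional arguments in that order:
```sql
-- random_seed = 1, max_string_length = 10, max_array_length = 5
CREATE TABLE generate_random_example (id UInt64, name String, tags Array(Int32))
ENGINE = GenerateRandom(1, 10, 5);

SELECT * FROM generate_random_example LIMIT 3;
```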

View File

@ -4049,3 +4049,32 @@ Possible values:
- 1 - enabled
Default value: `0`.
## stop_reading_on_first_cancel {#stop_reading_on_first_cancel}
When set to `true`, interrupting a query (for example with `Ctrl+C` on the client) makes the query continue execution only on data that has already been read from the table. Afterward, it returns a partial result of the query for the part of the table that was read. To fully stop the execution of a query without a partial result, the user should send two cancel requests.
**Example without setting on Ctrl+C**
```sql
SELECT sum(number) FROM numbers(10000000000)
Cancelling query.
Ok.
Query was cancelled.
0 rows in set. Elapsed: 1.334 sec. Processed 52.65 million rows, 421.23 MB (39.48 million rows/s., 315.85 MB/s.)
```
**Example with setting on Ctrl+C**
```sql
SELECT sum(number) FROM numbers(10000000000) SETTINGS stop_reading_on_first_cancel=true
┌──────sum(number)─┐
│ 1355411451286266 │
└──────────────────┘
1 row in set. Elapsed: 1.331 sec. Processed 52.13 million rows, 417.05 MB (39.17 million rows/s., 313.33 MB/s.)
```
Possible values: `true`, `false`
Default value: `false`

View File

@ -4084,3 +4084,32 @@ ALTER TABLE test FREEZE SETTINGS alter_partition_verbose_result = 1;
Sets the character that is interpreted as a suffix after the result set for the [CustomSeparated](../../interfaces/formats.md#format-customseparated) format.
Default value: `''`.
## stop_reading_on_first_cancel {#stop_reading_on_first_cancel}
If set to `true` and the user wants to interrupt a query (for example, using `Ctrl+C` on the client), the query continues execution only on data that has already been read from the table. After that, it returns a partial result of the query for the part of the table that was read. To fully stop query execution without a partial result, the user should send two cancel requests.
**Example with the setting disabled on Ctrl+C**
```sql
SELECT sum(number) FROM numbers(10000000000)
Cancelling query.
Ok.
Query was cancelled.
0 rows in set. Elapsed: 1.334 sec. Processed 52.65 million rows, 421.23 MB (39.48 million rows/s., 315.85 MB/s.)
```
**Example with the setting enabled on Ctrl+C**
```sql
SELECT sum(number) FROM numbers(10000000000) SETTINGS stop_reading_on_first_cancel=true
┌──────sum(number)─┐
│ 1355411451286266 │
└──────────────────┘
1 row in set. Elapsed: 1.331 sec. Processed 52.13 million rows, 417.05 MB (39.17 million rows/s., 313.33 MB/s.)
```
Possible values: `true`, `false`
Default value: `false`

View File

@ -222,6 +222,8 @@ int mainEntryClickHouseInstall(int argc, char ** argv)
("pid-path", po::value<std::string>()->default_value("var/run/clickhouse-server"), "directory for pid file")
("user", po::value<std::string>()->default_value(DEFAULT_CLICKHOUSE_SERVER_USER), "clickhouse user to create")
("group", po::value<std::string>()->default_value(DEFAULT_CLICKHOUSE_SERVER_GROUP), "clickhouse group to create")
("noninteractive,y", "run non-interactively")
("link", "create symlink to the binary instead of copying to binary-path")
;
po::variables_map options;
@ -267,8 +269,6 @@ int mainEntryClickHouseInstall(int argc, char ** argv)
/// Copy binary to the destination directory.
/// TODO An option to link instead of copy - useful for developers.
fs::path prefix = options["prefix"].as<std::string>();
fs::path bin_dir = prefix / options["binary-path"].as<std::string>();
@ -281,76 +281,129 @@ int mainEntryClickHouseInstall(int argc, char ** argv)
bool old_binary_exists = fs::exists(main_bin_path);
bool already_installed = false;
/// Check if the binary is the same file (already installed).
if (old_binary_exists && binary_self_canonical_path == fs::canonical(main_bin_path))
if (options.count("link"))
{
already_installed = true;
fmt::print("ClickHouse binary is already located at {}\n", main_bin_path.string());
}
/// Check if binary has the same content.
else if (old_binary_exists && binary_size == fs::file_size(main_bin_path))
{
fmt::print("Found already existing ClickHouse binary at {} having the same size. Will check its contents.\n",
main_bin_path.string());
if (filesEqual(binary_self_path.string(), main_bin_path.string()))
if (old_binary_exists)
{
already_installed = true;
fmt::print("ClickHouse binary is already located at {} and it has the same content as {}\n",
main_bin_path.string(), binary_self_canonical_path.string());
}
}
bool is_symlink = FS::isSymlink(main_bin_path);
fs::path points_to;
if (is_symlink)
points_to = fs::weakly_canonical(FS::readSymlink(main_bin_path));
if (already_installed)
{
if (0 != chmod(main_bin_path.string().c_str(), S_IRUSR | S_IRGRP | S_IROTH | S_IXUSR | S_IXGRP | S_IXOTH))
throwFromErrno(fmt::format("Cannot chmod {}", main_bin_path.string()), ErrorCodes::SYSTEM_ERROR);
if (is_symlink && points_to == binary_self_canonical_path)
{
already_installed = true;
}
else
{
if (!is_symlink)
{
fmt::print("File {} already exists but it's not a symlink. Will rename to {}.\n",
main_bin_path.string(), main_bin_old_path.string());
fs::rename(main_bin_path, main_bin_old_path);
}
else if (points_to != main_bin_path)
{
fmt::print("Symlink {} already exists but it points to {}. Will replace the old symlink to {}.\n",
main_bin_path.string(), points_to.string(), binary_self_canonical_path.string());
fs::remove(main_bin_path);
}
}
}
if (!already_installed)
{
if (!fs::exists(bin_dir))
{
fmt::print("Creating binary directory {}.\n", bin_dir.string());
fs::create_directories(bin_dir);
}
fmt::print("Creating symlink {} to {}.\n", main_bin_path.string(), binary_self_canonical_path.string());
fs::create_symlink(binary_self_canonical_path, main_bin_path);
if (0 != chmod(binary_self_canonical_path.string().c_str(), S_IRUSR | S_IRGRP | S_IROTH | S_IXUSR | S_IXGRP | S_IXOTH))
throwFromErrno(fmt::format("Cannot chmod {}", binary_self_canonical_path.string()), ErrorCodes::SYSTEM_ERROR);
}
}
else
{
if (!fs::exists(bin_dir))
bool is_symlink = FS::isSymlink(main_bin_path);
if (!is_symlink)
{
fmt::print("Creating binary directory {}.\n", bin_dir.string());
fs::create_directories(bin_dir);
/// Check if the binary is the same file (already installed).
if (old_binary_exists && binary_self_canonical_path == fs::canonical(main_bin_path))
{
already_installed = true;
fmt::print("ClickHouse binary is already located at {}\n", main_bin_path.string());
}
/// Check if binary has the same content.
else if (old_binary_exists && binary_size == fs::file_size(main_bin_path))
{
fmt::print("Found already existing ClickHouse binary at {} having the same size. Will check its contents.\n",
main_bin_path.string());
if (filesEqual(binary_self_path.string(), main_bin_path.string()))
{
already_installed = true;
fmt::print("ClickHouse binary is already located at {} and it has the same content as {}\n",
main_bin_path.string(), binary_self_canonical_path.string());
}
}
}
size_t available_space = fs::space(bin_dir).available;
if (available_space < binary_size)
throw Exception(ErrorCodes::NOT_ENOUGH_SPACE, "Not enough space for clickhouse binary in {}, required {}, available {}.",
bin_dir.string(), ReadableSize(binary_size), ReadableSize(available_space));
fmt::print("Copying ClickHouse binary to {}\n", main_bin_tmp_path.string());
try
if (already_installed)
{
ReadBufferFromFile in(binary_self_path.string());
WriteBufferFromFile out(main_bin_tmp_path.string());
copyData(in, out);
out.sync();
if (0 != fchmod(out.getFD(), S_IRUSR | S_IRGRP | S_IROTH | S_IXUSR | S_IXGRP | S_IXOTH))
throwFromErrno(fmt::format("Cannot chmod {}", main_bin_tmp_path.string()), ErrorCodes::SYSTEM_ERROR);
out.finalize();
if (0 != chmod(main_bin_path.string().c_str(), S_IRUSR | S_IRGRP | S_IROTH | S_IXUSR | S_IXGRP | S_IXOTH))
throwFromErrno(fmt::format("Cannot chmod {}", main_bin_path.string()), ErrorCodes::SYSTEM_ERROR);
}
catch (const Exception & e)
else
{
if (e.code() == ErrorCodes::CANNOT_OPEN_FILE && geteuid() != 0)
std::cerr << "Install must be run as root: " << formatWithSudo("./clickhouse install") << '\n';
throw;
if (!fs::exists(bin_dir))
{
fmt::print("Creating binary directory {}.\n", bin_dir.string());
fs::create_directories(bin_dir);
}
size_t available_space = fs::space(bin_dir).available;
if (available_space < binary_size)
throw Exception(ErrorCodes::NOT_ENOUGH_SPACE, "Not enough space for clickhouse binary in {}, required {}, available {}.",
bin_dir.string(), ReadableSize(binary_size), ReadableSize(available_space));
fmt::print("Copying ClickHouse binary to {}\n", main_bin_tmp_path.string());
try
{
ReadBufferFromFile in(binary_self_path.string());
WriteBufferFromFile out(main_bin_tmp_path.string());
copyData(in, out);
out.sync();
if (0 != fchmod(out.getFD(), S_IRUSR | S_IRGRP | S_IROTH | S_IXUSR | S_IXGRP | S_IXOTH))
throwFromErrno(fmt::format("Cannot chmod {}", main_bin_tmp_path.string()), ErrorCodes::SYSTEM_ERROR);
out.finalize();
}
catch (const Exception & e)
{
if (e.code() == ErrorCodes::CANNOT_OPEN_FILE && geteuid() != 0)
std::cerr << "Install must be run as root: " << formatWithSudo("./clickhouse install") << '\n';
throw;
}
if (old_binary_exists)
{
fmt::print("{} already exists, will rename existing binary to {} and put the new binary in place\n",
main_bin_path.string(), main_bin_old_path.string());
/// There is file exchange operation in Linux but it's not portable.
fs::rename(main_bin_path, main_bin_old_path);
}
fmt::print("Renaming {} to {}.\n", main_bin_tmp_path.string(), main_bin_path.string());
fs::rename(main_bin_tmp_path, main_bin_path);
}
if (old_binary_exists)
{
fmt::print("{} already exists, will rename existing binary to {} and put the new binary in place\n",
main_bin_path.string(), main_bin_old_path.string());
/// There is file exchange operation in Linux but it's not portable.
fs::rename(main_bin_path, main_bin_old_path);
}
fmt::print("Renaming {} to {}.\n", main_bin_tmp_path.string(), main_bin_path.string());
fs::rename(main_bin_tmp_path, main_bin_path);
}
/// Create symlinks.
@ -384,7 +437,7 @@ int mainEntryClickHouseInstall(int argc, char ** argv)
if (is_symlink)
points_to = fs::weakly_canonical(FS::readSymlink(symlink_path));
if (is_symlink && points_to == main_bin_path)
if (is_symlink && (points_to == main_bin_path || (options.count("link") && points_to == binary_self_canonical_path)))
{
need_to_create = false;
}
@ -709,7 +762,9 @@ int mainEntryClickHouseInstall(int argc, char ** argv)
/// dpkg or apt installers can ask for non-interactive work explicitly.
const char * debian_frontend_var = getenv("DEBIAN_FRONTEND"); // NOLINT(concurrency-mt-unsafe)
bool noninteractive = debian_frontend_var && debian_frontend_var == std::string_view("noninteractive");
bool noninteractive = (debian_frontend_var && debian_frontend_var == std::string_view("noninteractive"))
|| options.count("noninteractive");
bool is_interactive = !noninteractive && stdin_is_a_tty && stdout_is_a_tty;

View File

@ -703,6 +703,9 @@
actions of previous constraint (defined in other profiles) for the same specific setting, including fields that are not set by new constraint.
It also enables 'changeable_in_readonly' constraint type -->
<settings_constraints_replace_previous>false</settings_constraints_replace_previous>
<!-- Number of seconds a role is kept in the Role Cache after it was last accessed -->
<role_cache_expiration_time_seconds>600</role_cache_expiration_time_seconds>
</access_control_improvements>
<!-- Default profile of settings. -->

View File

@ -247,7 +247,7 @@ private:
AccessControl::AccessControl()
: MultipleAccessStorage("user directories"),
context_access_cache(std::make_unique<ContextAccessCache>(*this)),
role_cache(std::make_unique<RoleCache>(*this)),
role_cache(std::make_unique<RoleCache>(*this, 600)),
row_policy_cache(std::make_unique<RowPolicyCache>(*this)),
quota_cache(std::make_unique<QuotaCache>(*this)),
settings_profiles_cache(std::make_unique<SettingsProfilesCache>(*this)),
@ -282,6 +282,8 @@ void AccessControl::setUpFromMainConfig(const Poco::Util::AbstractConfiguration
setSettingsConstraintsReplacePrevious(config_.getBool("access_control_improvements.settings_constraints_replace_previous", false));
addStoragesFromMainConfig(config_, config_path_, get_zookeeper_function_);
role_cache = std::make_unique<RoleCache>(*this, config_.getInt("access_control_improvements.role_cache_expiration_time_seconds", 600));
}

View File

@ -56,8 +56,8 @@ namespace
}
RoleCache::RoleCache(const AccessControl & access_control_)
: access_control(access_control_), cache(600000 /* 10 minutes */)
RoleCache::RoleCache(const AccessControl & access_control_, int expiration_time_seconds)
: access_control(access_control_), cache(expiration_time_seconds * 1000 /* 10 minutes by default*/)
{
}

View File

@ -16,7 +16,7 @@ using RolePtr = std::shared_ptr<const Role>;
class RoleCache
{
public:
explicit RoleCache(const AccessControl & access_control_);
explicit RoleCache(const AccessControl & access_control_, int expiration_time_seconds);
~RoleCache();
std::shared_ptr<const EnabledRoles> getEnabledRoles(

View File

@ -810,9 +810,12 @@ bool BackupCoordinationRemote::hasConcurrentBackups(const std::atomic<size_t> &)
if (existing_backup_uuid == toString(backup_uuid))
continue;
const auto status = zk->get(root_zookeeper_path + "/" + existing_backup_path + "/stage");
if (status != Stage::COMPLETED)
return true;
String status;
if (zk->tryGet(root_zookeeper_path + "/" + existing_backup_path + "/stage", status))
{
if (status != Stage::COMPLETED)
return true;
}
}
zk->createIfNotExists(backup_stage_path, "");

View File

@ -441,7 +441,7 @@ OperationID BackupsWorker::startRestoring(const ASTPtr & query, ContextMutablePt
if (restore_settings.async)
{
backups_thread_pool.scheduleOrThrowOnError(
restores_thread_pool.scheduleOrThrowOnError(
[this, restore_query, restore_id, backup_name_for_logging, backup_info, restore_settings, restore_coordination, context_in_use]
{
doRestore(

View File

@ -261,21 +261,31 @@ static void incrementProfileEventsBlock(Block & dst, const Block & src)
}
std::atomic_flag exit_on_signal;
std::atomic<Int32> exit_after_signals = 0;
class QueryInterruptHandler : private boost::noncopyable
{
public:
static void start() { exit_on_signal.clear(); }
/// Store how many interrupt signals can be received before stopping the query;
/// by default, stop after the first interrupt signal.
static void start(Int32 signals_before_stop = 1) { exit_after_signals.store(signals_before_stop); }
/// Setting the value to 0 or less marks the query as stopped.
static void stop() { return exit_after_signals.store(0); }
/// Return true if the query was stopped.
static bool stop() { return exit_on_signal.test_and_set(); }
static bool cancelled() { return exit_on_signal.test(); }
/// Query was stopped if it received at least "signals_before_stop" interrupt signals.
static bool try_stop() { return exit_after_signals.fetch_sub(1) <= 0; }
static bool cancelled() { return exit_after_signals.load() <= 0; }
/// Return how many interrupt signals remain before the query is stopped.
static Int32 cancelled_status() { return exit_after_signals.load(); }
};
/// This signal handler is set only for SIGINT.
void interruptSignalHandler(int signum)
{
if (QueryInterruptHandler::stop())
if (QueryInterruptHandler::try_stop())
safeExit(128 + signum);
}
@ -850,12 +860,15 @@ void ClientBase::processOrdinaryQuery(const String & query_to_execute, ASTPtr pa
}
}
const auto & settings = global_context->getSettingsRef();
const Int32 signals_before_stop = settings.stop_reading_on_first_cancel ? 2 : 1;
int retries_left = 10;
while (retries_left)
{
try
{
QueryInterruptHandler::start();
QueryInterruptHandler::start(signals_before_stop);
SCOPE_EXIT({ QueryInterruptHandler::stop(); });
connection->sendQuery(
@ -872,7 +885,7 @@ void ClientBase::processOrdinaryQuery(const String & query_to_execute, ASTPtr pa
if (send_external_tables)
sendExternalTables(parsed_query);
receiveResult(parsed_query);
receiveResult(parsed_query, signals_before_stop, settings.stop_reading_on_first_cancel);
break;
}
@ -897,7 +910,7 @@ void ClientBase::processOrdinaryQuery(const String & query_to_execute, ASTPtr pa
/// Receives and processes packets coming from server.
/// Also checks if query execution should be cancelled.
void ClientBase::receiveResult(ASTPtr parsed_query)
void ClientBase::receiveResult(ASTPtr parsed_query, Int32 signals_before_stop, bool stop_reading_on_first_cancel)
{
// TODO: get the poll_interval from commandline.
const auto receive_timeout = connection_parameters.timeouts.receive_timeout;
@ -921,7 +934,13 @@ void ClientBase::receiveResult(ASTPtr parsed_query)
/// to avoid losing sync.
if (!cancelled)
{
if (QueryInterruptHandler::cancelled())
if (stop_reading_on_first_cancel && QueryInterruptHandler::cancelled_status() == signals_before_stop - 1)
{
connection->sendCancel();
/// The first cancel-reading request has been sent; subsequent requests will fully cancel the query
stop_reading_on_first_cancel = false;
}
else if (QueryInterruptHandler::cancelled())
{
cancelQuery();
}

View File

@ -131,7 +131,7 @@ protected:
private:
void receiveResult(ASTPtr parsed_query);
void receiveResult(ASTPtr parsed_query, Int32 signals_before_stop, bool stop_reading_on_first_cancel);
bool receiveAndProcessPacket(ASTPtr parsed_query, bool cancelled_);
void receiveLogsAndProfileEvents(ASTPtr parsed_query);
bool receiveSampleBlock(Block & out, ColumnsDescription & columns_description, ASTPtr parsed_query);

View File

@ -46,8 +46,8 @@ ThreadPoolImpl<Thread>::ThreadPoolImpl(size_t max_threads_)
template <typename Thread>
ThreadPoolImpl<Thread>::ThreadPoolImpl(size_t max_threads_, size_t max_free_threads_, size_t queue_size_, bool shutdown_on_exception_)
: max_threads(max_threads_)
, max_free_threads(max_free_threads_)
, queue_size(queue_size_)
, max_free_threads(std::min(max_free_threads_, max_threads))
, queue_size(queue_size_ ? std::max(queue_size_, max_threads) : 0 /* zero means the queue is unlimited */)
, shutdown_on_exception(shutdown_on_exception_)
{
}
@ -56,10 +56,26 @@ template <typename Thread>
void ThreadPoolImpl<Thread>::setMaxThreads(size_t value)
{
std::lock_guard lock(mutex);
bool need_start_threads = (value > max_threads);
bool need_finish_free_threads = (value < max_free_threads);
max_threads = value;
max_free_threads = std::min(max_free_threads, max_threads);
/// We have to also adjust queue size, because it limits the number of scheduled and already running jobs in total.
queue_size = std::max(queue_size, max_threads);
queue_size = queue_size ? std::max(queue_size, max_threads) : 0;
jobs.reserve(queue_size);
if (need_start_threads)
{
/// Start new threads while there are more scheduled jobs in the queue and the limit `max_threads` is not reached.
startNewThreadsNoLock();
}
else if (need_finish_free_threads)
{
/// Wake up free threads so they can finish themselves.
new_job_or_shutdown.notify_all();
}
}
template <typename Thread>
@ -73,14 +89,22 @@ template <typename Thread>
void ThreadPoolImpl<Thread>::setMaxFreeThreads(size_t value)
{
std::lock_guard lock(mutex);
max_free_threads = value;
bool need_finish_free_threads = (value < max_free_threads);
max_free_threads = std::min(value, max_threads);
if (need_finish_free_threads)
{
/// Wake up free threads so they can finish themselves.
new_job_or_shutdown.notify_all();
}
}
template <typename Thread>
void ThreadPoolImpl<Thread>::setQueueSize(size_t value)
{
std::lock_guard lock(mutex);
queue_size = value;
queue_size = value ? std::max(value, max_threads) : 0;
/// Reserve memory to get rid of allocations
jobs.reserve(queue_size);
}
@ -159,11 +183,42 @@ ReturnType ThreadPoolImpl<Thread>::scheduleImpl(Job job, ssize_t priority, std::
++scheduled_jobs;
}
/// Wake up a free thread to run the new job.
new_job_or_shutdown.notify_one();
return static_cast<ReturnType>(true);
}
template <typename Thread>
void ThreadPoolImpl<Thread>::startNewThreadsNoLock()
{
if (shutdown)
return;
/// Start new threads while there are more scheduled jobs in the queue and the limit `max_threads` is not reached.
while (threads.size() < std::min(scheduled_jobs, max_threads))
{
try
{
threads.emplace_front();
}
catch (...)
{
break; /// failed to start more threads
}
try
{
threads.front() = Thread([this, it = threads.begin()] { worker(it); });
}
catch (...)
{
threads.pop_front();
break; /// failed to start more threads
}
}
}
template <typename Thread>
void ThreadPoolImpl<Thread>::scheduleOrThrowOnError(Job job, ssize_t priority)
{
@ -185,20 +240,18 @@ void ThreadPoolImpl<Thread>::scheduleOrThrow(Job job, ssize_t priority, uint64_t
template <typename Thread>
void ThreadPoolImpl<Thread>::wait()
{
{
std::unique_lock lock(mutex);
/// Signal here just in case.
/// If threads are waiting on condition variables, but there are some jobs in the queue
/// then it will prevent us from deadlock.
new_job_or_shutdown.notify_all();
job_finished.wait(lock, [this] { return scheduled_jobs == 0; });
std::unique_lock lock(mutex);
/// Signal here just in case.
/// If threads are waiting on condition variables, but there are some jobs in the queue
/// then it will prevent us from deadlock.
new_job_or_shutdown.notify_all();
job_finished.wait(lock, [this] { return scheduled_jobs == 0; });
if (first_exception)
{
std::exception_ptr exception;
std::swap(exception, first_exception);
std::rethrow_exception(exception);
}
if (first_exception)
{
std::exception_ptr exception;
std::swap(exception, first_exception);
std::rethrow_exception(exception);
}
}
@ -219,10 +272,14 @@ void ThreadPoolImpl<Thread>::finalize()
{
std::lock_guard lock(mutex);
shutdown = true;
/// We don't want threads to remove themselves from `threads` anymore, otherwise `thread.join()` will go wrong below in this function.
threads_remove_themselves = false;
}
/// Wake up threads so they can finish themselves.
new_job_or_shutdown.notify_all();
/// Wait for all currently running jobs to finish (we don't wait for all scheduled jobs here like the function wait() does).
for (auto & thread : threads)
thread.join();
@ -268,38 +325,53 @@ void ThreadPoolImpl<Thread>::worker(typename std::list<Thread>::iterator thread_
CurrentMetrics::Increment metric_all_threads(
std::is_same_v<Thread, std::thread> ? CurrentMetrics::GlobalThread : CurrentMetrics::LocalThread);
/// Remove this thread from `threads` and detach it, that must be done before exiting from this worker.
/// We can't wrap the following lambda function into `SCOPE_EXIT` because it requires `mutex` to be locked.
auto detach_thread = [this, thread_it]
{
/// `mutex` is supposed to be already locked.
if (threads_remove_themselves)
{
thread_it->detach();
threads.erase(thread_it);
}
};
/// We'll run jobs in this worker while there are scheduled jobs and until some special event occurs (e.g. shutdown, or decreasing the number of max_threads).
/// And if `max_free_threads > 0` we keep this number of threads even when there are no jobs for them currently.
while (true)
{
/// This is inside the loop to also reset previous thread names set inside the jobs.
setThreadName("ThreadPool");
Job job;
bool need_shutdown = false;
/// A copy of parent trace context
DB::OpenTelemetry::TracingContextOnThread parent_thead_trace_context;
/// Get a job from the queue.
Job job;
std::exception_ptr exception_from_job;
bool need_shutdown = false;
{
std::unique_lock lock(mutex);
new_job_or_shutdown.wait(lock, [this] { return shutdown || !jobs.empty(); });
new_job_or_shutdown.wait(lock, [&] { return !jobs.empty() || shutdown || (threads.size() > std::min(max_threads, scheduled_jobs + max_free_threads)); });
need_shutdown = shutdown;
if (!jobs.empty())
if (jobs.empty())
{
/// boost::priority_queue does not provide interface for getting non-const reference to an element
/// to prevent us from modifying its priority. We have to use const_cast to force move semantics on JobWithPriority::job.
job = std::move(const_cast<Job &>(jobs.top().job));
parent_thead_trace_context = std::move(const_cast<DB::OpenTelemetry::TracingContextOnThread &>(jobs.top().thread_trace_context));
jobs.pop();
}
else
{
/// shutdown is true, simply finish the thread.
/// No jobs and either `shutdown` is set or this thread is excessive. The worker will stop.
detach_thread();
return;
}
/// boost::priority_queue does not provide interface for getting non-const reference to an element
/// to prevent us from modifying its priority. We have to use const_cast to force move semantics on JobWithPriority::job.
job = std::move(const_cast<Job &>(jobs.top().job));
parent_thead_trace_context = std::move(const_cast<DB::OpenTelemetry::TracingContextOnThread &>(jobs.top().thread_trace_context));
jobs.pop();
}
/// Run the job. We don't run jobs after `shutdown` is set.
if (!need_shutdown)
{
ALLOW_ALLOCATIONS_IN_SCOPE;
@ -326,46 +398,47 @@ void ThreadPoolImpl<Thread>::worker(typename std::list<Thread>::iterator thread_
/// job should be reset before decrementing scheduled_jobs to
/// ensure that the Job destroyed before wait() returns.
job = {};
parent_thead_trace_context.reset();
}
catch (...)
{
thread_trace_context.root_span.addAttribute(std::current_exception());
exception_from_job = std::current_exception();
thread_trace_context.root_span.addAttribute(exception_from_job);
/// job should be reset before decrementing scheduled_jobs to
/// ensure that the Job destroyed before wait() returns.
job = {};
parent_thead_trace_context.reset();
{
std::lock_guard lock(mutex);
if (!first_exception)
first_exception = std::current_exception(); // NOLINT
if (shutdown_on_exception)
shutdown = true;
--scheduled_jobs;
}
job_finished.notify_all();
new_job_or_shutdown.notify_all();
return;
}
parent_thead_trace_context.reset();
}
/// The job is done.
{
std::lock_guard lock(mutex);
if (exception_from_job)
{
if (!first_exception)
first_exception = exception_from_job;
if (shutdown_on_exception)
shutdown = true;
}
--scheduled_jobs;
if (threads.size() > scheduled_jobs + max_free_threads)
if (threads.size() > std::min(max_threads, scheduled_jobs + max_free_threads))
{
thread_it->detach();
threads.erase(thread_it);
/// This thread is excessive. The worker will stop.
detach_thread();
job_finished.notify_all();
if (shutdown)
new_job_or_shutdown.notify_all(); /// `shutdown` was set, wake up other threads so they can finish themselves.
return;
}
}
job_finished.notify_all();
job_finished.notify_all();
if (shutdown)
new_job_or_shutdown.notify_all(); /// `shutdown` was set, wake up other threads so they can finish themselves.
}
}
}

View File

@ -102,6 +102,7 @@ private:
size_t scheduled_jobs = 0;
bool shutdown = false;
bool threads_remove_themselves = true;
const bool shutdown_on_exception = true;
struct JobWithPriority
@ -129,6 +130,9 @@ private:
void worker(typename std::list<Thread>::iterator thread_it);
/// Tries to start new threads if there are scheduled jobs and the limit `max_threads` is not reached. Must be called with `mutex` locked.
void startNewThreadsNoLock();
void finalize();
void onDestroy();
};
@ -260,6 +264,11 @@ public:
return true;
}
std::thread::id get_id() const
{
return state ? state->thread_id.load() : std::thread::id{};
}
protected:
struct State
{

View File

@ -24,6 +24,8 @@ namespace DB
M(UInt64, max_backups_io_thread_pool_size, 1000, "The maximum number of threads that would be used for IO operations for BACKUP queries", 0) \
M(UInt64, max_backups_io_thread_pool_free_size, 0, "Max free size for backups IO thread pool.", 0) \
M(UInt64, backups_io_thread_pool_queue_size, 0, "Queue size for backups IO thread pool.", 0) \
M(UInt64, backup_threads, 16, "The maximum number of threads to execute BACKUP requests.", 0) \
M(UInt64, restore_threads, 16, "The maximum number of threads to execute RESTORE requests.", 0) \
M(Int32, max_connections, 1024, "Max server connections.", 0) \
M(UInt32, asynchronous_metrics_update_period_s, 1, "Period in seconds for updating asynchronous metrics.", 0) \
M(UInt32, asynchronous_heavy_metrics_update_period_s, 120, "Period in seconds for updating asynchronous metrics.", 0) \

View File

@ -44,7 +44,7 @@ class IColumn;
M(UInt64, max_joined_block_size_rows, DEFAULT_BLOCK_SIZE, "Maximum block size for JOIN result (if join algorithm supports it). 0 means unlimited.", 0) \
M(UInt64, max_insert_threads, 0, "The maximum number of threads to execute the INSERT SELECT query. Values 0 or 1 means that INSERT SELECT is not run in parallel. Higher values will lead to higher memory usage. Parallel INSERT SELECT has effect only if the SELECT part is run on parallel, see 'max_threads' setting.", 0) \
M(UInt64, max_insert_delayed_streams_for_parallel_write, 0, "The maximum number of streams (columns) to delay final part flush. Default - auto (1000 in case of underlying storage supports parallel write, for example S3 and disabled otherwise)", 0) \
M(UInt64, max_final_threads, 16, "The maximum number of threads to read from table with FINAL.", 0) \
M(MaxThreads, max_final_threads, 0, "The maximum number of threads to read from table with FINAL.", 0) \
M(MaxThreads, max_threads, 0, "The maximum number of threads to execute the request. By default, it is determined automatically.", 0) \
M(MaxThreads, max_download_threads, 4, "The maximum number of threads to download data (e.g. for URL engine).", 0) \
M(UInt64, max_download_buffer_size, 10*1024*1024, "The maximal size of buffer for parallel downloading (e.g. for URL engine) per each thread.", 0) \
@ -152,6 +152,7 @@ class IColumn;
M(String, cluster_for_parallel_replicas, "default", "Cluster for a shard in which current server is located", 0) \
M(Bool, allow_experimental_parallel_reading_from_replicas, false, "If true, ClickHouse will send a SELECT query to all replicas of a table. It will work for any kind of MergeTree table.", 0) \
M(Float, parallel_replicas_single_task_marks_count_multiplier, 2, "A multiplier which will be added during calculation for minimal number of marks to retrieve from coordinator. This will be applied only for remote replicas.", 0) \
M(Bool, parallel_replicas_for_non_replicated_merge_tree, false, "If true, ClickHouse will use parallel replicas algorithm also for non-replicated MergeTree tables", 0) \
\
M(Bool, skip_unavailable_shards, false, "If true, ClickHouse silently skips unavailable shards and nodes unresolvable through DNS. Shard is marked as unavailable when none of the replicas can be reached.", 0) \
\
@ -280,6 +281,7 @@ class IColumn;
\
M(Bool, final, false, "Query with the FINAL modifier by default. If the engine does not support final, it does not have any effect. On queries with multiple tables final is applied only on those that support it. It also works on distributed tables", 0) \
\
M(Bool, stop_reading_on_first_cancel, false, "Allows query to return a partial result after cancel.", 0) \
/** Settings for testing hedged requests */ \
M(Milliseconds, sleep_in_send_tables_status_ms, 0, "Time to sleep in sending tables status response in TCPHandler", 0) \
M(Milliseconds, sleep_in_send_data_ms, 0, "Time to sleep in sending data in TCPHandler", 0) \
@ -413,8 +415,6 @@ class IColumn;
M(UInt64, max_temporary_data_on_disk_size_for_user, 0, "The maximum amount of data consumed by temporary files on disk in bytes for all concurrently running user queries. Zero means unlimited.", 0)\
M(UInt64, max_temporary_data_on_disk_size_for_query, 0, "The maximum amount of data consumed by temporary files on disk in bytes for all concurrently running queries. Zero means unlimited.", 0)\
\
M(UInt64, backup_threads, 16, "The maximum number of threads to execute BACKUP requests.", 0) \
M(UInt64, restore_threads, 16, "The maximum number of threads to execute RESTORE requests.", 0) \
M(UInt64, backup_keeper_max_retries, 20, "Max retries for keeper operations during backup", 0) \
M(UInt64, backup_keeper_retry_initial_backoff_ms, 100, "Initial backoff timeout for [Zoo]Keeper operations during backup", 0) \
M(UInt64, backup_keeper_retry_max_backoff_ms, 5000, "Max backoff timeout for [Zoo]Keeper operations during backup", 0) \
@ -760,6 +760,8 @@ class IColumn;
MAKE_OBSOLETE(M, Milliseconds, async_insert_cleanup_timeout_ms, 1000) \
MAKE_OBSOLETE(M, Bool, optimize_fuse_sum_count_avg, 0) \
MAKE_OBSOLETE(M, Seconds, drain_timeout, 3) \
MAKE_OBSOLETE(M, UInt64, backup_threads, 16) \
MAKE_OBSOLETE(M, UInt64, restore_threads, 16) \
/** The section above is for obsolete settings. Do not add anything there. */

View File

@ -928,7 +928,16 @@ void DatabaseReplicated::recoverLostReplica(const ZooKeeperPtr & current_zookeep
for (const auto & table_id : tables_to_create)
{
auto table_name = table_id.getTableName();
auto create_query_string = table_name_to_metadata[table_name];
auto metadata_it = table_name_to_metadata.find(table_name);
if (metadata_it == table_name_to_metadata.end())
{
/// getTablesSortedByDependency() may return tables that do not exist or tables from other databases
LOG_WARNING(log, "Got table name {} when resolving table dependencies, "
"but database {} does not have metadata for that table. Ignoring it", table_id.getNameForLogs(), getDatabaseName());
continue;
}
const auto & create_query_string = metadata_it->second;
if (isTableExist(table_name, getContext()))
{
assert(create_query_string == readMetadataFile(table_name));

View File

@ -2,8 +2,10 @@
#include <DataTypes/DataTypeDate.h>
#include <DataTypes/DataTypeDate32.h>
#include <DataTypes/DataTypeDateTime.h>
#include <Functions/IFunction.h>
#include <DataTypes/DataTypeDateTime64.h>
#include <DataTypes/DataTypeLowCardinality.h>
#include <Functions/IFunction.h>
#include <Functions/extractTimeZoneFromFunctionArguments.h>
#include <Functions/DateTimeTransforms.h>
#include <Functions/TransformDateTime64.h>
@ -60,6 +62,9 @@ public:
const auto * type_ptr = &type;
if (const auto * lc_type = checkAndGetDataType<DataTypeLowCardinality>(type_ptr))
type_ptr = lc_type->getDictionaryType().get();
if (const auto * nullable_type = checkAndGetDataType<DataTypeNullable>(type_ptr))
type_ptr = nullable_type->getNestedType().get();

View File

@ -26,24 +26,24 @@ namespace ErrorCodes
extern const int BAD_ARGUMENTS;
}
static String getRootNodeName(UserDefinedSQLObjectType object_type)
namespace
{
switch (object_type)
std::string_view getNodePrefix(UserDefinedSQLObjectType object_type)
{
case UserDefinedSQLObjectType::Function:
return "functions";
switch (object_type)
{
case UserDefinedSQLObjectType::Function:
return "function_";
}
UNREACHABLE();
}
UNREACHABLE();
}
static String getRootNodePath(const String & root_path, UserDefinedSQLObjectType object_type)
{
return root_path + "/" + getRootNodeName(object_type);
}
constexpr std::string_view sql_extension = ".sql";
static String getNodePath(const String & root_path, UserDefinedSQLObjectType object_type, const String & object_name)
{
return getRootNodePath(root_path, object_type) + "/" + escapeForFileName(object_name);
String getNodePath(const String & root_path, UserDefinedSQLObjectType object_type, const String & object_name)
{
return root_path + "/" + String{getNodePrefix(object_type)} + escapeForFileName(object_name) + String{sql_extension};
}
}
@ -119,10 +119,20 @@ void UserDefinedSQLObjectsLoaderFromZooKeeper::resetAfterError()
void UserDefinedSQLObjectsLoaderFromZooKeeper::loadObjects()
{
/// loadObjects() is called at start from Server::main(), so it's better not to stop here on a missing ZooKeeper connection or any other error.
/// However, the watching thread must be started anyway in case the connection is established later.
if (!objects_loaded)
{
reloadObjects();
try
{
reloadObjects();
}
catch (...)
{
tryLogCurrentException(log, "Failed to load user-defined objects");
}
}
startWatchingThread();
}
@ -188,7 +198,6 @@ void UserDefinedSQLObjectsLoaderFromZooKeeper::createRootNodes(const zkutil::Zoo
{
zookeeper->createAncestors(zookeeper_path);
zookeeper->createIfNotExists(zookeeper_path, "");
zookeeper->createIfNotExists(zookeeper_path + "/functions", "");
}
bool UserDefinedSQLObjectsLoaderFromZooKeeper::storeObject(
@ -344,17 +353,19 @@ Strings UserDefinedSQLObjectsLoaderFromZooKeeper::getObjectNamesAndSetWatch(
};
Coordination::Stat stat;
const auto path = getRootNodePath(zookeeper_path, object_type);
const auto node_names = zookeeper->getChildrenWatch(path, &stat, object_list_watcher);
const auto node_names = zookeeper->getChildrenWatch(zookeeper_path, &stat, object_list_watcher);
const auto prefix = getNodePrefix(object_type);
Strings object_names;
object_names.reserve(node_names.size());
for (const auto & node_name : node_names)
{
String object_name = unescapeForFileName(node_name);
if (!object_name.empty())
object_names.push_back(std::move(object_name));
if (node_name.starts_with(prefix) && node_name.ends_with(sql_extension))
{
String object_name = unescapeForFileName(node_name.substr(prefix.length(), node_name.length() - prefix.length() - sql_extension.length()));
if (!object_name.empty())
object_names.push_back(std::move(object_name));
}
}
return object_names;

View File

@ -1179,12 +1179,15 @@ ColumnPtr FunctionArrayElement::perform(const ColumnsWithTypeAndName & arguments
|| (res = executeArgument<Int16>(arguments, result_type, builder, input_rows_count))
|| (res = executeArgument<Int32>(arguments, result_type, builder, input_rows_count))
|| (res = executeArgument<Int64>(arguments, result_type, builder, input_rows_count))))
throw Exception(ErrorCodes::ILLEGAL_COLUMN, "Second argument for function {} must have UInt or Int type.", getName());
throw Exception(ErrorCodes::ILLEGAL_COLUMN, "Second argument for function {} must have UInt or Int type", getName());
}
else
{
Field index = (*arguments[1].column)[0];
if (index.getType() != Field::Types::UInt64 && index.getType() != Field::Types::Int64)
throw Exception(ErrorCodes::ILLEGAL_COLUMN, "Second argument for function {} must have UInt or Int type", getName());
if (builder)
builder.initSink(input_rows_count);

View File

@ -414,7 +414,7 @@ void WriteBufferFromS3::completeMultipartUpload()
if (outcome.IsSuccess())
{
LOG_TRACE(log, "Multipart upload has completed. Bucket: {}, Key: {}, Upload_id: {}, Parts: {}", bucket, key, multipart_upload_id, tags.size());
break;
return;
}
else
{
@ -435,6 +435,11 @@ void WriteBufferFromS3::completeMultipartUpload()
}
}
}
throw S3Exception(
Aws::S3::S3Errors::NO_SUCH_KEY,
"Message: Multipart upload failed with NO_SUCH_KEY error, retries {}, Key: {}, Bucket: {}",
max_retry, key, bucket);
}
void WriteBufferFromS3::makeSinglepartUpload()
@ -535,7 +540,7 @@ void WriteBufferFromS3::processPutRequest(const PutObjectTask & task)
if (outcome.IsSuccess())
{
LOG_TRACE(log, "Single part upload has completed. Bucket: {}, Key: {}, Object size: {}, WithPool: {}", bucket, key, task.req.GetContentLength(), with_pool);
break;
return;
}
else
{
@ -556,6 +561,11 @@ void WriteBufferFromS3::processPutRequest(const PutObjectTask & task)
}
}
}
throw S3Exception(
Aws::S3::S3Errors::NO_SUCH_KEY,
"Message: Single part upload failed with NO_SUCH_KEY error, retries {}, Key: {}, Bucket: {}",
max_retry, key, bucket);
}
void WriteBufferFromS3::waitForReadyBackGroundTasks()

View File

@ -1918,8 +1918,13 @@ BackupsWorker & Context::getBackupsWorker() const
const bool allow_concurrent_backups = this->getConfigRef().getBool("backups.allow_concurrent_backups", true);
const bool allow_concurrent_restores = this->getConfigRef().getBool("backups.allow_concurrent_restores", true);
const auto & config = getConfigRef();
const auto & settings = getSettingsRef();
UInt64 backup_threads = config.getUInt64("backup_threads", settings.backup_threads);
UInt64 restore_threads = config.getUInt64("restore_threads", settings.restore_threads);
if (!shared->backups_worker)
shared->backups_worker.emplace(getSettingsRef().backup_threads, getSettingsRef().restore_threads, allow_concurrent_backups, allow_concurrent_restores);
shared->backups_worker.emplace(backup_threads, restore_threads, allow_concurrent_backups, allow_concurrent_restores);
return *shared->backups_worker;
}

View File

@ -119,7 +119,6 @@ ASTPtr ASTGrantQuery::clone() const
void ASTGrantQuery::formatImpl(const FormatSettings & settings, FormatState &, FormatStateStacked) const
{
settings.ostr << (settings.hilite ? IAST::hilite_keyword : "") << (attach_mode ? "ATTACH " : "")
<< (settings.hilite ? hilite_keyword : "") << ((!is_revoke && (replace_access || replace_granted_roles)) ? "REPLACE " : "") << (settings.hilite ? hilite_none : "")
<< (settings.hilite ? hilite_keyword : "") << (is_revoke ? "REVOKE" : "GRANT")
<< (settings.hilite ? IAST::hilite_none : "");
@ -161,6 +160,9 @@ void ASTGrantQuery::formatImpl(const FormatSettings & settings, FormatState &, F
settings.ostr << (settings.hilite ? hilite_keyword : "") << " WITH GRANT OPTION" << (settings.hilite ? hilite_none : "");
else if (admin_option)
settings.ostr << (settings.hilite ? hilite_keyword : "") << " WITH ADMIN OPTION" << (settings.hilite ? hilite_none : "");
if (replace_access || replace_granted_roles)
settings.ostr << (settings.hilite ? hilite_keyword : "") << " WITH REPLACE OPTION" << (settings.hilite ? hilite_none : "");
}
}

View File

@ -16,6 +16,7 @@ ExecutingGraph::ExecutingGraph(std::shared_ptr<Processors> processors_, bool pro
{
uint64_t num_processors = processors->size();
nodes.reserve(num_processors);
source_processors.reserve(num_processors);
/// Create nodes.
for (uint64_t node = 0; node < num_processors; ++node)
@ -23,6 +24,9 @@ ExecutingGraph::ExecutingGraph(std::shared_ptr<Processors> processors_, bool pro
IProcessor * proc = processors->at(node).get();
processors_map[proc] = node;
nodes.emplace_back(std::make_unique<Node>(proc, node));
bool is_source = proc->getInputs().empty();
source_processors.emplace_back(is_source);
}
/// Create edges.
@ -117,6 +121,14 @@ bool ExecutingGraph::expandPipeline(std::stack<uint64_t> & stack, uint64_t pid)
return false;
}
processors->insert(processors->end(), new_processors.begin(), new_processors.end());
source_processors.reserve(source_processors.size() + new_processors.size());
for (auto & proc: new_processors)
{
bool is_source = proc->getInputs().empty();
source_processors.emplace_back(is_source);
}
}
uint64_t num_processors = processors->size();
@ -390,17 +402,25 @@ bool ExecutingGraph::updateNode(uint64_t pid, Queue & queue, Queue & async_queue
return true;
}
void ExecutingGraph::cancel()
void ExecutingGraph::cancel(bool cancel_all_processors)
{
std::exception_ptr exception_ptr;
{
std::lock_guard guard(processors_mutex);
for (auto & processor : *processors)
uint64_t num_processors = processors->size();
for (uint64_t proc = 0; proc < num_processors; ++proc)
{
try
{
processor->cancel();
/// Stop all processors in the general case, but in a specific case
/// where the pipeline needs to return a result on a partially read table,
/// stop only the processors that read from the source
if (cancel_all_processors || source_processors.at(proc))
{
IProcessor * processor = processors->at(proc).get();
processor->cancel();
}
}
catch (...)
{
@ -415,7 +435,8 @@ void ExecutingGraph::cancel()
tryLogCurrentException("ExecutingGraph");
}
}
cancelled = true;
if (cancel_all_processors)
cancelled = true;
}
if (exception_ptr)

View File

@ -6,6 +6,7 @@
#include <mutex>
#include <queue>
#include <stack>
#include <vector>
namespace DB
@ -137,7 +138,7 @@ public:
/// If processor wants to be expanded, lock will be upgraded to get write access to pipeline.
bool updateNode(uint64_t pid, Queue & queue, Queue & async_queue);
void cancel();
void cancel(bool cancel_all_processors = true);
private:
/// Add single edge to edges list. Check processor is known.
@ -152,6 +153,7 @@ private:
bool expandPipeline(std::stack<uint64_t> & stack, uint64_t pid);
std::shared_ptr<Processors> processors;
std::vector<bool> source_processors;
std::mutex processors_mutex;
UpgradableMutex nodes_mutex;

View File

@ -74,6 +74,15 @@ void PipelineExecutor::cancel()
graph->cancel();
}
void PipelineExecutor::cancelReading()
{
if (!cancelled_reading)
{
cancelled_reading = true;
graph->cancel(/*cancel_all_processors*/ false);
}
}
void PipelineExecutor::finish()
{
tasks.finish();
@ -148,6 +157,7 @@ bool PipelineExecutor::checkTimeLimitSoft()
// so that the "break" is faster and doesn't wait for long events
if (!continuing)
cancel();
return continuing;
}

View File

@ -50,6 +50,9 @@ public:
/// Cancel execution. May be called from another thread.
void cancel();
/// Cancel processors which only read data from source. May be called from another thread.
void cancelReading();
/// Checks the query time limits (cancelled or timeout). Throws on cancellation or when time limit is reached and the query uses "break"
bool checkTimeLimit();
/// Same as checkTimeLimit but it never throws. It returns false on cancellation or time limit reached
@ -78,6 +81,7 @@ private:
bool trace_processors = false;
std::atomic_bool cancelled = false;
std::atomic_bool cancelled_reading = false;
Poco::Logger * log = &Poco::Logger::get("PipelineExecutor");

View File

@ -179,10 +179,41 @@ void PullingAsyncPipelineExecutor::cancel()
return;
/// Cancel execution if it wasn't finished.
try
cancelWithExceptionHandling([&]()
{
if (!data->is_finished && data->executor)
data->executor->cancel();
});
/// The following code is needed to rethrow exception from PipelineExecutor.
/// It could have been thrown from pull(), but we will not likely call it again.
/// Join thread here to wait for possible exception.
if (data->thread.joinable())
data->thread.join();
/// Rethrow exception to not swallow it in destructor.
data->rethrowExceptionIfHas();
}
void PullingAsyncPipelineExecutor::cancelReading()
{
if (!data)
return;
/// Stop reading from source if pipeline wasn't finished.
cancelWithExceptionHandling([&]()
{
if (!data->is_finished && data->executor)
data->executor->cancelReading();
});
}
void PullingAsyncPipelineExecutor::cancelWithExceptionHandling(CancelFunc && cancel_func)
{
try
{
cancel_func();
}
catch (...)
{
@ -194,16 +225,6 @@ void PullingAsyncPipelineExecutor::cancel()
data->has_exception = true;
}
}
/// The following code is needed to rethrow exception from PipelineExecutor.
/// It could have been thrown from pull(), but we will not likely call it again.
/// Join thread here to wait for possible exception.
if (data->thread.joinable())
data->thread.join();
/// Rethrow exception to not swallow it in destructor.
data->rethrowExceptionIfHas();
}
Chunk PullingAsyncPipelineExecutor::getTotals()

View File

@ -1,4 +1,5 @@
#pragma once
#include <functional>
#include <memory>
namespace DB
@ -32,9 +33,12 @@ public:
bool pull(Chunk & chunk, uint64_t milliseconds = 0);
bool pull(Block & block, uint64_t milliseconds = 0);
/// Stop execution. It is not necessary, but helps to stop execution before executor is destroyed.
/// Stop execution of all processors. It is not necessary, but helps to stop execution before executor is destroyed.
void cancel();
/// Stop processors which only read data from source.
void cancelReading();
/// Get totals and extremes. Returns empty chunk if doesn't have any.
Chunk getTotals();
Chunk getExtremes();
@ -49,6 +53,11 @@ public:
/// Internal executor data.
struct Data;
private:
using CancelFunc = std::function<void()>;
void cancelWithExceptionHandling(CancelFunc && cancel_func);
private:
QueryPipeline & pipeline;
std::shared_ptr<LazyOutputFormat> lazy_format;

View File

@ -76,7 +76,7 @@ ISource::Status RemoteSource::prepare()
return status;
}
if (status == Status::PortFull)
if (status == Status::PortFull || status == Status::Ready)
{
/// Also push empty chunk to dependency to signal that we read data from remote source
/// or answered to the incoming request from parallel replica

View File

@ -63,8 +63,10 @@ HTTPServerRequest::HTTPServerRequest(HTTPContextPtr context, HTTPServerResponse
}
else if (getMethod() != HTTPRequest::HTTP_GET && getMethod() != HTTPRequest::HTTP_HEAD && getMethod() != HTTPRequest::HTTP_DELETE)
{
/// That check for has_body may be false-negative in rare cases, but it's okay
bool has_body = in->hasPendingData();
stream = std::move(in);
if (!startsWith(getContentType(), "multipart/form-data"))
if (!startsWith(getContentType(), "multipart/form-data") && has_body)
LOG_WARNING(&Poco::Logger::get("HTTPServerRequest"), "Got an HTTP request with no content length "
"and no chunked/multipart encoding, it may be impossible to distinguish graceful EOF from abnormal connection loss");
}

View File

@ -318,6 +318,9 @@ bool PostgreSQLHandler::isEmptyQuery(const String & query)
{
if (query.empty())
return true;
/// The golang driver pgx sends ";"
if (query == ";")
return true;
Poco::RegularExpression regex(R"(\A\s*\z)");
return regex.match(query);
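
A hypothetical standalone version of the check above, using std::regex instead of Poco::RegularExpression so it compiles on its own; isEmptyQuerySketch is an assumed name, not the handler's actual method.

#include <cassert>
#include <regex>
#include <string>

static bool isEmptyQuerySketch(const std::string & query)
{
    if (query.empty())
        return true;
    /// The golang driver pgx sends a bare ";".
    if (query == ";")
        return true;
    /// regex_match requires the whole string to match, so this means "only whitespace".
    return std::regex_match(query, std::regex(R"(\s*)"));
}

int main()
{
    assert(isEmptyQuerySketch(""));
    assert(isEmptyQuerySketch(";"));
    assert(isEmptyQuerySketch("  \n\t"));
    assert(!isEmptyQuerySketch("SELECT 1"));
}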

View File

@ -376,7 +376,7 @@ void TCPHandler::runImpl()
std::lock_guard lock(task_callback_mutex);
if (state.is_cancelled)
if (state.cancellation_status == CancellationStatus::FULLY_CANCELLED)
return {};
sendReadTaskRequestAssumeLocked();
@ -392,7 +392,7 @@ void TCPHandler::runImpl()
CurrentMetrics::Increment callback_metric_increment(CurrentMetrics::MergeTreeAllRangesAnnouncementsSent);
std::lock_guard lock(task_callback_mutex);
if (state.is_cancelled)
if (state.cancellation_status == CancellationStatus::FULLY_CANCELLED)
return;
sendMergeTreeAllRangesAnnounecementAssumeLocked(announcement);
@ -406,7 +406,7 @@ void TCPHandler::runImpl()
CurrentMetrics::Increment callback_metric_increment(CurrentMetrics::MergeTreeReadTaskRequestsSent);
std::lock_guard lock(task_callback_mutex);
if (state.is_cancelled)
if (state.cancellation_status == CancellationStatus::FULLY_CANCELLED)
return std::nullopt;
sendMergeTreeReadTaskRequestAssumeLocked(std::move(request));
@ -424,7 +424,7 @@ void TCPHandler::runImpl()
auto finish_or_cancel = [this]()
{
if (state.is_cancelled)
if (state.cancellation_status == CancellationStatus::FULLY_CANCELLED)
state.io.onCancelOrConnectionLoss();
else
state.io.onFinish();
@ -454,7 +454,7 @@ void TCPHandler::runImpl()
{
std::scoped_lock lock(task_callback_mutex, fatal_error_mutex);
if (isQueryCancelled())
if (getQueryCancellationStatus() == CancellationStatus::FULLY_CANCELLED)
return true;
sendProgress();
@ -673,7 +673,7 @@ bool TCPHandler::readDataNext()
{
LOG_INFO(log, "Client has dropped the connection, cancel the query.");
state.is_connection_closed = true;
state.is_cancelled = true;
state.cancellation_status = CancellationStatus::FULLY_CANCELLED;
break;
}
@ -718,7 +718,7 @@ void TCPHandler::readData()
while (readDataNext())
;
if (state.is_cancelled)
if (state.cancellation_status == CancellationStatus::FULLY_CANCELLED)
throw Exception(ErrorCodes::QUERY_WAS_CANCELLED, "Query was cancelled");
}
@ -731,7 +731,7 @@ void TCPHandler::skipData()
while (readDataNext())
;
if (state.is_cancelled)
if (state.cancellation_status == CancellationStatus::FULLY_CANCELLED)
throw Exception(ErrorCodes::QUERY_WAS_CANCELLED, "Query was cancelled");
}
@ -769,7 +769,7 @@ void TCPHandler::processInsertQuery()
while (readDataNext())
executor.push(std::move(state.block_for_insert));
if (state.is_cancelled)
if (state.cancellation_status == CancellationStatus::FULLY_CANCELLED)
executor.cancel();
else
executor.finish();
@ -823,7 +823,8 @@ void TCPHandler::processOrdinaryQueryWithProcessors()
{
std::unique_lock lock(task_callback_mutex);
if (isQueryCancelled())
auto cancellation_status = getQueryCancellationStatus();
if (cancellation_status == CancellationStatus::FULLY_CANCELLED)
{
/// Several callbacks, such as the callback for parallel reading, could be called from inside the pipeline,
/// and we have to unlock the mutex from our side to prevent a deadlock.
@ -832,6 +833,10 @@ void TCPHandler::processOrdinaryQueryWithProcessors()
executor.cancel();
break;
}
else if (cancellation_status == CancellationStatus::READ_CANCELLED)
{
executor.cancelReading();
}
if (after_send_progress.elapsed() / 1000 >= interactive_delay)
{
@ -862,7 +867,7 @@ void TCPHandler::processOrdinaryQueryWithProcessors()
* because we have not read all the data yet,
* and there could be ongoing calculations in other threads at the same time.
*/
if (!isQueryCancelled())
if (getQueryCancellationStatus() != CancellationStatus::FULLY_CANCELLED)
{
sendTotals(executor.getTotalsBlock());
sendExtremes(executor.getExtremesBlock());
@ -1352,8 +1357,7 @@ bool TCPHandler::receivePacket()
return false;
case Protocol::Client::Cancel:
LOG_INFO(log, "Received 'Cancel' packet from the client, canceling the query");
state.is_cancelled = true;
decreaseCancellationStatus("Received 'Cancel' packet from the client, canceling the query.");
return false;
case Protocol::Client::Hello:
@ -1394,8 +1398,7 @@ String TCPHandler::receiveReadTaskResponseAssumeLocked()
{
if (packet_type == Protocol::Client::Cancel)
{
LOG_INFO(log, "Received 'Cancel' packet from the client, canceling the read task");
state.is_cancelled = true;
decreaseCancellationStatus("Received 'Cancel' packet from the client, canceling the read task.");
return {};
}
else
@ -1422,8 +1425,7 @@ std::optional<ParallelReadResponse> TCPHandler::receivePartitionMergeTreeReadTas
{
if (packet_type == Protocol::Client::Cancel)
{
LOG_INFO(log, "Received 'Cancel' packet from the client, canceling the MergeTree read task");
state.is_cancelled = true;
decreaseCancellationStatus("Received 'Cancel' packet from the client, canceling the MergeTree read task.");
return std::nullopt;
}
else
@ -1812,14 +1814,37 @@ void TCPHandler::initProfileEventsBlockOutput(const Block & block)
}
}
bool TCPHandler::isQueryCancelled()
void TCPHandler::decreaseCancellationStatus(const std::string & log_message)
{
if (state.is_cancelled || state.sent_all_data)
return true;
auto prev_status = magic_enum::enum_name(state.cancellation_status);
bool stop_reading_on_first_cancel = false;
if (query_context)
{
const auto & settings = query_context->getSettingsRef();
stop_reading_on_first_cancel = settings.stop_reading_on_first_cancel;
}
if (stop_reading_on_first_cancel && state.cancellation_status == CancellationStatus::NOT_CANCELLED)
{
state.cancellation_status = CancellationStatus::READ_CANCELLED;
}
else
{
state.cancellation_status = CancellationStatus::FULLY_CANCELLED;
}
auto current_status = magic_enum::enum_name(state.cancellation_status);
LOG_INFO(log, "Change cancellation status from {} to {}. Log message: {}", prev_status, current_status, log_message);
}
QueryState::CancellationStatus TCPHandler::getQueryCancellationStatus()
{
if (state.cancellation_status == CancellationStatus::FULLY_CANCELLED || state.sent_all_data)
return CancellationStatus::FULLY_CANCELLED;
if (after_check_cancelled.elapsed() / 1000 < interactive_delay)
return false;
return state.cancellation_status;
after_check_cancelled.restart();
@ -1829,9 +1854,9 @@ bool TCPHandler::isQueryCancelled()
if (in->eof())
{
LOG_INFO(log, "Client has dropped the connection, cancel the query.");
state.is_cancelled = true;
state.cancellation_status = CancellationStatus::FULLY_CANCELLED;
state.is_connection_closed = true;
return true;
return CancellationStatus::FULLY_CANCELLED;
}
UInt64 packet_type = 0;
@ -1842,16 +1867,17 @@ bool TCPHandler::isQueryCancelled()
case Protocol::Client::Cancel:
if (state.empty())
throw NetException(ErrorCodes::UNEXPECTED_PACKET_FROM_CLIENT, "Unexpected packet Cancel received from client");
LOG_INFO(log, "Query was cancelled.");
state.is_cancelled = true;
return true;
decreaseCancellationStatus("Query was cancelled.");
return state.cancellation_status;
default:
throw NetException(ErrorCodes::UNKNOWN_PACKET_FROM_CLIENT, "Unknown packet from client {}", toString(packet_type));
}
}
return false;
return state.cancellation_status;
}
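
A standalone sketch (not TCPHandler itself) of the cancellation state machine introduced above: with stop_reading_on_first_cancel enabled, the first 'Cancel' packet only downgrades the status to READ_CANCELLED so sources stop while processing drains, and a second 'Cancel' downgrades it to FULLY_CANCELLED. The decrease() helper is an assumed simplification of decreaseCancellationStatus().

#include <iostream>

enum class CancellationStatus : unsigned char
{
    FULLY_CANCELLED,
    READ_CANCELLED,
    NOT_CANCELLED
};

/// Assumed simplification of TCPHandler::decreaseCancellationStatus().
static CancellationStatus decrease(CancellationStatus current, bool stop_reading_on_first_cancel)
{
    if (stop_reading_on_first_cancel && current == CancellationStatus::NOT_CANCELLED)
        return CancellationStatus::READ_CANCELLED;
    return CancellationStatus::FULLY_CANCELLED;
}

int main()
{
    auto status = CancellationStatus::NOT_CANCELLED;

    status = decrease(status, /*stop_reading_on_first_cancel=*/ true);
    std::cout << (status == CancellationStatus::READ_CANCELLED) << '\n';  /// 1: only reading stops, results keep flowing

    status = decrease(status, /*stop_reading_on_first_cancel=*/ true);
    std::cout << (status == CancellationStatus::FULLY_CANCELLED) << '\n'; /// 1: the second Cancel kills the query
}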

View File

@ -76,8 +76,17 @@ struct QueryState
/// Streams of blocks, that are processing the query.
BlockIO io;
enum class CancellationStatus: UInt8
{
FULLY_CANCELLED,
READ_CANCELLED,
NOT_CANCELLED
};
static std::string cancellationStatusToName(CancellationStatus status);
/// Is request cancelled
bool is_cancelled = false;
CancellationStatus cancellation_status = CancellationStatus::NOT_CANCELLED;
bool is_connection_closed = false;
/// empty or not
bool is_empty = true;
@ -272,7 +281,10 @@ private:
void initLogsBlockOutput(const Block & block);
void initProfileEventsBlockOutput(const Block & block);
bool isQueryCancelled();
using CancellationStatus = QueryState::CancellationStatus;
void decreaseCancellationStatus(const std::string & log_message);
CancellationStatus getQueryCancellationStatus();
/// This function is called from different threads.
void updateProgress(const Progress & value);

View File

@ -456,6 +456,14 @@ StorageInMemoryMetadata ReplicatedMergeTreeTableMetadata::Diff::getNewMetadata(c
new_metadata.table_ttl = TTLTableDescription::getTTLForTableFromAST(
new_metadata.table_ttl.definition_ast, new_metadata.columns, context, new_metadata.primary_key);
if (!projections_changed)
{
ProjectionsDescription recalculated_projections;
for (const auto & projection : new_metadata.projections)
recalculated_projections.add(ProjectionDescription::getProjectionFromAST(projection.definition_ast, new_metadata.columns, context));
new_metadata.projections = std::move(recalculated_projections);
}
return new_metadata;
}

View File

@ -214,7 +214,7 @@ void StorageMergeTree::read(
size_t max_block_size,
size_t num_streams)
{
if (local_context->canUseParallelReplicasOnInitiator())
if (local_context->canUseParallelReplicasOnInitiator() && local_context->getSettingsRef().parallel_replicas_for_non_replicated_merge_tree)
{
auto table_id = getStorageID();
@ -245,10 +245,12 @@ void StorageMergeTree::read(
}
else
{
const bool enable_parallel_reading = local_context->canUseParallelReplicasOnFollower() && local_context->getSettingsRef().parallel_replicas_for_non_replicated_merge_tree;
if (auto plan = reader.read(
column_names, storage_snapshot, query_info,
local_context, max_block_size, num_streams,
processed_stage, nullptr, /*enable_parallel_reading*/local_context->canUseParallelReplicasOnFollower()))
processed_stage, nullptr, enable_parallel_reading))
query_plan = std::move(*plan);
}

View File

@ -25,5 +25,6 @@
<select_from_system_db_requires_grant>true</select_from_system_db_requires_grant>
<select_from_information_schema_requires_grant>true</select_from_information_schema_requires_grant>
<settings_constraints_replace_previous>true</settings_constraints_replace_previous>
<role_cache_expiration_time_seconds>2</role_cache_expiration_time_seconds>
</access_control_improvements>
</clickhouse>

View File

@ -1,264 +0,0 @@
[
"test_atomic_drop_table/test.py::test_atomic_delete_with_stopped_zookeeper",
"test_attach_without_fetching/test.py::test_attach_without_fetching",
"test_broken_part_during_merge/test.py::test_merge_and_part_corruption",
"test_cleanup_dir_after_bad_zk_conn/test.py::test_attach_without_zk",
"test_cleanup_dir_after_bad_zk_conn/test.py::test_cleanup_dir_after_bad_zk_conn",
"test_cleanup_dir_after_bad_zk_conn/test.py::test_cleanup_dir_after_wrong_replica_name",
"test_cleanup_dir_after_bad_zk_conn/test.py::test_cleanup_dir_after_wrong_zk_path",
"test_consistent_parts_after_clone_replica/test.py::test_inconsistent_parts_if_drop_while_replica_not_active",
"test_cross_replication/test.py::test",
"test_ddl_worker_non_leader/test.py::test_non_leader_replica",
"test_delayed_replica_failover/test.py::test",
"test_dictionaries_update_field/test.py::test_update_field[complex_key_hashed_update_field_dictionary-HASHED]",
"test_dictionaries_update_field/test.py::test_update_field[flat_update_field_dictionary-FLAT]",
"test_dictionaries_update_field/test.py::test_update_field[simple_key_hashed_update_field_dictionary-HASHED]",
"test_dictionary_allow_read_expired_keys/test_default_reading.py::test_default_reading",
"test_dictionary_allow_read_expired_keys/test_default_string.py::test_return_real_values",
"test_dictionary_allow_read_expired_keys/test_dict_get_or_default.py::test_simple_dict_get_or_default",
"test_dictionary_allow_read_expired_keys/test_dict_get.py::test_simple_dict_get",
"test_disabled_mysql_server/test.py::test_disabled_mysql_server",
"test_distributed_ddl_on_cross_replication/test.py::test_alter_ddl",
"test_distributed_ddl_on_cross_replication/test.py::test_atomic_database",
"test_distributed_ddl_parallel/test.py::test_all_in_parallel",
"test_distributed_ddl_parallel/test.py::test_slow_dict_load_7",
"test_distributed_ddl_parallel/test.py::test_smoke",
"test_distributed_ddl_parallel/test.py::test_smoke_parallel",
"test_distributed_ddl_parallel/test.py::test_smoke_parallel_dict_reload",
"test_distributed_ddl_parallel/test.py::test_two_in_parallel_two_queued",
"test_distributed_ddl_password/test.py::test_alter",
"test_distributed_ddl_password/test.py::test_truncate",
"test_distributed_ddl/test.py::test_allowed_databases[configs]",
"test_distributed_ddl/test.py::test_allowed_databases[configs_secure]",
"test_distributed_ddl/test.py::test_create_as_select[configs]",
"test_distributed_ddl/test.py::test_create_as_select[configs_secure]",
"test_distributed_ddl/test.py::test_create_reserved[configs]",
"test_distributed_ddl/test.py::test_create_reserved[configs_secure]",
"test_distributed_ddl/test.py::test_create_view[configs]",
"test_distributed_ddl/test.py::test_create_view[configs_secure]",
"test_distributed_ddl/test.py::test_default_database[configs]",
"test_distributed_ddl/test.py::test_default_database[configs_secure]",
"test_distributed_ddl/test.py::test_detach_query[configs]",
"test_distributed_ddl/test.py::test_detach_query[configs_secure]",
"test_distributed_ddl/test.py::test_implicit_macros[configs]",
"test_distributed_ddl/test.py::test_implicit_macros[configs_secure]",
"test_distributed_ddl/test.py::test_kill_query[configs]",
"test_distributed_ddl/test.py::test_kill_query[configs_secure]",
"test_distributed_ddl/test.py::test_macro[configs]",
"test_distributed_ddl/test.py::test_macro[configs_secure]",
"test_distributed_ddl/test.py::test_on_connection_loss[configs]",
"test_distributed_ddl/test.py::test_on_connection_loss[configs_secure]",
"test_distributed_ddl/test.py::test_on_server_fail[configs]",
"test_distributed_ddl/test.py::test_on_server_fail[configs_secure]",
"test_distributed_ddl/test.py::test_on_session_expired[configs]",
"test_distributed_ddl/test.py::test_on_session_expired[configs_secure]",
"test_distributed_ddl/test.py::test_optimize_query[configs]",
"test_distributed_ddl/test.py::test_optimize_query[configs_secure]",
"test_distributed_ddl/test.py::test_rename[configs]",
"test_distributed_ddl/test.py::test_rename[configs_secure]",
"test_distributed_ddl/test.py::test_replicated_without_arguments[configs]",
"test_distributed_ddl/test.py::test_replicated_without_arguments[configs_secure]",
"test_distributed_ddl/test.py::test_simple_alters[configs]",
"test_distributed_ddl/test.py::test_simple_alters[configs_secure]",
"test_distributed_ddl/test.py::test_socket_timeout[configs]",
"test_distributed_ddl/test.py::test_socket_timeout[configs_secure]",
"test_distributed_ddl/test_replicated_alter.py::test_replicated_alters[configs]",
"test_distributed_ddl/test_replicated_alter.py::test_replicated_alters[configs_secure]",
"test_distributed_respect_user_timeouts/test.py::test_reconnect[configs-default-node1-distributed]",
"test_distributed_respect_user_timeouts/test.py::test_reconnect[configs-default-node1-remote]",
"test_distributed_respect_user_timeouts/test.py::test_reconnect[configs-default-node2-distributed]",
"test_distributed_respect_user_timeouts/test.py::test_reconnect[configs-default-node2-remote]",
"test_distributed_respect_user_timeouts/test.py::test_reconnect[configs-ready_to_wait-node1-distributed]",
"test_distributed_respect_user_timeouts/test.py::test_reconnect[configs-ready_to_wait-node1-remote]",
"test_distributed_respect_user_timeouts/test.py::test_reconnect[configs-ready_to_wait-node2-distributed]",
"test_distributed_respect_user_timeouts/test.py::test_reconnect[configs-ready_to_wait-node2-remote]",
"test_distributed_respect_user_timeouts/test.py::test_reconnect[configs_secure-default-node1-distributed]",
"test_distributed_respect_user_timeouts/test.py::test_reconnect[configs_secure-default-node1-remote]",
"test_distributed_respect_user_timeouts/test.py::test_reconnect[configs_secure-default-node2-distributed]",
"test_distributed_respect_user_timeouts/test.py::test_reconnect[configs_secure-default-node2-remote]",
"test_distributed_respect_user_timeouts/test.py::test_reconnect[configs_secure-ready_to_wait-node1-distributed]",
"test_distributed_respect_user_timeouts/test.py::test_reconnect[configs_secure-ready_to_wait-node1-remote]",
"test_distributed_respect_user_timeouts/test.py::test_reconnect[configs_secure-ready_to_wait-node2-distributed]",
"test_distributed_respect_user_timeouts/test.py::test_reconnect[configs_secure-ready_to_wait-node2-remote]",
"test_drop_replica/test.py::test_drop_replica",
"test_hedged_requests_parallel/test.py::test_combination1",
"test_hedged_requests_parallel/test.py::test_combination2",
"test_hedged_requests_parallel/test.py::test_query_with_no_data_to_sample",
"test_hedged_requests_parallel/test.py::test_send_data",
"test_hedged_requests_parallel/test.py::test_send_table_status_sleep",
"test_hedged_requests/test.py::test_combination1",
"test_hedged_requests/test.py::test_combination2",
"test_hedged_requests/test.py::test_combination3",
"test_hedged_requests/test.py::test_combination4",
"test_hedged_requests/test.py::test_long_query",
"test_hedged_requests/test.py::test_receive_timeout1",
"test_hedged_requests/test.py::test_receive_timeout2",
"test_hedged_requests/test.py::test_send_data",
"test_hedged_requests/test.py::test_send_data2",
"test_hedged_requests/test.py::test_send_table_status_sleep",
"test_hedged_requests/test.py::test_send_table_status_sleep2",
"test_hedged_requests/test.py::test_stuck_replica",
"test_https_replication/test.py::test_both_http",
"test_https_replication/test.py::test_both_https",
"test_https_replication/test.py::test_mixed_protocol",
"test_https_replication/test.py::test_replication_after_partition",
"test_insert_into_distributed_sync_async/test.py::test_async_inserts_into_local_shard",
"test_insert_into_distributed_sync_async/test.py::test_insertion_sync",
"test_insert_into_distributed_sync_async/test.py::test_insertion_sync_fails_with_timeout",
"test_insert_into_distributed_sync_async/test.py::test_insertion_sync_with_disabled_timeout",
"test_insert_into_distributed_sync_async/test.py::test_insertion_without_sync_ignores_timeout",
"test_insert_into_distributed/test.py::test_inserts_batching",
"test_insert_into_distributed/test.py::test_inserts_local",
"test_insert_into_distributed/test.py::test_inserts_low_cardinality",
"test_insert_into_distributed/test.py::test_inserts_single_replica_internal_replication",
"test_insert_into_distributed/test.py::test_inserts_single_replica_local_internal_replication",
"test_insert_into_distributed/test.py::test_inserts_single_replica_no_internal_replication",
"test_insert_into_distributed/test.py::test_prefer_localhost_replica",
"test_insert_into_distributed/test.py::test_reconnect",
"test_insert_into_distributed/test.py::test_table_function",
"test_insert_into_distributed_through_materialized_view/test.py::test_inserts_batching SKIPPED",
"test_insert_into_distributed_through_materialized_view/test.py::test_inserts_local",
"test_insert_into_distributed_through_materialized_view/test.py::test_reconnect",
"test_keeper_multinode_blocade_leader/test.py::test_blocade_leader",
"test_keeper_multinode_blocade_leader/test.py::test_blocade_leader_twice",
"test_keeper_multinode_simple/test.py::test_follower_restart",
"test_keeper_multinode_simple/test.py::test_read_write_multinode",
"test_keeper_multinode_simple/test.py::test_session_expiration",
"test_keeper_multinode_simple/test.py::test_simple_replicated_table",
"test_keeper_multinode_simple/test.py::test_watch_on_follower",
"test_limited_replicated_fetches/test.py::test_limited_fetches",
"test_materialized_mysql_database/test.py::test_clickhouse_killed_while_insert_5_7[atomic]",
"test_materialized_mysql_database/test.py::test_clickhouse_killed_while_insert_5_7[ordinary]",
"test_materialized_mysql_database/test.py::test_clickhouse_killed_while_insert_8_0[atomic]",
"test_materialized_mysql_database/test.py::test_clickhouse_killed_while_insert_8_0[ordinary]",
"test_materialized_mysql_database/test.py::test_insert_with_modify_binlog_checksum_5_7[atomic]",
"test_materialized_mysql_database/test.py::test_insert_with_modify_binlog_checksum_5_7[ordinary]",
"test_materialized_mysql_database/test.py::test_insert_with_modify_binlog_checksum_8_0[atomic]",
"test_materialized_mysql_database/test.py::test_insert_with_modify_binlog_checksum_8_0[ordinary]",
"test_materialized_mysql_database/test.py::test_materialize_database_ddl_with_empty_transaction_5_7[atomic]",
"test_materialized_mysql_database/test.py::test_materialize_database_ddl_with_empty_transaction_5_7[ordinary]",
"test_materialized_mysql_database/test.py::test_materialize_database_ddl_with_empty_transaction_8_0[atomic]",
"test_materialized_mysql_database/test.py::test_materialize_database_ddl_with_empty_transaction_8_0[ordinary]",
"test_materialized_mysql_database/test.py::test_materialize_database_ddl_with_mysql_5_7[atomic]",
"test_materialized_mysql_database/test.py::test_materialize_database_ddl_with_mysql_5_7[ordinary]",
"test_materialized_mysql_database/test.py::test_materialize_database_ddl_with_mysql_8_0[atomic]",
"test_materialized_mysql_database/test.py::test_materialize_database_ddl_with_mysql_8_0[ordinary]",
"test_materialized_mysql_database/test.py::test_materialize_database_dml_with_mysql_5_7[atomic]",
"test_materialized_mysql_database/test.py::test_materialize_database_dml_with_mysql_5_7[ordinary]",
"test_materialized_mysql_database/test.py::test_materialize_database_dml_with_mysql_8_0[atomic]",
"test_materialized_mysql_database/test.py::test_materialize_database_dml_with_mysql_8_0[ordinary]",
"test_materialized_mysql_database/test.py::test_materialize_database_err_sync_user_privs_5_7[atomic]",
"test_materialized_mysql_database/test.py::test_materialize_database_err_sync_user_privs_5_7[ordinary]",
"test_materialized_mysql_database/test.py::test_materialize_database_err_sync_user_privs_8_0[atomic]",
"test_materialized_mysql_database/test.py::test_materialize_database_err_sync_user_privs_8_0[ordinary]",
"test_materialized_mysql_database/test.py::test_multi_table_update[clickhouse_node0]",
"test_materialized_mysql_database/test.py::test_multi_table_update[clickhouse_node1]",
"test_materialized_mysql_database/test.py::test_mysql_killed_while_insert_5_7[atomic]",
"test_materialized_mysql_database/test.py::test_mysql_killed_while_insert_5_7[ordinary]",
"test_materialized_mysql_database/test.py::test_mysql_killed_while_insert_8_0[atomic]",
"test_materialized_mysql_database/test.py::test_mysql_killed_while_insert_8_0[ordinary]",
"test_materialized_mysql_database/test.py::test_mysql_kill_sync_thread_restore_5_7[atomic]",
"test_materialized_mysql_database/test.py::test_mysql_kill_sync_thread_restore_5_7[ordinary]",
"test_materialized_mysql_database/test.py::test_mysql_kill_sync_thread_restore_8_0[atomic]",
"test_materialized_mysql_database/test.py::test_mysql_kill_sync_thread_restore_8_0[ordinary]",
"test_materialized_mysql_database/test.py::test_mysql_settings[clickhouse_node0]",
"test_materialized_mysql_database/test.py::test_mysql_settings[clickhouse_node1]",
"test_materialized_mysql_database/test.py::test_network_partition_5_7[atomic]",
"test_materialized_mysql_database/test.py::test_network_partition_5_7[ordinary]",
"test_materialized_mysql_database/test.py::test_network_partition_8_0[atomic]",
"test_materialized_mysql_database/test.py::test_network_partition_8_0[ordinary]",
"test_materialized_mysql_database/test.py::test_select_without_columns_5_7[atomic]",
"test_materialized_mysql_database/test.py::test_select_without_columns_5_7[ordinary]",
"test_materialized_mysql_database/test.py::test_select_without_columns_8_0[atomic]",
"test_materialized_mysql_database/test.py::test_select_without_columns_8_0[ordinary]",
"test_materialized_mysql_database/test.py::test_system_parts_table[clickhouse_node0]",
"test_materialized_mysql_database/test.py::test_system_parts_table[clickhouse_node1]",
"test_materialized_mysql_database/test.py::test_system_tables_table[clickhouse_node0]",
"test_materialized_mysql_database/test.py::test_system_tables_table[clickhouse_node1]",
"test_materialized_mysql_database/test.py::test_materialize_with_column_comments[clickhouse_node0]",
"test_materialized_mysql_database/test.py::test_materialize_with_column_comments[clickhouse_node1]",
"test_materialized_mysql_database/test.py::test_materialize_with_enum[clickhouse_node0]",
"test_materialized_mysql_database/test.py::test_materialize_with_enum[clickhouse_node1]",
"test_materialized_mysql_database/test.py::test_utf8mb4[clickhouse_node0]",
"test_materialized_mysql_database/test.py::test_utf8mb4[clickhouse_node1]",
"test_parts_delete_zookeeper/test.py::test_merge_doesnt_work_without_zookeeper",
"test_polymorphic_parts/test.py::test_compact_parts_only",
"test_polymorphic_parts/test.py::test_different_part_types_on_replicas[polymorphic_table_compact-Compact]",
"test_polymorphic_parts/test.py::test_different_part_types_on_replicas[polymorphic_table_wide-Wide]",
"test_polymorphic_parts/test.py::test_in_memory",
"test_polymorphic_parts/test.py::test_in_memory_alters",
"test_polymorphic_parts/test.py::test_in_memory_deduplication",
"test_polymorphic_parts/test.py::test_in_memory_wal_rotate",
"test_polymorphic_parts/test.py::test_polymorphic_parts_basics[first_node0-second_node0]",
"test_polymorphic_parts/test.py::test_polymorphic_parts_basics[first_node1-second_node1]",
"test_polymorphic_parts/test.py::test_polymorphic_parts_diff_versions_2 SKIPPED",
"test_polymorphic_parts/test.py::test_polymorphic_parts_diff_versions SKIPPED",
"test_polymorphic_parts/test.py::test_polymorphic_parts_index",
"test_polymorphic_parts/test.py::test_polymorphic_parts_non_adaptive",
"test_quorum_inserts_parallel/test.py::test_parallel_quorum_actually_parallel",
"test_quorum_inserts_parallel/test.py::test_parallel_quorum_actually_quorum",
"test_random_inserts/test.py::test_insert_multithreaded",
"test_random_inserts/test.py::test_random_inserts",
"test_reload_clusters_config/test.py::test_add_cluster",
"test_reload_clusters_config/test.py::test_delete_cluster",
"test_reload_clusters_config/test.py::test_simple_reload",
"test_reload_clusters_config/test.py::test_update_one_cluster",
"test_replace_partition/test.py::test_drop_failover",
"test_replace_partition/test.py::test_normal_work",
"test_replace_partition/test.py::test_replace_after_replace_failover",
"test_replicated_database/test.py::test_alters_from_different_replicas",
"test_replicated_database/test.py::test_create_replicated_table",
"test_replicated_database/test.py::test_recover_staled_replica",
"test_replicated_database/test.py::test_simple_alter_table[MergeTree]",
"test_replicated_database/test.py::test_simple_alter_table[ReplicatedMergeTree]",
"test_replicated_database/test.py::test_startup_without_zk",
"test_replicated_fetches_timeouts/test.py::test_no_stall",
"test_storage_kafka/test.py::test_bad_reschedule",
"test_storage_kafka/test.py::test_commits_of_unprocessed_messages_on_drop",
"test_storage_kafka/test.py::test_exception_from_destructor",
"test_storage_kafka/test.py::test_kafka_commit_on_block_write",
"test_storage_kafka/test.py::test_kafka_consumer_hang",
"test_storage_kafka/test.py::test_kafka_consumer_hang2",
"test_storage_kafka/test.py::test_kafka_csv_with_delimiter",
"test_storage_kafka/test.py::test_kafka_csv_with_thread_per_consumer",
"test_storage_kafka/test.py::test_kafka_duplicates_when_commit_failed",
"test_storage_kafka/test.py::test_kafka_engine_put_errors_to_stream",
"test_storage_kafka/test.py::test_kafka_engine_put_errors_to_stream_with_random_malformed_json",
"test_storage_kafka/test.py::test_kafka_flush_by_block_size",
"test_storage_kafka/test.py::test_kafka_flush_by_time",
"test_storage_kafka/test.py::test_kafka_flush_on_big_message",
"test_storage_kafka/test.py::test_kafka_formats",
"test_storage_kafka/test.py::test_kafka_formats_with_broken_message",
"test_storage_kafka/test.py::test_kafka_insert",
"test_storage_kafka/test.py::test_kafka_issue11308",
"test_storage_kafka/test.py::test_kafka_issue14202",
"test_storage_kafka/test.py::test_kafka_issue4116",
"test_storage_kafka/test.py::test_kafka_json_as_string",
"test_storage_kafka/test.py::test_kafka_json_without_delimiter",
"test_storage_kafka/test.py::test_kafka_lot_of_partitions_partial_commit_of_bulk",
"test_storage_kafka/test.py::test_kafka_many_materialized_views",
"test_storage_kafka/test.py::test_kafka_materialized_view",
"test_storage_kafka/test.py::test_kafka_materialized_view_with_subquery",
"test_storage_kafka/test.py::test_kafka_no_holes_when_write_suffix_failed",
"test_storage_kafka/test.py::test_kafka_produce_consume",
"test_storage_kafka/test.py::test_kafka_produce_key_timestamp",
"test_storage_kafka/test.py::test_kafka_protobuf",
"test_storage_kafka/test.py::test_kafka_protobuf_no_delimiter",
"test_storage_kafka/test.py::test_kafka_rebalance",
"test_storage_kafka/test.py::test_kafka_select_empty",
"test_storage_kafka/test.py::test_kafka_settings_new_syntax",
"test_storage_kafka/test.py::test_kafka_settings_old_syntax",
"test_storage_kafka/test.py::test_kafka_string_field_on_first_position_in_protobuf",
"test_storage_kafka/test.py::test_kafka_tsv_with_delimiter",
"test_storage_kafka/test.py::test_kafka_unavailable",
"test_storage_kafka/test.py::test_kafka_virtual_columns",
"test_storage_kafka/test.py::test_kafka_virtual_columns2",
"test_storage_kafka/test.py::test_kafka_virtual_columns_with_materialized_view",
"test_storage_kafka/test.py::test_librdkafka_compression",
"test_storage_kafka/test.py::test_premature_flush_on_eof",
"test_storage_kerberized_kafka/test.py::test_kafka_json_as_string",
"test_storage_kerberized_kafka/test.py::test_kafka_json_as_string_no_kdc",
"test_system_clusters_actual_information/test.py::test",
"test_system_metrics/test.py::test_readonly_metrics",
"test_system_replicated_fetches/test.py::test_system_replicated_fetches"
]

View File

@ -51,5 +51,19 @@
"test_global_overcommit_tracker/test.py::test_global_overcommit",
"test_user_ip_restrictions/test.py::test_ipv4",
"test_user_ip_restrictions/test.py::test_ipv6"
"test_user_ip_restrictions/test.py::test_ipv6",
"test_server_reload/test.py::test_change_grpc_port",
"test_server_reload/test.py::test_change_http_handlers",
"test_server_reload/test.py::test_change_http_port",
"test_server_reload/test.py::test_change_listen_host",
"test_server_reload/test.py::test_change_mysql_port",
"test_server_reload/test.py::test_change_postgresql_port",
"test_server_reload/test.py::test_change_tcp_port",
"test_server_reload/test.py::test_reload_via_client",
"test_server_reload/test.py::test_remove_grpc_port",
"test_server_reload/test.py::test_remove_http_port",
"test_server_reload/test.py::test_remove_mysql_port",
"test_server_reload/test.py::test_remove_postgresql_port",
"test_server_reload/test.py::test_remove_tcp_port"
]

View File

@ -9,13 +9,14 @@ from helpers.test_tools import TSV, assert_eq_with_retry
cluster = ClickHouseCluster(__file__)
num_nodes = 10
num_nodes = 4
ddl_task_timeout = 640
def generate_cluster_def():
path = os.path.join(
os.path.dirname(os.path.realpath(__file__)),
"./_gen/cluster_for_concurrency_test.xml",
"./_gen/cluster_for_disallow_concurrency_test.xml",
)
os.makedirs(os.path.dirname(path), exist_ok=True)
with open(path, "w") as f:
@ -85,7 +86,7 @@ def drop_after_test():
node0.query(
"DROP TABLE IF EXISTS tbl ON CLUSTER 'cluster' NO DELAY",
settings={
"distributed_ddl_task_timeout": 360,
"distributed_ddl_task_timeout": ddl_task_timeout,
},
)
@ -100,6 +101,7 @@ def new_backup_name():
def create_and_fill_table():
node0.query("SET mutations_sync=2")
node0.query(
"CREATE TABLE tbl ON CLUSTER 'cluster' ("
"x UInt64"
@ -107,7 +109,10 @@ def create_and_fill_table():
"ORDER BY x"
)
for i in range(num_nodes):
nodes[i].query(f"INSERT INTO tbl SELECT number FROM numbers(40000000)")
nodes[i].query(f"INSERT INTO tbl SELECT number FROM numbers(100000000)")
nodes[i].query(
f"INSERT INTO tbl SELECT number+100000000 FROM numbers(100000000)"
)
# All the tests have concurrent backup/restores with same backup names
@ -138,6 +143,8 @@ def test_concurrent_backups_on_same_node():
nodes[0],
f"SELECT status FROM system.backups WHERE status == 'BACKUP_CREATED' AND id = '{id}'",
"BACKUP_CREATED",
retry_count=100,
sleep_time=1,
)
# This restore part is added to confirm that creating an internal backup & restore works
@ -145,7 +152,7 @@ def test_concurrent_backups_on_same_node():
nodes[0].query(
f"DROP TABLE tbl ON CLUSTER 'cluster' NO DELAY",
settings={
"distributed_ddl_task_timeout": 360,
"distributed_ddl_task_timeout": ddl_task_timeout,
},
)
nodes[0].query(f"RESTORE TABLE tbl ON CLUSTER 'cluster' FROM {backup_name}")
@ -174,6 +181,8 @@ def test_concurrent_backups_on_different_nodes():
nodes[1],
f"SELECT status FROM system.backups WHERE status == 'BACKUP_CREATED' AND id = '{id}'",
"BACKUP_CREATED",
retry_count=100,
sleep_time=1,
)
@ -197,12 +206,14 @@ def test_concurrent_restores_on_same_node():
nodes[0],
f"SELECT status FROM system.backups WHERE status == 'BACKUP_CREATED' AND id = '{id}'",
"BACKUP_CREATED",
retry_count=100,
sleep_time=1,
)
nodes[0].query(
f"DROP TABLE tbl ON CLUSTER 'cluster' NO DELAY",
settings={
"distributed_ddl_task_timeout": 360,
"distributed_ddl_task_timeout": ddl_task_timeout,
},
)
restore_id = (
@ -226,44 +237,46 @@ def test_concurrent_restores_on_different_node():
backup_name = new_backup_name()
id = (
nodes[0]
nodes[1]
.query(f"BACKUP TABLE tbl ON CLUSTER 'cluster' TO {backup_name} ASYNC")
.split("\t")[0]
)
assert_eq_with_retry(
nodes[0],
nodes[1],
f"SELECT status FROM system.backups WHERE status == 'CREATING_BACKUP' AND id = '{id}'",
"CREATING_BACKUP",
)
assert_eq_with_retry(
nodes[0],
nodes[1],
f"SELECT status FROM system.backups WHERE status == 'BACKUP_CREATED' AND id = '{id}'",
"BACKUP_CREATED",
retry_count=100,
sleep_time=1,
)
nodes[0].query(
nodes[1].query(
f"DROP TABLE tbl ON CLUSTER 'cluster' NO DELAY",
settings={
"distributed_ddl_task_timeout": 360,
"distributed_ddl_task_timeout": ddl_task_timeout,
},
)
restore_id = (
nodes[0]
nodes[1]
.query(f"RESTORE TABLE tbl ON CLUSTER 'cluster' FROM {backup_name} ASYNC")
.split("\t")[0]
)
assert_eq_with_retry(
nodes[0],
f"SELECT status FROM system.backups WHERE status == 'RESTORING'",
nodes[1],
f"SELECT status FROM system.backups WHERE status == 'RESTORING' AND id == '{restore_id}'",
"RESTORING",
)
assert "Concurrent restores not supported" in nodes[1].query_and_get_error(
assert "Concurrent restores not supported" in nodes[0].query_and_get_error(
f"RESTORE TABLE tbl ON CLUSTER 'cluster' FROM {backup_name}"
)
assert_eq_with_retry(
nodes[0],
nodes[1],
f"SELECT status FROM system.backups WHERE status == 'RESTORED' AND id == '{restore_id}'",
"RESTORED",
)

View File

@ -85,6 +85,7 @@ def test_create_and_drop():
def test_create_and_replace():
node1.query("CREATE FUNCTION f1 AS (x, y) -> x + y")
assert node1.query("SELECT f1(12, 3)") == "15\n"
expected_error = "User-defined function 'f1' already exists"
assert expected_error in node1.query_and_get_error(
@ -253,3 +254,27 @@ def test_reload_zookeeper():
# switch to the original version of zookeeper config
cluster.start_zookeeper_nodes(["zoo1", "zoo2", "zoo3"])
revert_zookeeper_config()
# Starting without ZooKeeper must be possible; user-defined functions will be loaded after connecting to ZooKeeper.
def test_start_without_zookeeper():
node2.stop_clickhouse()
node1.query("CREATE FUNCTION f1 AS (x, y) -> x + y")
cluster.stop_zookeeper_nodes(["zoo1", "zoo2", "zoo3"])
node2.start_clickhouse()
assert (
node2.query("SELECT create_query FROM system.functions WHERE name='f1'") == ""
)
cluster.start_zookeeper_nodes(["zoo1", "zoo2", "zoo3"])
wait_zookeeper_node_to_start(["zoo1", "zoo2", "zoo3"])
assert_eq_with_retry(
node2,
"SELECT create_query FROM system.functions WHERE name='f1'",
"CREATE FUNCTION f1 AS (x, y) -> (x + y)\n",
)
node1.query("DROP FUNCTION f1")

View File

@ -1,3 +1,4 @@
import time
import pytest
from helpers.client import QueryRuntimeException
from helpers.cluster import ClickHouseCluster
@ -412,3 +413,74 @@ def test_function_current_roles():
)
== "['R1']\t['R1']\t['R1']\n"
)
def test_role_expiration():
instance.query("CREATE USER ure")
instance.query("CREATE ROLE rre")
instance.query("GRANT rre TO ure")
instance.query("CREATE TABLE IF NOT EXISTS tre (id Int) Engine=Log")
instance.query("INSERT INTO tre VALUES (0)")
assert "Not enough privileges" in instance.query_and_get_error(
"SELECT * FROM tre", user="ure"
)
instance.query("GRANT SELECT ON tre TO rre")
assert instance.query("SELECT * FROM tre", user="ure") == "0\n"
# access_control_improvements/role_cache_expiration_time_seconds value is 2 for the test
# so we wait >2 seconds until the role is expired
time.sleep(5)
instance.query("CREATE TABLE IF NOT EXISTS tre1 (id Int) Engine=Log")
instance.query("INSERT INTO tre1 VALUES (0)")
instance.query("GRANT SELECT ON tre1 TO rre")
assert instance.query("SELECT * from tre1", user="ure") == "0\n"
instance.query("DROP USER ure")
instance.query("DROP ROLE rre")
instance.query("DROP TABLE tre")
instance.query("DROP TABLE tre1")
def test_two_roles_expiration():
instance.query("CREATE USER ure")
instance.query("CREATE ROLE rre")
instance.query("GRANT rre TO ure")
instance.query("CREATE ROLE rre_second")
instance.query("CREATE TABLE IF NOT EXISTS tre (id Int) Engine=Log")
instance.query("INSERT INTO tre VALUES (0)")
assert "Not enough privileges" in instance.query_and_get_error(
"SELECT * FROM tre", user="ure"
)
instance.query("GRANT SELECT ON tre TO rre")
assert instance.query("SELECT * FROM tre", user="ure") == "0\n"
# access_control_improvements/role_cache_expiration_time_seconds value is 2 for the test
# so we wait >2 seconds until the roles are expired
time.sleep(5)
instance.query(
"GRANT SELECT ON tre1 TO rre_second"
) # we expect that both rre and rre_second are gone from cache upon this operation
instance.query("CREATE TABLE IF NOT EXISTS tre1 (id Int) Engine=Log")
instance.query("INSERT INTO tre1 VALUES (0)")
instance.query("GRANT SELECT ON tre1 TO rre")
assert instance.query("SELECT * from tre1", user="ure") == "0\n"
instance.query("DROP USER ure")
instance.query("DROP ROLE rre")
instance.query("DROP ROLE rre_second")
instance.query("DROP TABLE tre")
instance.query("DROP TABLE tre1")

View File

@ -25,5 +25,5 @@
9631199822919835226
4334672815104069193
4334672815104069193
6145F501578671E2877DBA2BE487AF7E
16FE7483905CCE7A85670E43E4678877
1
1

View File

@ -32,5 +32,7 @@ SELECT gccMurmurHash('foo');
SELECT gccMurmurHash('\x01');
SELECT gccMurmurHash(1);
SELECT hex(murmurHash3_128('foo'));
SELECT hex(murmurHash3_128('\x01'));
-- Comparison with reverse for big endian
SELECT hex(murmurHash3_128('foo')) = hex(reverse(unhex('6145F501578671E2877DBA2BE487AF7E'))) or hex(murmurHash3_128('foo')) = '6145F501578671E2877DBA2BE487AF7E';
-- Comparison with reverse for big endian
SELECT hex(murmurHash3_128('\x01')) = hex(reverse(unhex('16FE7483905CCE7A85670E43E4678877'))) or hex(murmurHash3_128('\x01')) = '16FE7483905CCE7A85670E43E4678877';
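
A small endianness illustration for the comparison above, not the ClickHouse murmurHash3_128 code: hex-printing the same 16 result bytes in reverse order yields the string a big-endian build is expected to show, which is why the test also accepts hex(reverse(unhex(...))). The hard-coded digest is the little-endian reference value for 'foo'.

#include <algorithm>
#include <array>
#include <cstdio>

static void printHex(const std::array<unsigned char, 16> & bytes)
{
    for (unsigned char b : bytes)
        std::printf("%02X", static_cast<unsigned>(b));
    std::printf("\n");
}

int main()
{
    /// Little-endian reference digest of murmurHash3_128('foo') from the .reference file.
    std::array<unsigned char, 16> digest = {0x61, 0x45, 0xF5, 0x01, 0x57, 0x86, 0x71, 0xE2,
                                            0x87, 0x7D, 0xBA, 0x2B, 0xE4, 0x87, 0xAF, 0x7E};
    printHex(digest);                            /// 6145F501578671E2877DBA2BE487AF7E
    std::reverse(digest.begin(), digest.end());
    printHex(digest);                            /// the byte-reversed rendering a big-endian build would print
}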

View File

@ -1,5 +1,6 @@
#!/usr/bin/env bash
# Tags: no-random-settings, no-fasttest
# Tags: no-random-settings, no-fasttest, no-parallel
# For an unknown reason, this test is flaky without no-parallel
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh

View File

@ -4,4 +4,4 @@ CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
. "$CURDIR"/../shell_config.sh
${CLICKHOUSE_CLIENT} --allow_experimental_parallel_reading_from_replicas=1 -nm < "$CURDIR"/01099_parallel_distributed_insert_select.sql > /dev/null
${CLICKHOUSE_CLIENT} --allow_experimental_parallel_reading_from_replicas=1 --parallel_replicas_for_non_replicated_merge_tree=1 -nm < "$CURDIR"/01099_parallel_distributed_insert_select.sql > /dev/null

View File

@ -15,6 +15,7 @@ as select * from numbers(1);
# Logical error: 'Coordinator for parallel reading from replicas is not initialized'.
opts=(
--allow_experimental_parallel_reading_from_replicas 1
--parallel_replicas_for_non_replicated_merge_tree 1
--max_parallel_replicas 3
--iterations 1

View File

@ -61,6 +61,7 @@ create table pr_t(a UInt64, b UInt64) engine=MergeTree order by a;
insert into pr_t select number % 1000, number % 1000 from numbers_mt(1e6);
set allow_experimental_parallel_reading_from_replicas = 1;
set parallel_replicas_for_non_replicated_merge_tree = 1;
set max_parallel_replicas = 3;
set use_hedged_requests = 0;
set cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost';

View File

@ -0,0 +1 @@
GRANT SELECT ON *.* TO A WITH REPLACE OPTION

View File

@ -0,0 +1,11 @@
#!/usr/bin/env bash
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
set -e
format="$CLICKHOUSE_FORMAT --oneline"
echo "grant select on *.* to A with replace option" | $format

View File

@ -1 +1,2 @@
22 0 1
1970-01-01 02:00:02

View File

@ -1,4 +1,5 @@
DROP TABLE IF EXISTS 02680_datetime64_monotonic_check;
DROP TABLE IF EXISTS 02680_datetime_monotonic_check_lc;
CREATE TABLE 02680_datetime64_monotonic_check (`t` DateTime64(3), `x` Nullable(Decimal(18, 14)))
ENGINE = MergeTree
@ -13,3 +14,15 @@ WHERE toHour_Israel = 0
GROUP BY toHour_UTC, toHour_Israel;
DROP TABLE 02680_datetime64_monotonic_check;
SET allow_suspicious_low_cardinality_types = 1;
CREATE TABLE 02680_datetime_monotonic_check_lc (`timestamp` LowCardinality(UInt32))
ENGINE = MergeTree
ORDER BY timestamp
SETTINGS index_granularity = 1;
INSERT INTO 02680_datetime_monotonic_check_lc VALUES (2);
SELECT toDateTime(timestamp, 'Asia/Jerusalem') FROM 02680_datetime_monotonic_check_lc WHERE toHour(toDateTime(timestamp, 'Asia/Jerusalem')) = 2;
DROP TABLE 02680_datetime_monotonic_check_lc

View File

@ -0,0 +1,11 @@
DROP TABLE IF EXISTS 02691_drop_column_replicated;
CREATE TABLE 02691_drop_column_replicated (col1 Int64, col2 Int64, PROJECTION 02691_drop_column_replicated (SELECT * ORDER BY col1 ))
ENGINE = ReplicatedMergeTree('/clickhouse/tables/{database}/test/02691_drop_column', 'r1')
ORDER BY col1;
INSERT INTO 02691_drop_column_replicated VALUES (1, 2);
ALTER TABLE 02691_drop_column_replicated DROP COLUMN col2 SETTINGS alter_sync = 2;
DROP TABLE 02691_drop_column_replicated;

View File

@ -0,0 +1,49 @@
DROP TABLE IF EXISTS t1;
DROP TABLE IF EXISTS t2;
DROP TABLE IF EXISTS t3;
CREATE TABLE t1 (`1a` Nullable(Int64), `2b` Nullable(String)) engine = Memory;
CREATE TABLE t2 (`3c` Nullable(Int64), `4d` Nullable(String)) engine = Memory;
CREATE TABLE t3 (`5e` Nullable(Int64), `6f` Nullable(String)) engine = Memory;
SELECT
`1a`,
`2b`
FROM t1 AS tt1
INNER JOIN
(
SELECT `3c`
FROM t2
) AS tt2 ON tt1.`1a` = tt2.`3c`
INNER JOIN
(
SELECT `6f`
FROM t3
) AS tt3 ON tt1.`2b` = tt3.`6f`;
DROP TABLE t1;
DROP TABLE t2;
DROP TABLE t3;
CREATE TABLE t1 (`a` Nullable(Int64), `b` Nullable(String)) engine = Memory;
CREATE TABLE t2 (`c` Nullable(Int64), `d` Nullable(String)) engine = Memory;
CREATE TABLE t3 (`e` Nullable(Int64), `f` Nullable(String)) engine = Memory;
SELECT
a,
b
FROM t1 AS tt1
INNER JOIN
(
SELECT c
FROM t2
) AS tt2 ON tt1.a = tt2.c
INNER JOIN
(
SELECT f
FROM t3
) AS tt3 ON tt1.b = tt3.f;
DROP TABLE t1;
DROP TABLE t2;
DROP TABLE t3;

View File

@ -0,0 +1,2 @@
2021-01-01 上海市 启用
2021-01-02 北京市 停用

View File

@ -0,0 +1,24 @@
DROP TABLE IF EXISTS store;
DROP TABLE IF EXISTS location;
DROP TABLE IF EXISTS sales;
CREATE TABLE store (id UInt32, "名称" String, "状态" String) ENGINE=MergeTree() Order by id;
CREATE TABLE location (id UInt32, name String) ENGINE=MergeTree() Order by id;
CREATE TABLE sales ("日期" Date, "店铺" UInt32, "地址" UInt32, "销售额" Float32) ENGINE=MergeTree() Order by "日期";
INSERT INTO store VALUES (1,'店铺1','启用'),(2,'店铺2','停用');
INSERT INTO location VALUES (1,'上海市'),(2,'北京市');
INSERT INTO sales VALUES ('2021-01-01',1,1,10),('2021-01-02',2,2,20);
SELECT
`日期`,
location.name,
store.`状态`
FROM sales
LEFT JOIN store ON store.id = `店铺`
LEFT JOIN location ON location.id = `地址`
ORDER BY 1, 2, 3;
DROP TABLE store;
DROP TABLE location;
DROP TABLE sales;

View File

@ -0,0 +1,3 @@
create temporary table temp_table3(val0 UInt64) ENGINE=Memory();
select * from (select 1 as id) t1 inner join (select 1 as id) t2 on t1.id=t2.id inner join (select 1 as id) t3 on t1.id=t3.id where t1.id in temp_table3;
select * from (select 1 as id) t1 inner join (select 1 as id) t2 on t1.id=t2.id where t1.id in temp_table3;

View File

@ -0,0 +1,11 @@
#!/usr/bin/env bash
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
$CLICKHOUSE_CLIENT -n --query="SELECT sum(number * 0) FROM numbers(10000000000) SETTINGS stop_reading_on_first_cancel=true;" &
pid=$!
sleep 2
kill -SIGINT $pid
wait $pid

View File

@ -34,7 +34,7 @@ test1() {
GROUP BY CounterID, URL, EventDate
ORDER BY URL, EventDate
LIMIT 5 OFFSET 10
SETTINGS optimize_aggregation_in_order = 1, enable_memory_bound_merging_of_aggregation_results = 1, allow_experimental_parallel_reading_from_replicas = 1, max_parallel_replicas = 3, use_hedged_requests = 0"
SETTINGS optimize_aggregation_in_order = 1, enable_memory_bound_merging_of_aggregation_results = 1, allow_experimental_parallel_reading_from_replicas = 1, parallel_replicas_for_non_replicated_merge_tree = 1, max_parallel_replicas = 3, use_hedged_requests = 0"
check_replicas_read_in_order $query_id
}
@ -51,7 +51,7 @@ test2() {
GROUP BY URL, EventDate
ORDER BY URL, EventDate
LIMIT 5 OFFSET 10
SETTINGS optimize_aggregation_in_order = 1, enable_memory_bound_merging_of_aggregation_results = 1, allow_experimental_parallel_reading_from_replicas = 1, max_parallel_replicas = 3, use_hedged_requests = 0, query_plan_aggregation_in_order = 1"
SETTINGS optimize_aggregation_in_order = 1, enable_memory_bound_merging_of_aggregation_results = 1, allow_experimental_parallel_reading_from_replicas = 1, parallel_replicas_for_non_replicated_merge_tree = 1, max_parallel_replicas = 3, use_hedged_requests = 0, query_plan_aggregation_in_order = 1"
check_replicas_read_in_order $query_id
}
@ -67,7 +67,7 @@ test3() {
FROM test.hits
WHERE CounterID = 1704509 AND UserID = 4322253409885123546
GROUP BY URL, EventDate
SETTINGS optimize_aggregation_in_order = 1, enable_memory_bound_merging_of_aggregation_results = 1, allow_experimental_parallel_reading_from_replicas = 1, max_parallel_replicas = 3, use_hedged_requests = 0
SETTINGS optimize_aggregation_in_order = 1, enable_memory_bound_merging_of_aggregation_results = 1, allow_experimental_parallel_reading_from_replicas = 1, parallel_replicas_for_non_replicated_merge_tree = 1, max_parallel_replicas = 3, use_hedged_requests = 0
)
WHERE explain LIKE '%Aggr%Transform%' OR explain LIKE '%InOrder%'"
}