mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-09-19 16:20:50 +00:00
Compare commits
37 Commits
343edf0c33
...
47c6132d4e
Author | SHA1 | Date | |
---|---|---|---|
|
47c6132d4e | ||
|
d793e06860 | ||
|
1986fb1418 | ||
|
f36408a666 | ||
|
de85f5f251 | ||
|
85af661b9c | ||
|
b42c6491e4 | ||
|
1a4c7b7c61 | ||
|
14feba8443 | ||
|
4c4a051d5e | ||
|
a55cc03973 | ||
|
37411bf240 | ||
|
9102a6f119 | ||
|
a461d20af9 | ||
|
59d1f9a6b1 | ||
|
b55d0b54ea | ||
|
418ef3f8bc | ||
|
b420bbf855 | ||
|
6a7cfd13f7 | ||
|
baf6aaef1d | ||
|
9ca149a487 | ||
|
042194e3f6 | ||
|
adb905a692 | ||
|
e6ec9eaad3 | ||
|
9a3adc70bd | ||
|
120e38c72a | ||
|
38b5ea9066 | ||
|
fe5e061fff | ||
|
f6b965872f | ||
|
22c3b71196 | ||
|
7425d4aa1a | ||
|
cf12e3924f | ||
|
cfc931160d | ||
|
b2c4b771d8 | ||
|
edf4e09fb2 | ||
|
07f44fdb89 | ||
|
2fcbe2465a |
34
.github/actions/debug/action.yml
vendored
34
.github/actions/debug/action.yml
vendored
@ -4,15 +4,31 @@ description: Prints workflow debug info
|
||||
runs:
|
||||
using: "composite"
|
||||
steps:
|
||||
- name: Print envs
|
||||
- name: Envs, event.json and contexts
|
||||
shell: bash
|
||||
run: |
|
||||
echo "::group::Envs"
|
||||
env
|
||||
echo "::endgroup::"
|
||||
- name: Print Event.json
|
||||
shell: bash
|
||||
run: |
|
||||
echo "::group::Event.json"
|
||||
echo '::group::Environment variables'
|
||||
env | sort
|
||||
echo '::endgroup::'
|
||||
|
||||
echo '::group::event.json'
|
||||
python3 -m json.tool "$GITHUB_EVENT_PATH"
|
||||
echo "::endgroup::"
|
||||
echo '::endgroup::'
|
||||
|
||||
cat << 'EOF'
|
||||
::group::github context
|
||||
${{ toJSON(github) }}
|
||||
::endgroup::
|
||||
|
||||
::group::env context
|
||||
${{ toJSON(env) }}
|
||||
::endgroup::
|
||||
|
||||
::group::runner context
|
||||
${{ toJSON(runner) }}
|
||||
::endgroup::
|
||||
|
||||
::group::job context
|
||||
${{ toJSON(job) }}
|
||||
::endgroup::
|
||||
EOF
|
||||
|
2
.github/workflows/backport_branches.yml
vendored
2
.github/workflows/backport_branches.yml
vendored
@ -27,6 +27,8 @@ jobs:
|
||||
clear-repository: true # to ensure correct digests
|
||||
fetch-depth: 0 # to get version
|
||||
filter: tree:0
|
||||
- name: Debug Info
|
||||
uses: ./.github/actions/debug
|
||||
- name: Labels check
|
||||
run: |
|
||||
cd "$GITHUB_WORKSPACE/tests/ci"
|
||||
|
2
.github/workflows/cherry_pick.yml
vendored
2
.github/workflows/cherry_pick.yml
vendored
@ -33,6 +33,8 @@ jobs:
|
||||
clear-repository: true
|
||||
token: ${{secrets.ROBOT_CLICKHOUSE_COMMIT_TOKEN}}
|
||||
fetch-depth: 0
|
||||
- name: Debug Info
|
||||
uses: ./.github/actions/debug
|
||||
- name: Cherry pick
|
||||
run: |
|
||||
cd "$GITHUB_WORKSPACE/tests/ci"
|
||||
|
4
.github/workflows/create_release.yml
vendored
4
.github/workflows/create_release.yml
vendored
@ -56,13 +56,13 @@ jobs:
|
||||
GH_TOKEN: ${{ secrets.ROBOT_CLICKHOUSE_COMMIT_TOKEN }}
|
||||
runs-on: [self-hosted, release-maker]
|
||||
steps:
|
||||
- name: DebugInfo
|
||||
uses: hmarr/debug-action@f7318c783045ac39ed9bb497e22ce835fdafbfe6
|
||||
- name: Check out repository code
|
||||
uses: ClickHouse/checkout@v1
|
||||
with:
|
||||
token: ${{secrets.ROBOT_CLICKHOUSE_COMMIT_TOKEN}}
|
||||
fetch-depth: 0
|
||||
- name: Debug Info
|
||||
uses: ./.github/actions/debug
|
||||
- name: Prepare Release Info
|
||||
shell: bash
|
||||
run: |
|
||||
|
1
.github/workflows/docker_test_images.yml
vendored
1
.github/workflows/docker_test_images.yml
vendored
@ -11,6 +11,7 @@ name: Build docker images
|
||||
required: false
|
||||
type: boolean
|
||||
default: false
|
||||
|
||||
jobs:
|
||||
DockerBuildAarch64:
|
||||
runs-on: [self-hosted, style-checker-aarch64]
|
||||
|
7
.github/workflows/jepsen.yml
vendored
7
.github/workflows/jepsen.yml
vendored
@ -8,27 +8,28 @@ on: # yamllint disable-line rule:truthy
|
||||
schedule:
|
||||
- cron: '0 */6 * * *'
|
||||
workflow_dispatch:
|
||||
|
||||
jobs:
|
||||
RunConfig:
|
||||
runs-on: [self-hosted, style-checker-aarch64]
|
||||
outputs:
|
||||
data: ${{ steps.runconfig.outputs.CI_DATA }}
|
||||
steps:
|
||||
- name: DebugInfo
|
||||
uses: hmarr/debug-action@f7318c783045ac39ed9bb497e22ce835fdafbfe6
|
||||
- name: Check out repository code
|
||||
uses: ClickHouse/checkout@v1
|
||||
with:
|
||||
clear-repository: true # to ensure correct digests
|
||||
fetch-depth: 0 # to get version
|
||||
filter: tree:0
|
||||
- name: Debug Info
|
||||
uses: ./.github/actions/debug
|
||||
- name: PrepareRunConfig
|
||||
id: runconfig
|
||||
run: |
|
||||
echo "::group::configure CI run"
|
||||
python3 "$GITHUB_WORKSPACE/tests/ci/ci.py" --configure --workflow "$GITHUB_WORKFLOW" --outfile ${{ runner.temp }}/ci_run_data.json
|
||||
echo "::endgroup::"
|
||||
|
||||
|
||||
echo "::group::CI run configure results"
|
||||
python3 -m json.tool ${{ runner.temp }}/ci_run_data.json
|
||||
echo "::endgroup::"
|
||||
|
4
.github/workflows/master.yml
vendored
4
.github/workflows/master.yml
vendored
@ -15,14 +15,14 @@ jobs:
|
||||
outputs:
|
||||
data: ${{ steps.runconfig.outputs.CI_DATA }}
|
||||
steps:
|
||||
- name: DebugInfo
|
||||
uses: hmarr/debug-action@f7318c783045ac39ed9bb497e22ce835fdafbfe6
|
||||
- name: Check out repository code
|
||||
uses: ClickHouse/checkout@v1
|
||||
with:
|
||||
clear-repository: true # to ensure correct digests
|
||||
fetch-depth: 0 # to get version
|
||||
filter: tree:0
|
||||
- name: Debug Info
|
||||
uses: ./.github/actions/debug
|
||||
- name: Merge sync PR
|
||||
run: |
|
||||
cd "$GITHUB_WORKSPACE/tests/ci"
|
||||
|
4
.github/workflows/merge_queue.yml
vendored
4
.github/workflows/merge_queue.yml
vendored
@ -14,14 +14,14 @@ jobs:
|
||||
outputs:
|
||||
data: ${{ steps.runconfig.outputs.CI_DATA }}
|
||||
steps:
|
||||
- name: DebugInfo
|
||||
uses: hmarr/debug-action@f7318c783045ac39ed9bb497e22ce835fdafbfe6
|
||||
- name: Check out repository code
|
||||
uses: ClickHouse/checkout@v1
|
||||
with:
|
||||
clear-repository: true # to ensure correct digests
|
||||
fetch-depth: 0 # to get a version
|
||||
filter: tree:0
|
||||
- name: Debug Info
|
||||
uses: ./.github/actions/debug
|
||||
- name: Cancel PR workflow
|
||||
run: |
|
||||
python3 "$GITHUB_WORKSPACE/tests/ci/ci.py" --cancel-previous-run
|
||||
|
4
.github/workflows/nightly.yml
vendored
4
.github/workflows/nightly.yml
vendored
@ -15,14 +15,14 @@ jobs:
|
||||
outputs:
|
||||
data: ${{ steps.runconfig.outputs.CI_DATA }}
|
||||
steps:
|
||||
- name: DebugInfo
|
||||
uses: hmarr/debug-action@f7318c783045ac39ed9bb497e22ce835fdafbfe6
|
||||
- name: Check out repository code
|
||||
uses: ClickHouse/checkout@v1
|
||||
with:
|
||||
clear-repository: true # to ensure correct digests
|
||||
fetch-depth: 0 # to get version
|
||||
filter: tree:0
|
||||
- name: Debug Info
|
||||
uses: ./.github/actions/debug
|
||||
- name: PrepareRunConfig
|
||||
id: runconfig
|
||||
run: |
|
||||
|
4
.github/workflows/pull_request.yml
vendored
4
.github/workflows/pull_request.yml
vendored
@ -25,14 +25,14 @@ jobs:
|
||||
outputs:
|
||||
data: ${{ steps.runconfig.outputs.CI_DATA }}
|
||||
steps:
|
||||
- name: DebugInfo
|
||||
uses: hmarr/debug-action@f7318c783045ac39ed9bb497e22ce835fdafbfe6
|
||||
- name: Check out repository code
|
||||
uses: ClickHouse/checkout@v1
|
||||
with:
|
||||
clear-repository: true # to ensure correct digests
|
||||
fetch-depth: 0 # to get a version
|
||||
filter: tree:0
|
||||
- name: Debug Info
|
||||
uses: ./.github/actions/debug
|
||||
- name: Cancel previous Sync PR workflow
|
||||
run: |
|
||||
python3 "$GITHUB_WORKSPACE/tests/ci/ci.py" --cancel-previous-run
|
||||
|
2
.github/workflows/release_branches.yml
vendored
2
.github/workflows/release_branches.yml
vendored
@ -24,6 +24,8 @@ jobs:
|
||||
clear-repository: true # to ensure correct digests
|
||||
fetch-depth: 0 # to get version
|
||||
filter: tree:0
|
||||
- name: Debug Info
|
||||
uses: ./.github/actions/debug
|
||||
- name: Labels check
|
||||
run: |
|
||||
cd "$GITHUB_WORKSPACE/tests/ci"
|
||||
|
4
.github/workflows/reusable_simple_job.yml
vendored
4
.github/workflows/reusable_simple_job.yml
vendored
@ -62,8 +62,6 @@ jobs:
|
||||
env:
|
||||
GITHUB_JOB_OVERRIDDEN: ${{inputs.test_name}}
|
||||
steps:
|
||||
- name: DebugInfo
|
||||
uses: hmarr/debug-action@f7318c783045ac39ed9bb497e22ce835fdafbfe6
|
||||
- name: Check out repository code
|
||||
uses: ClickHouse/checkout@v1
|
||||
with:
|
||||
@ -72,6 +70,8 @@ jobs:
|
||||
submodules: ${{inputs.submodules}}
|
||||
fetch-depth: ${{inputs.checkout_depth}}
|
||||
filter: tree:0
|
||||
- name: Debug Info
|
||||
uses: ./.github/actions/debug
|
||||
- name: Set build envs
|
||||
run: |
|
||||
cat >> "$GITHUB_ENV" << 'EOF'
|
||||
|
@ -13,16 +13,17 @@ Here is a complete list of available database engines. Follow the links for more
|
||||
|
||||
- [Atomic](../../engines/database-engines/atomic.md)
|
||||
|
||||
- [MySQL](../../engines/database-engines/mysql.md)
|
||||
- [Lazy](../../engines/database-engines/lazy.md)
|
||||
|
||||
- [MaterializedPostgreSQL](../../engines/database-engines/materialized-postgresql.md)
|
||||
|
||||
- [MaterializedMySQL](../../engines/database-engines/materialized-mysql.md)
|
||||
|
||||
- [Lazy](../../engines/database-engines/lazy.md)
|
||||
- [MySQL](../../engines/database-engines/mysql.md)
|
||||
|
||||
- [PostgreSQL](../../engines/database-engines/postgresql.md)
|
||||
|
||||
- [MaterializedPostgreSQL](../../engines/database-engines/materialized-postgresql.md)
|
||||
|
||||
- [Replicated](../../engines/database-engines/replicated.md)
|
||||
|
||||
- [SQLite](../../engines/database-engines/sqlite.md)
|
||||
|
||||
|
@ -107,6 +107,10 @@ The vector similarity index currently does not work with per-table, non-default
|
||||
[here](https://github.com/ClickHouse/ClickHouse/pull/51325#issuecomment-1605920475)). If necessary, the value must be changed in config.xml.
|
||||
:::
|
||||
|
||||
Vector index creation is known to be slow. To speed the process up, index creation can be parallelized. The maximum number of threads can be
|
||||
configured using server configuration
|
||||
setting [max_build_vector_similarity_index_thread_pool_size](../../../operations/server-configuration-parameters/settings.md#server_configuration_parameters_max_build_vector_similarity_index_thread_pool_size).
|
||||
|
||||
ANN indexes are built during column insertion and merge. As a result, `INSERT` and `OPTIMIZE` statements will be slower than for ordinary
|
||||
tables. ANN indexes are ideally used only with immutable or rarely changed data, i.e. when there are far more read requests than write
|
||||
requests.
|
||||
|
@ -491,6 +491,14 @@ Type: Double
|
||||
|
||||
Default: 0.9
|
||||
|
||||
## max_build_vector_similarity_index_thread_pool_size {#server_configuration_parameters_max_build_vector_similarity_index_thread_pool_size}
|
||||
|
||||
The maximum number of threads to use for building vector indexes. 0 means all cores.
|
||||
|
||||
Type: UInt64
|
||||
|
||||
Default: 16
|
||||
|
||||
## cgroups_memory_usage_observer_wait_time
|
||||
|
||||
Interval in seconds during which the server's maximum allowed memory consumption is adjusted by the corresponding threshold in cgroups. (see
|
||||
|
@ -178,6 +178,9 @@
|
||||
M(ObjectStorageAzureThreads, "Number of threads in the AzureObjectStorage thread pool.") \
|
||||
M(ObjectStorageAzureThreadsActive, "Number of threads in the AzureObjectStorage thread pool running a task.") \
|
||||
M(ObjectStorageAzureThreadsScheduled, "Number of queued or active jobs in the AzureObjectStorage thread pool.") \
|
||||
M(BuildVectorSimilarityIndexThreads, "Number of threads in the build vector similarity index thread pool.") \
|
||||
M(BuildVectorSimilarityIndexThreadsActive, "Number of threads in the build vector similarity index thread pool running a task.") \
|
||||
M(BuildVectorSimilarityIndexThreadsScheduled, "Number of queued or active jobs in the build vector similarity index thread pool.") \
|
||||
\
|
||||
M(DiskPlainRewritableAzureDirectoryMapSize, "Number of local-to-remote path entries in the 'plain_rewritable' in-memory map for AzureObjectStorage.") \
|
||||
M(DiskPlainRewritableLocalDirectoryMapSize, "Number of local-to-remote path entries in the 'plain_rewritable' in-memory map for LocalObjectStorage.") \
|
||||
|
@ -63,6 +63,7 @@ static struct InitFiu
|
||||
REGULAR(keepermap_fail_drop_data) \
|
||||
REGULAR(lazy_pipe_fds_fail_close) \
|
||||
PAUSEABLE(infinite_sleep) \
|
||||
PAUSEABLE(stop_moving_part_before_swap_with_active) \
|
||||
|
||||
|
||||
namespace FailPoints
|
||||
|
@ -1723,11 +1723,10 @@ std::string normalizeZooKeeperPath(std::string zookeeper_path, bool check_starts
|
||||
|
||||
String extractZooKeeperName(const String & path)
|
||||
{
|
||||
static constexpr auto default_zookeeper_name = "default";
|
||||
if (path.empty())
|
||||
throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "ZooKeeper path should not be empty");
|
||||
if (path[0] == '/')
|
||||
return default_zookeeper_name;
|
||||
return String(DEFAULT_ZOOKEEPER_NAME);
|
||||
auto pos = path.find(":/");
|
||||
if (pos != String::npos && pos < path.find('/'))
|
||||
{
|
||||
@ -1736,7 +1735,7 @@ String extractZooKeeperName(const String & path)
|
||||
throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Zookeeper path should start with '/' or '<auxiliary_zookeeper_name>:/'");
|
||||
return zookeeper_name;
|
||||
}
|
||||
return default_zookeeper_name;
|
||||
return String(DEFAULT_ZOOKEEPER_NAME);
|
||||
}
|
||||
|
||||
String extractZooKeeperPath(const String & path, bool check_starts_with_slash, LoggerPtr log)
|
||||
|
@ -47,6 +47,10 @@ namespace zkutil
|
||||
/// Preferred size of multi command (in the number of operations)
|
||||
constexpr size_t MULTI_BATCH_SIZE = 100;
|
||||
|
||||
/// Path "default:/foo" refers to znode "/foo" in the default zookeeper,
|
||||
/// path "other:/foo" refers to znode "/foo" in auxiliary zookeeper named "other".
|
||||
constexpr std::string_view DEFAULT_ZOOKEEPER_NAME = "default";
|
||||
|
||||
struct ShuffleHost
|
||||
{
|
||||
enum AvailabilityZoneInfo
|
||||
|
@ -50,7 +50,7 @@ namespace DB
|
||||
M(UInt32, asynchronous_heavy_metrics_update_period_s, 120, "Period in seconds for updating heavy asynchronous metrics.", 0) \
|
||||
M(String, default_database, "default", "Default database name.", 0) \
|
||||
M(String, tmp_policy, "", "Policy for storage with temporary data.", 0) \
|
||||
M(UInt64, max_temporary_data_on_disk_size, 0, "The maximum amount of storage that could be used for external aggregation, joins or sorting., ", 0) \
|
||||
M(UInt64, max_temporary_data_on_disk_size, 0, "The maximum amount of storage that could be used for external aggregation, joins or sorting.", 0) \
|
||||
M(String, temporary_data_in_cache, "", "Cache disk name for temporary data.", 0) \
|
||||
M(UInt64, aggregate_function_group_array_max_element_size, 0xFFFFFF, "Max array element size in bytes for groupArray function. This limit is checked at serialization and help to avoid large state size.", 0) \
|
||||
M(GroupArrayActionWhenLimitReached, aggregate_function_group_array_action_when_limit_is_reached, GroupArrayActionWhenLimitReached::THROW, "Action to execute when max array element size is exceeded in groupArray: `throw` exception, or `discard` extra values", 0) \
|
||||
@ -65,6 +65,7 @@ namespace DB
|
||||
M(UInt64, async_insert_threads, 16, "Maximum number of threads to actually parse and insert data in background. Zero means asynchronous mode is disabled", 0) \
|
||||
M(Bool, async_insert_queue_flush_on_shutdown, true, "If true queue of asynchronous inserts is flushed on graceful shutdown", 0) \
|
||||
M(Bool, ignore_empty_sql_security_in_create_view_query, true, "If true, ClickHouse doesn't write defaults for empty SQL security statement in CREATE VIEW queries. This setting is only necessary for the migration period and will become obsolete in 24.4", 0) \
|
||||
M(UInt64, max_build_vector_similarity_index_thread_pool_size, 16, "The maximum number of threads to use to build vector similarity indexes. 0 means all cores.", 0) \
|
||||
\
|
||||
/* Database Catalog */ \
|
||||
M(UInt64, database_atomic_delay_before_drop_table_sec, 8 * 60, "The delay during which a dropped table can be restored using the UNDROP statement. If DROP TABLE ran with a SYNC modifier, the setting is ignored.", 0) \
|
||||
|
@ -50,13 +50,6 @@ private:
|
||||
return executeNonconstant(input);
|
||||
}
|
||||
|
||||
[[maybe_unused]] String toString() const
|
||||
{
|
||||
WriteBufferFromOwnString buf;
|
||||
buf << "format:" << format << ", rows:" << rows << ", is_literal:" << is_literal << ", input:" << input.dumpStructure() << "\n";
|
||||
return buf.str();
|
||||
}
|
||||
|
||||
private:
|
||||
ColumnWithTypeAndName executeLiteral(std::string_view literal) const
|
||||
{
|
||||
@ -231,9 +224,7 @@ public:
|
||||
const auto & instruction = instructions[i];
|
||||
try
|
||||
{
|
||||
// std::cout << "instruction[" << i << "]:" << instructions[i].toString() << std::endl;
|
||||
concat_args[i] = instruction.execute();
|
||||
// std::cout << "concat_args[" << i << "]:" << concat_args[i].dumpStructure() << std::endl;
|
||||
}
|
||||
catch (const fmt::v9::format_error & e)
|
||||
{
|
||||
@ -358,7 +349,14 @@ private:
|
||||
|
||||
REGISTER_FUNCTION(Printf)
|
||||
{
|
||||
factory.registerFunction<FunctionPrintf>();
|
||||
factory.registerFunction<FunctionPrintf>(
|
||||
FunctionDocumentation{.description=R"(
|
||||
The `printf` function formats the given string with the values (strings, integers, floating-points etc.) listed in the arguments, similar to printf function in C++.
|
||||
The format string can contain format specifiers starting with `%` character.
|
||||
Anything not contained in `%` and the following format specifier is considered literal text and copied verbatim into the output.
|
||||
Literal `%` character can be escaped by `%%`.)", .examples{{"sum", "select printf('%%%s %s %d', 'Hello', 'World', 2024);", "%Hello World 2024"}}, .categories{"String"}
|
||||
});
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -10,6 +10,7 @@
|
||||
#include <Common/SensitiveDataMasker.h>
|
||||
#include <Common/Macros.h>
|
||||
#include <Common/EventNotifier.h>
|
||||
#include <Common/getNumberOfPhysicalCPUCores.h>
|
||||
#include <Common/Stopwatch.h>
|
||||
#include <Common/formatReadable.h>
|
||||
#include <Common/Throttler.h>
|
||||
@ -121,7 +122,6 @@
|
||||
#include <Interpreters/InterpreterSelectWithUnionQuery.h>
|
||||
#include <base/defines.h>
|
||||
|
||||
|
||||
namespace fs = std::filesystem;
|
||||
|
||||
namespace ProfileEvents
|
||||
@ -164,6 +164,9 @@ namespace CurrentMetrics
|
||||
extern const Metric TablesLoaderForegroundThreadsActive;
|
||||
extern const Metric TablesLoaderForegroundThreadsScheduled;
|
||||
extern const Metric IOWriterThreadsScheduled;
|
||||
extern const Metric BuildVectorSimilarityIndexThreads;
|
||||
extern const Metric BuildVectorSimilarityIndexThreadsActive;
|
||||
extern const Metric BuildVectorSimilarityIndexThreadsScheduled;
|
||||
extern const Metric AttachedTable;
|
||||
extern const Metric AttachedView;
|
||||
extern const Metric AttachedDictionary;
|
||||
@ -297,6 +300,8 @@ struct ContextSharedPart : boost::noncopyable
|
||||
mutable std::unique_ptr<ThreadPool> load_marks_threadpool; /// Threadpool for loading marks cache.
|
||||
mutable OnceFlag prefetch_threadpool_initialized;
|
||||
mutable std::unique_ptr<ThreadPool> prefetch_threadpool; /// Threadpool for loading marks cache.
|
||||
mutable OnceFlag build_vector_similarity_index_threadpool_initialized;
|
||||
mutable std::unique_ptr<ThreadPool> build_vector_similarity_index_threadpool; /// Threadpool for vector-similarity index creation.
|
||||
mutable UncompressedCachePtr index_uncompressed_cache TSA_GUARDED_BY(mutex); /// The cache of decompressed blocks for MergeTree indices.
|
||||
mutable QueryCachePtr query_cache TSA_GUARDED_BY(mutex); /// Cache of query results.
|
||||
mutable MarkCachePtr index_mark_cache TSA_GUARDED_BY(mutex); /// Cache of marks in compressed files of MergeTree indices.
|
||||
@ -3297,6 +3302,21 @@ size_t Context::getPrefetchThreadpoolSize() const
|
||||
return config.getUInt(".prefetch_threadpool_pool_size", 100);
|
||||
}
|
||||
|
||||
ThreadPool & Context::getBuildVectorSimilarityIndexThreadPool() const
|
||||
{
|
||||
callOnce(shared->build_vector_similarity_index_threadpool_initialized, [&] {
|
||||
size_t pool_size = shared->server_settings.max_build_vector_similarity_index_thread_pool_size > 0
|
||||
? shared->server_settings.max_build_vector_similarity_index_thread_pool_size
|
||||
: getNumberOfPhysicalCPUCores();
|
||||
shared->build_vector_similarity_index_threadpool = std::make_unique<ThreadPool>(
|
||||
CurrentMetrics::BuildVectorSimilarityIndexThreads,
|
||||
CurrentMetrics::BuildVectorSimilarityIndexThreadsActive,
|
||||
CurrentMetrics::BuildVectorSimilarityIndexThreadsScheduled,
|
||||
pool_size);
|
||||
});
|
||||
return *shared->build_vector_similarity_index_threadpool;
|
||||
}
|
||||
|
||||
BackgroundSchedulePool & Context::getBufferFlushSchedulePool() const
|
||||
{
|
||||
callOnce(shared->buffer_flush_schedule_pool_initialized, [&] {
|
||||
@ -3743,6 +3763,11 @@ zkutil::ZooKeeperPtr Context::getAuxiliaryZooKeeper(const String & name) const
|
||||
return zookeeper->second;
|
||||
}
|
||||
|
||||
std::shared_ptr<zkutil::ZooKeeper> Context::getDefaultOrAuxiliaryZooKeeper(const String & name) const
|
||||
{
|
||||
return name == zkutil::DEFAULT_ZOOKEEPER_NAME ? getZooKeeper() : getAuxiliaryZooKeeper(name);
|
||||
}
|
||||
|
||||
|
||||
std::map<String, zkutil::ZooKeeperPtr> Context::getAuxiliaryZooKeepers() const
|
||||
{
|
||||
|
@ -1004,6 +1004,8 @@ public:
|
||||
std::shared_ptr<zkutil::ZooKeeper> getZooKeeper() const;
|
||||
/// Same as above but return a zookeeper connection from auxiliary_zookeepers configuration entry.
|
||||
std::shared_ptr<zkutil::ZooKeeper> getAuxiliaryZooKeeper(const String & name) const;
|
||||
/// If name == "default", same as getZooKeeper(), otherwise same as getAuxiliaryZooKeeper().
|
||||
std::shared_ptr<zkutil::ZooKeeper> getDefaultOrAuxiliaryZooKeeper(const String & name) const;
|
||||
/// return Auxiliary Zookeeper map
|
||||
std::map<String, zkutil::ZooKeeperPtr> getAuxiliaryZooKeepers() const;
|
||||
|
||||
@ -1097,6 +1099,8 @@ public:
|
||||
/// and make a prefetch by putting a read task to threadpoolReader.
|
||||
size_t getPrefetchThreadpoolSize() const;
|
||||
|
||||
ThreadPool & getBuildVectorSimilarityIndexThreadPool() const;
|
||||
|
||||
/// Settings for MergeTree background tasks stored in config.xml
|
||||
BackgroundTaskSchedulingSettings getBackgroundProcessingTaskSchedulingSettings() const;
|
||||
BackgroundTaskSchedulingSettings getBackgroundMoveTaskSchedulingSettings() const;
|
||||
|
@ -1469,7 +1469,7 @@ BlockIO InterpreterCreateQuery::createTable(ASTCreateQuery & create)
|
||||
{
|
||||
if (!getContext()->getSettingsRef().allow_experimental_refreshable_materialized_view)
|
||||
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED,
|
||||
"Refreshable materialized views are experimental. Enable allow_experimental_refreshable_materialized_view to use.");
|
||||
"Refreshable materialized views are experimental. Enable allow_experimental_refreshable_materialized_view to use");
|
||||
|
||||
AddDefaultDatabaseVisitor visitor(getContext(), current_database);
|
||||
visitor.visit(*create.refresh_strategy);
|
||||
|
@ -1005,7 +1005,7 @@ void InterpreterSystemQuery::dropReplica(ASTSystemQuery & query)
|
||||
{
|
||||
ReplicatedTableStatus status;
|
||||
storage_replicated->getStatus(status);
|
||||
if (status.zookeeper_path == query.replica_zk_path)
|
||||
if (status.zookeeper_info.path == query.replica_zk_path)
|
||||
throw Exception(ErrorCodes::TABLE_WAS_NOT_DROPPED,
|
||||
"There is a local table {}, which has the same table path in ZooKeeper. "
|
||||
"Please check the path in query. "
|
||||
@ -1028,7 +1028,10 @@ void InterpreterSystemQuery::dropReplica(ASTSystemQuery & query)
|
||||
if (zookeeper->exists(remote_replica_path + "/is_active"))
|
||||
throw Exception(ErrorCodes::TABLE_WAS_NOT_DROPPED, "Can't remove replica: {}, because it's active", query.replica);
|
||||
|
||||
StorageReplicatedMergeTree::dropReplica(zookeeper, query.replica_zk_path, query.replica, log);
|
||||
TableZnodeInfo info;
|
||||
info.path = query.replica_zk_path;
|
||||
info.replica_name = query.replica;
|
||||
StorageReplicatedMergeTree::dropReplica(zookeeper, info, log);
|
||||
LOG_INFO(log, "Dropped replica {}", remote_replica_path);
|
||||
}
|
||||
else
|
||||
@ -1045,12 +1048,12 @@ bool InterpreterSystemQuery::dropReplicaImpl(ASTSystemQuery & query, const Stora
|
||||
storage_replicated->getStatus(status);
|
||||
|
||||
/// Do not allow to drop local replicas and active remote replicas
|
||||
if (query.replica == status.replica_name)
|
||||
if (query.replica == status.zookeeper_info.replica_name)
|
||||
throw Exception(ErrorCodes::TABLE_WAS_NOT_DROPPED,
|
||||
"We can't drop local replica, please use `DROP TABLE` if you want "
|
||||
"to clean the data and drop this replica");
|
||||
|
||||
storage_replicated->dropReplica(status.zookeeper_path, query.replica, log);
|
||||
storage_replicated->dropReplica(query.replica, log);
|
||||
LOG_TRACE(log, "Dropped replica {} of {}", query.replica, table->getStorageID().getNameForLogs());
|
||||
|
||||
return true;
|
||||
|
@ -74,7 +74,8 @@ private:
|
||||
findMySQLFunctionSecretArguments();
|
||||
}
|
||||
else if ((function.name == "s3") || (function.name == "cosn") || (function.name == "oss") ||
|
||||
(function.name == "deltaLake") || (function.name == "hudi") || (function.name == "iceberg"))
|
||||
(function.name == "deltaLake") || (function.name == "hudi") || (function.name == "iceberg") ||
|
||||
(function.name == "gcs"))
|
||||
{
|
||||
/// s3('url', 'aws_access_key_id', 'aws_secret_access_key', ...)
|
||||
findS3FunctionSecretArguments(/* is_cluster_function= */ false);
|
||||
|
@ -5,9 +5,11 @@
|
||||
#include <Columns/ColumnArray.h>
|
||||
#include <Common/BitHelpers.h>
|
||||
#include <Common/formatReadable.h>
|
||||
#include <Common/getNumberOfPhysicalCPUCores.h>
|
||||
#include <Common/logger_useful.h>
|
||||
#include <Common/typeid_cast.h>
|
||||
#include <Core/Field.h>
|
||||
#include <Core/ServerSettings.h>
|
||||
#include <DataTypes/DataTypeArray.h>
|
||||
#include <IO/ReadHelpers.h>
|
||||
#include <IO/WriteHelpers.h>
|
||||
@ -29,7 +31,6 @@ namespace DB
|
||||
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int CANNOT_ALLOCATE_MEMORY;
|
||||
extern const int FORMAT_VERSION_TOO_OLD;
|
||||
extern const int ILLEGAL_COLUMN;
|
||||
extern const int INCORRECT_DATA;
|
||||
@ -131,8 +132,7 @@ void USearchIndexWithSerialization::deserialize(ReadBuffer & istr)
|
||||
/// See the comment in MergeTreeIndexGranuleVectorSimilarity::deserializeBinary why we throw here
|
||||
throw Exception(ErrorCodes::INCORRECT_DATA, "Could not load vector similarity index. Please drop the index and create it again. Error: {}", String(result.error.release()));
|
||||
|
||||
if (!try_reserve(limits()))
|
||||
throw Exception(ErrorCodes::CANNOT_ALLOCATE_MEMORY, "Could not reserve memory for usearch index");
|
||||
try_reserve(limits());
|
||||
}
|
||||
|
||||
USearchIndexWithSerialization::Statistics USearchIndexWithSerialization::getStatistics() const
|
||||
@ -270,20 +270,49 @@ void updateImpl(const ColumnArray * column_array, const ColumnArray::Offsets & c
|
||||
throw Exception(ErrorCodes::INCORRECT_DATA, "All arrays in column with vector similarity index must have equal length");
|
||||
|
||||
/// Reserving space is mandatory
|
||||
if (!index->try_reserve(roundUpToPowerOfTwoOrZero(index->size() + rows)))
|
||||
throw Exception(ErrorCodes::CANNOT_ALLOCATE_MEMORY, "Could not reserve memory for vector similarity index");
|
||||
size_t max_thread_pool_size = Context::getGlobalContextInstance()->getServerSettings().max_build_vector_similarity_index_thread_pool_size;
|
||||
if (max_thread_pool_size == 0)
|
||||
max_thread_pool_size = getNumberOfPhysicalCPUCores();
|
||||
unum::usearch::index_limits_t limits(roundUpToPowerOfTwoOrZero(index->size() + rows), max_thread_pool_size);
|
||||
index->reserve(limits);
|
||||
|
||||
for (size_t row = 0; row < rows; ++row)
|
||||
/// Vector index creation is slooooow. Add the new rows in parallel. The threadpool is global to avoid oversubscription when multiple
|
||||
/// indexes are built simultaneously (e.g. multiple merges run at the same time).
|
||||
auto & thread_pool = Context::getGlobalContextInstance()->getBuildVectorSimilarityIndexThreadPool();
|
||||
|
||||
auto add_vector_to_index = [&](USearchIndex::vector_key_t key, size_t row, ThreadGroupPtr thread_group)
|
||||
{
|
||||
if (auto result = index->add(static_cast<USearchIndex::vector_key_t>(index->size()), &column_array_data_float_data[column_array_offsets[row - 1]]); !result)
|
||||
SCOPE_EXIT_SAFE(
|
||||
if (thread_group)
|
||||
CurrentThread::detachFromGroupIfNotDetached();
|
||||
);
|
||||
|
||||
if (thread_group)
|
||||
CurrentThread::attachToGroupIfDetached(thread_group);
|
||||
|
||||
/// add is thread-safe
|
||||
if (auto result = index->add(key, &column_array_data_float_data[column_array_offsets[row - 1]]); !result)
|
||||
{
|
||||
throw Exception(ErrorCodes::INCORRECT_DATA, "Could not add data to vector similarity index. Error: {}", String(result.error.release()));
|
||||
}
|
||||
else
|
||||
{
|
||||
ProfileEvents::increment(ProfileEvents::USearchAddCount);
|
||||
ProfileEvents::increment(ProfileEvents::USearchAddVisitedMembers, result.visited_members);
|
||||
ProfileEvents::increment(ProfileEvents::USearchAddComputedDistances, result.computed_distances);
|
||||
}
|
||||
};
|
||||
|
||||
size_t index_size = index->size();
|
||||
|
||||
for (size_t row = 0; row < rows; ++row)
|
||||
{
|
||||
auto key = static_cast<USearchIndex::vector_key_t>(index_size + row);
|
||||
auto task = [group = CurrentThread::getGroup(), &add_vector_to_index, key, row] { add_vector_to_index(key, row, group); };
|
||||
thread_pool.scheduleOrThrowOnError(task);
|
||||
}
|
||||
|
||||
thread_pool.wait();
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -1,6 +1,7 @@
|
||||
#include <Storages/MergeTree/MergeTreeData.h>
|
||||
#include <Storages/MergeTree/MergeTreePartsMover.h>
|
||||
#include <Storages/MergeTree/MergeTreeSettings.h>
|
||||
#include <Common/FailPoint.h>
|
||||
#include <Common/logger_useful.h>
|
||||
|
||||
#include <set>
|
||||
@ -15,6 +16,11 @@ namespace ErrorCodes
|
||||
extern const int DIRECTORY_ALREADY_EXISTS;
|
||||
}
|
||||
|
||||
namespace FailPoints
|
||||
{
|
||||
extern const char stop_moving_part_before_swap_with_active[];
|
||||
}
|
||||
|
||||
namespace
|
||||
{
|
||||
|
||||
@ -226,6 +232,7 @@ MergeTreePartsMover::TemporaryClonedPart MergeTreePartsMover::clonePart(const Me
|
||||
cloned_part.temporary_directory_lock = data->getTemporaryPartDirectoryHolder(part->name);
|
||||
|
||||
MutableDataPartStoragePtr cloned_part_storage;
|
||||
bool preserve_blobs = false;
|
||||
if (disk->supportZeroCopyReplication() && settings->allow_remote_fs_zero_copy_replication)
|
||||
{
|
||||
/// Try zero-copy replication and fallback to default copy if it's not possible
|
||||
@ -253,6 +260,7 @@ MergeTreePartsMover::TemporaryClonedPart MergeTreePartsMover::clonePart(const Me
|
||||
if (zero_copy_part)
|
||||
{
|
||||
/// FIXME for some reason we cannot just use this part, we have to re-create it through MergeTreeDataPartBuilder
|
||||
preserve_blobs = true;
|
||||
zero_copy_part->is_temp = false; /// Do not remove it in dtor
|
||||
cloned_part_storage = zero_copy_part->getDataPartStoragePtr();
|
||||
}
|
||||
@ -272,7 +280,17 @@ MergeTreePartsMover::TemporaryClonedPart MergeTreePartsMover::clonePart(const Me
|
||||
cloned_part.part = std::move(builder).withPartFormatFromDisk().build();
|
||||
LOG_TRACE(log, "Part {} was cloned to {}", part->name, cloned_part.part->getDataPartStorage().getFullPath());
|
||||
|
||||
cloned_part.part->is_temp = data->allowRemoveStaleMovingParts();
|
||||
cloned_part.part->is_temp = false;
|
||||
if (data->allowRemoveStaleMovingParts())
|
||||
{
|
||||
cloned_part.part->is_temp = true;
|
||||
/// Setting it in case connection to zookeeper is lost while moving
|
||||
/// Otherwise part might be stuck in the moving directory due to the KEEPER_EXCEPTION in part's destructor
|
||||
if (preserve_blobs)
|
||||
cloned_part.part->remove_tmp_policy = IMergeTreeDataPart::BlobsRemovalPolicyForTemporaryParts::PRESERVE_BLOBS;
|
||||
else
|
||||
cloned_part.part->remove_tmp_policy = IMergeTreeDataPart::BlobsRemovalPolicyForTemporaryParts::REMOVE_BLOBS;
|
||||
}
|
||||
cloned_part.part->loadColumnsChecksumsIndexes(true, true);
|
||||
cloned_part.part->loadVersionMetadata();
|
||||
cloned_part.part->modification_time = cloned_part.part->getDataPartStorage().getLastModified().epochTime();
|
||||
@ -282,6 +300,8 @@ MergeTreePartsMover::TemporaryClonedPart MergeTreePartsMover::clonePart(const Me
|
||||
|
||||
void MergeTreePartsMover::swapClonedPart(TemporaryClonedPart & cloned_part) const
|
||||
{
|
||||
/// Used to get some stuck parts in the moving directory by stopping moves while pause is active
|
||||
FailPointInjection::pauseFailPoint(FailPoints::stop_moving_part_before_swap_with_active);
|
||||
if (moves_blocker.isCancelled())
|
||||
throw Exception(ErrorCodes::ABORTED, "Cancelled moving parts.");
|
||||
|
||||
|
@ -1,6 +1,7 @@
|
||||
#pragma once
|
||||
|
||||
#include <Storages/MergeTree/ReplicatedMergeTreeQueue.h>
|
||||
#include <Storages/TableZnodeInfo.h>
|
||||
#include <Core/Types.h>
|
||||
|
||||
namespace DB
|
||||
@ -16,9 +17,7 @@ struct ReplicatedTableStatus
|
||||
|
||||
ReplicatedMergeTreeQueue::Status queue;
|
||||
UInt32 parts_to_check;
|
||||
String zookeeper_name;
|
||||
String zookeeper_path;
|
||||
String replica_name;
|
||||
TableZnodeInfo zookeeper_info;
|
||||
String replica_path;
|
||||
Int32 columns_version;
|
||||
UInt64 log_max_index;
|
||||
|
@ -6,6 +6,7 @@
|
||||
#include <Storages/StorageFactory.h>
|
||||
#include <Storages/StorageMergeTree.h>
|
||||
#include <Storages/StorageReplicatedMergeTree.h>
|
||||
#include <Storages/TableZnodeInfo.h>
|
||||
|
||||
#include <Core/ServerSettings.h>
|
||||
#include <Core/Settings.h>
|
||||
@ -88,32 +89,23 @@ See details in documentation: https://clickhouse.com/docs/en/engines/table-engin
|
||||
If you use the Replicated version of engines, see https://clickhouse.com/docs/en/engines/table-engines/mergetree-family/replication/.
|
||||
)";
|
||||
|
||||
static ColumnsDescription getColumnsDescriptionFromZookeeper(const String & raw_zookeeper_path, ContextMutablePtr context)
|
||||
static ColumnsDescription getColumnsDescriptionFromZookeeper(const TableZnodeInfo & zookeeper_info, ContextMutablePtr context)
|
||||
{
|
||||
String zookeeper_name = zkutil::extractZooKeeperName(raw_zookeeper_path);
|
||||
String zookeeper_path = zkutil::extractZooKeeperPath(raw_zookeeper_path, true);
|
||||
|
||||
if (!context->hasZooKeeper() && !context->hasAuxiliaryZooKeeper(zookeeper_name))
|
||||
throw Exception{ErrorCodes::CANNOT_EXTRACT_TABLE_STRUCTURE, "Cannot get replica structure without zookeeper, you must specify the structure manually"};
|
||||
|
||||
zkutil::ZooKeeperPtr zookeeper;
|
||||
try
|
||||
{
|
||||
if (zookeeper_name == StorageReplicatedMergeTree::getDefaultZooKeeperName())
|
||||
zookeeper = context->getZooKeeper();
|
||||
else
|
||||
zookeeper = context->getAuxiliaryZooKeeper(zookeeper_name);
|
||||
zookeeper = context->getDefaultOrAuxiliaryZooKeeper(zookeeper_info.zookeeper_name);
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
throw Exception{ErrorCodes::CANNOT_EXTRACT_TABLE_STRUCTURE, "Cannot get replica structure from zookeeper, because cannot get zookeeper: {}. You must specify structure manually", getCurrentExceptionMessage(false)};
|
||||
}
|
||||
|
||||
if (!zookeeper->exists(zookeeper_path + "/replicas"))
|
||||
if (!zookeeper->exists(zookeeper_info.path + "/replicas"))
|
||||
throw Exception{ErrorCodes::CANNOT_EXTRACT_TABLE_STRUCTURE, "Cannot get replica structure, because there no other replicas in zookeeper. You must specify the structure manually"};
|
||||
|
||||
Coordination::Stat columns_stat;
|
||||
return ColumnsDescription::parse(zookeeper->get(fs::path(zookeeper_path) / "columns", &columns_stat));
|
||||
return ColumnsDescription::parse(zookeeper->get(fs::path(zookeeper_info.path) / "columns", &columns_stat));
|
||||
}
|
||||
|
||||
/// Returns whether a new syntax is used to define a table engine, i.e. MergeTree() PRIMARY KEY ... PARTITION BY ... SETTINGS ...
|
||||
@ -184,23 +176,16 @@ static std::string_view getNamePart(const String & engine_name)
|
||||
/// Extracts zookeeper path and replica name from the table engine's arguments.
|
||||
/// The function can modify those arguments (that's why they're passed separately in `engine_args`) and also determines RenamingRestrictions.
|
||||
/// The function assumes the table engine is Replicated.
|
||||
static void extractZooKeeperPathAndReplicaNameFromEngineArgs(
|
||||
static TableZnodeInfo extractZooKeeperPathAndReplicaNameFromEngineArgs(
|
||||
const ASTCreateQuery & query,
|
||||
const StorageID & table_id,
|
||||
const String & engine_name,
|
||||
ASTs & engine_args,
|
||||
LoadingStrictnessLevel mode,
|
||||
const ContextPtr & local_context,
|
||||
String & zookeeper_path,
|
||||
String & replica_name,
|
||||
RenamingRestrictions & renaming_restrictions)
|
||||
const ContextPtr & local_context)
|
||||
{
|
||||
chassert(isReplicated(engine_name));
|
||||
|
||||
zookeeper_path = "";
|
||||
replica_name = "";
|
||||
renaming_restrictions = RenamingRestrictions::ALLOW_ANY;
|
||||
|
||||
bool is_extended_storage_def = isExtendedStorageDef(query);
|
||||
|
||||
if (is_extended_storage_def)
|
||||
@ -210,62 +195,12 @@ static void extractZooKeeperPathAndReplicaNameFromEngineArgs(
|
||||
evaluateEngineArgs(engine_args, local_context);
|
||||
}
|
||||
|
||||
bool is_on_cluster = local_context->getClientInfo().query_kind == ClientInfo::QueryKind::SECONDARY_QUERY;
|
||||
bool is_replicated_database = local_context->getClientInfo().query_kind == ClientInfo::QueryKind::SECONDARY_QUERY &&
|
||||
DatabaseCatalog::instance().getDatabase(table_id.database_name)->getEngineName() == "Replicated";
|
||||
|
||||
/// Allow implicit {uuid} macros only for zookeeper_path in ON CLUSTER queries
|
||||
/// and if UUID was explicitly passed in CREATE TABLE (like for ATTACH)
|
||||
bool allow_uuid_macro = is_on_cluster || is_replicated_database || query.attach || query.has_uuid;
|
||||
|
||||
auto expand_macro = [&] (ASTLiteral * ast_zk_path, ASTLiteral * ast_replica_name)
|
||||
auto expand_macro = [&] (ASTLiteral * ast_zk_path, ASTLiteral * ast_replica_name, String zookeeper_path, String replica_name) -> TableZnodeInfo
|
||||
{
|
||||
/// Unfold {database} and {table} macro on table creation, so table can be renamed.
|
||||
if (mode < LoadingStrictnessLevel::ATTACH)
|
||||
{
|
||||
Macros::MacroExpansionInfo info;
|
||||
/// NOTE: it's not recursive
|
||||
info.expand_special_macros_only = true;
|
||||
info.table_id = table_id;
|
||||
/// Avoid unfolding {uuid} macro on this step.
|
||||
/// We did unfold it in previous versions to make moving table from Atomic to Ordinary database work correctly,
|
||||
/// but now it's not allowed (and it was the only reason to unfold {uuid} macro).
|
||||
info.table_id.uuid = UUIDHelpers::Nil;
|
||||
zookeeper_path = local_context->getMacros()->expand(zookeeper_path, info);
|
||||
|
||||
info.level = 0;
|
||||
replica_name = local_context->getMacros()->expand(replica_name, info);
|
||||
}
|
||||
|
||||
ast_zk_path->value = zookeeper_path;
|
||||
ast_replica_name->value = replica_name;
|
||||
|
||||
/// Expand other macros (such as {shard} and {replica}). We do not expand them on previous step
|
||||
/// to make possible copying metadata files between replicas.
|
||||
Macros::MacroExpansionInfo info;
|
||||
info.table_id = table_id;
|
||||
if (is_replicated_database)
|
||||
{
|
||||
auto database = DatabaseCatalog::instance().getDatabase(table_id.database_name);
|
||||
info.shard = getReplicatedDatabaseShardName(database);
|
||||
info.replica = getReplicatedDatabaseReplicaName(database);
|
||||
}
|
||||
if (!allow_uuid_macro)
|
||||
info.table_id.uuid = UUIDHelpers::Nil;
|
||||
zookeeper_path = local_context->getMacros()->expand(zookeeper_path, info);
|
||||
|
||||
info.level = 0;
|
||||
info.table_id.uuid = UUIDHelpers::Nil;
|
||||
replica_name = local_context->getMacros()->expand(replica_name, info);
|
||||
|
||||
/// We do not allow renaming table with these macros in metadata, because zookeeper_path will be broken after RENAME TABLE.
|
||||
/// NOTE: it may happen if table was created by older version of ClickHouse (< 20.10) and macros was not unfolded on table creation
|
||||
/// or if one of these macros is recursively expanded from some other macro.
|
||||
/// Also do not allow to move table from Atomic to Ordinary database if there's {uuid} macro
|
||||
if (info.expanded_database || info.expanded_table)
|
||||
renaming_restrictions = RenamingRestrictions::DO_NOT_ALLOW;
|
||||
else if (info.expanded_uuid)
|
||||
renaming_restrictions = RenamingRestrictions::ALLOW_PRESERVING_UUID;
|
||||
TableZnodeInfo res = TableZnodeInfo::resolve(zookeeper_path, replica_name, table_id, query, mode, local_context);
|
||||
ast_zk_path->value = res.full_path_for_metadata;
|
||||
ast_replica_name->value = res.replica_name_for_metadata;
|
||||
return res;
|
||||
};
|
||||
|
||||
size_t arg_num = 0;
|
||||
@ -277,6 +212,9 @@ static void extractZooKeeperPathAndReplicaNameFromEngineArgs(
|
||||
|
||||
if (has_valid_arguments)
|
||||
{
|
||||
bool is_replicated_database = local_context->getClientInfo().query_kind == ClientInfo::QueryKind::SECONDARY_QUERY &&
|
||||
DatabaseCatalog::instance().getDatabase(table_id.database_name)->getEngineName() == "Replicated";
|
||||
|
||||
if (!query.attach && is_replicated_database && local_context->getSettingsRef().database_replicated_allow_replicated_engine_arguments == 0)
|
||||
{
|
||||
throw Exception(ErrorCodes::BAD_ARGUMENTS,
|
||||
@ -293,27 +231,22 @@ static void extractZooKeeperPathAndReplicaNameFromEngineArgs(
|
||||
|
||||
/// Get path and name from engine arguments
|
||||
auto * ast_zk_path = engine_args[arg_num]->as<ASTLiteral>();
|
||||
if (ast_zk_path && ast_zk_path->value.getType() == Field::Types::String)
|
||||
zookeeper_path = ast_zk_path->value.safeGet<String>();
|
||||
else
|
||||
if (!ast_zk_path || ast_zk_path->value.getType() != Field::Types::String)
|
||||
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Path in ZooKeeper must be a string literal{}", verbose_help_message);
|
||||
|
||||
auto * ast_replica_name = engine_args[arg_num + 1]->as<ASTLiteral>();
|
||||
if (ast_replica_name && ast_replica_name->value.getType() == Field::Types::String)
|
||||
replica_name = ast_replica_name->value.safeGet<String>();
|
||||
else
|
||||
if (!ast_replica_name || ast_replica_name->value.getType() != Field::Types::String)
|
||||
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Replica name must be a string literal{}", verbose_help_message);
|
||||
|
||||
|
||||
if (!query.attach && is_replicated_database && local_context->getSettingsRef().database_replicated_allow_replicated_engine_arguments == 2)
|
||||
{
|
||||
LOG_WARNING(&Poco::Logger::get("registerStorageMergeTree"), "Replacing user-provided ZooKeeper path and replica name ({}, {}) "
|
||||
"with default arguments", zookeeper_path, replica_name);
|
||||
engine_args[arg_num]->as<ASTLiteral>()->value = zookeeper_path = server_settings.default_replica_path;
|
||||
engine_args[arg_num + 1]->as<ASTLiteral>()->value = replica_name = server_settings.default_replica_name;
|
||||
"with default arguments", ast_zk_path->value.safeGet<String>(), ast_replica_name->value.safeGet<String>());
|
||||
ast_zk_path->value = server_settings.default_replica_path;
|
||||
ast_replica_name->value = server_settings.default_replica_name;
|
||||
}
|
||||
|
||||
expand_macro(ast_zk_path, ast_replica_name);
|
||||
return expand_macro(ast_zk_path, ast_replica_name, ast_zk_path->value.safeGet<String>(), ast_replica_name->value.safeGet<String>());
|
||||
}
|
||||
else if (is_extended_storage_def
|
||||
&& (arg_cnt == 0
|
||||
@ -322,24 +255,24 @@ static void extractZooKeeperPathAndReplicaNameFromEngineArgs(
|
||||
{
|
||||
/// Try use default values if arguments are not specified.
|
||||
/// Note: {uuid} macro works for ON CLUSTER queries when database engine is Atomic.
|
||||
zookeeper_path = server_settings.default_replica_path;
|
||||
/// TODO maybe use hostname if {replica} is not defined?
|
||||
replica_name = server_settings.default_replica_name;
|
||||
|
||||
/// Modify query, so default values will be written to metadata
|
||||
assert(arg_num == 0);
|
||||
ASTs old_args;
|
||||
std::swap(engine_args, old_args);
|
||||
auto path_arg = std::make_shared<ASTLiteral>(zookeeper_path);
|
||||
auto name_arg = std::make_shared<ASTLiteral>(replica_name);
|
||||
auto path_arg = std::make_shared<ASTLiteral>("");
|
||||
auto name_arg = std::make_shared<ASTLiteral>("");
|
||||
auto * ast_zk_path = path_arg.get();
|
||||
auto * ast_replica_name = name_arg.get();
|
||||
|
||||
expand_macro(ast_zk_path, ast_replica_name);
|
||||
auto res = expand_macro(ast_zk_path, ast_replica_name, server_settings.default_replica_path, server_settings.default_replica_name);
|
||||
|
||||
engine_args.emplace_back(std::move(path_arg));
|
||||
engine_args.emplace_back(std::move(name_arg));
|
||||
std::move(std::begin(old_args), std::end(old_args), std::back_inserter(engine_args));
|
||||
|
||||
return res;
|
||||
}
|
||||
else
|
||||
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Expected two string literal arguments: zookeeper_path and replica_name");
|
||||
@ -363,15 +296,11 @@ std::optional<String> extractZooKeeperPathFromReplicatedTableDef(const ASTCreate
|
||||
for (auto & engine_arg : engine_args)
|
||||
engine_arg = engine_arg->clone();
|
||||
|
||||
LoadingStrictnessLevel mode = LoadingStrictnessLevel::CREATE;
|
||||
String zookeeper_path;
|
||||
String replica_name;
|
||||
RenamingRestrictions renaming_restrictions;
|
||||
|
||||
try
|
||||
{
|
||||
extractZooKeeperPathAndReplicaNameFromEngineArgs(query, table_id, engine_name, engine_args, mode, local_context,
|
||||
zookeeper_path, replica_name, renaming_restrictions);
|
||||
auto res = extractZooKeeperPathAndReplicaNameFromEngineArgs(
|
||||
query, table_id, engine_name, engine_args, LoadingStrictnessLevel::CREATE, local_context);
|
||||
return res.full_path;
|
||||
}
|
||||
catch (Exception & e)
|
||||
{
|
||||
@ -382,8 +311,6 @@ std::optional<String> extractZooKeeperPathFromReplicatedTableDef(const ASTCreate
|
||||
}
|
||||
throw;
|
||||
}
|
||||
|
||||
return zookeeper_path;
|
||||
}
|
||||
|
||||
static StoragePtr create(const StorageFactory::Arguments & args)
|
||||
@ -551,19 +478,17 @@ static StoragePtr create(const StorageFactory::Arguments & args)
|
||||
}
|
||||
|
||||
/// Extract zookeeper path and replica name from engine arguments.
|
||||
String zookeeper_path;
|
||||
String replica_name;
|
||||
RenamingRestrictions renaming_restrictions = RenamingRestrictions::ALLOW_ANY;
|
||||
TableZnodeInfo zookeeper_info;
|
||||
|
||||
if (replicated)
|
||||
{
|
||||
extractZooKeeperPathAndReplicaNameFromEngineArgs(args.query, args.table_id, args.engine_name, args.engine_args, args.mode,
|
||||
args.getLocalContext(), zookeeper_path, replica_name, renaming_restrictions);
|
||||
zookeeper_info = extractZooKeeperPathAndReplicaNameFromEngineArgs(
|
||||
args.query, args.table_id, args.engine_name, args.engine_args, args.mode, args.getLocalContext());
|
||||
|
||||
if (replica_name.empty())
|
||||
if (zookeeper_info.replica_name.empty())
|
||||
throw Exception(ErrorCodes::NO_REPLICA_NAME_GIVEN, "No replica name in config{}", verbose_help_message);
|
||||
// '\t' and '\n' will interrupt parsing 'source replica' in ReplicatedMergeTreeLogEntryData::readText
|
||||
if (replica_name.find('\t') != String::npos || replica_name.find('\n') != String::npos)
|
||||
if (zookeeper_info.replica_name.find('\t') != String::npos || zookeeper_info.replica_name.find('\n') != String::npos)
|
||||
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Replica name must not contain '\\t' or '\\n'");
|
||||
|
||||
arg_cnt = engine_args.size(); /// Update `arg_cnt` here because extractZooKeeperPathAndReplicaNameFromEngineArgs() could add arguments.
|
||||
@ -649,7 +574,7 @@ static StoragePtr create(const StorageFactory::Arguments & args)
|
||||
|
||||
ColumnsDescription columns;
|
||||
if (args.columns.empty() && replicated)
|
||||
columns = getColumnsDescriptionFromZookeeper(zookeeper_path, context);
|
||||
columns = getColumnsDescriptionFromZookeeper(zookeeper_info, context);
|
||||
else
|
||||
columns = args.columns;
|
||||
|
||||
@ -879,8 +804,7 @@ static StoragePtr create(const StorageFactory::Arguments & args)
|
||||
need_check_table_structure = txn->isInitialQuery();
|
||||
|
||||
return std::make_shared<StorageReplicatedMergeTree>(
|
||||
zookeeper_path,
|
||||
replica_name,
|
||||
zookeeper_info,
|
||||
args.mode,
|
||||
args.table_id,
|
||||
args.relative_data_path,
|
||||
@ -889,7 +813,6 @@ static StoragePtr create(const StorageFactory::Arguments & args)
|
||||
date_column_name,
|
||||
merging_params,
|
||||
std::move(storage_settings),
|
||||
renaming_restrictions,
|
||||
need_check_table_structure);
|
||||
}
|
||||
else
|
||||
|
@ -211,7 +211,6 @@ namespace ActionLocks
|
||||
static const auto QUEUE_UPDATE_ERROR_SLEEP_MS = 1 * 1000;
|
||||
static const auto MUTATIONS_FINALIZING_SLEEP_MS = 1 * 1000;
|
||||
static const auto MUTATIONS_FINALIZING_IDLE_SLEEP_MS = 5 * 1000;
|
||||
const String StorageReplicatedMergeTree::default_zookeeper_name = "default";
|
||||
|
||||
void StorageReplicatedMergeTree::setZooKeeper()
|
||||
{
|
||||
@ -221,18 +220,9 @@ void StorageReplicatedMergeTree::setZooKeeper()
|
||||
/// strange effects. So we always use only one session for all tables.
|
||||
/// (excluding auxiliary zookeepers)
|
||||
|
||||
if (zookeeper_name == default_zookeeper_name)
|
||||
{
|
||||
auto new_keeper = getContext()->getZooKeeper();
|
||||
std::lock_guard lock(current_zookeeper_mutex);
|
||||
current_zookeeper = new_keeper;
|
||||
}
|
||||
else
|
||||
{
|
||||
auto new_keeper = getContext()->getAuxiliaryZooKeeper(zookeeper_name);
|
||||
std::lock_guard lock(current_zookeeper_mutex);
|
||||
current_zookeeper = new_keeper;
|
||||
}
|
||||
auto new_keeper = getContext()->getDefaultOrAuxiliaryZooKeeper(zookeeper_info.zookeeper_name);
|
||||
std::lock_guard lock(current_zookeeper_mutex);
|
||||
current_zookeeper = new_keeper;
|
||||
}
|
||||
|
||||
zkutil::ZooKeeperPtr StorageReplicatedMergeTree::tryGetZooKeeper() const
|
||||
@ -263,7 +253,7 @@ String StorageReplicatedMergeTree::getEndpointName() const
|
||||
{
|
||||
const MergeTreeSettings & settings = getContext()->getReplicatedMergeTreeSettings();
|
||||
if (settings.enable_the_endpoint_id_with_zookeeper_name_prefix)
|
||||
return zookeeper_name + ":" + replica_path;
|
||||
return zookeeper_info.zookeeper_name + ":" + replica_path;
|
||||
|
||||
return replica_path;
|
||||
}
|
||||
@ -294,8 +284,7 @@ static MergeTreePartInfo makeDummyDropRangeForMovePartitionOrAttachPartitionFrom
|
||||
}
|
||||
|
||||
StorageReplicatedMergeTree::StorageReplicatedMergeTree(
|
||||
const String & zookeeper_path_,
|
||||
const String & replica_name_,
|
||||
const TableZnodeInfo & zookeeper_info_,
|
||||
LoadingStrictnessLevel mode,
|
||||
const StorageID & table_id_,
|
||||
const String & relative_data_path_,
|
||||
@ -304,7 +293,6 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
|
||||
const String & date_column_name,
|
||||
const MergingParams & merging_params_,
|
||||
std::unique_ptr<MergeTreeSettings> settings_,
|
||||
RenamingRestrictions renaming_restrictions_,
|
||||
bool need_check_structure)
|
||||
: MergeTreeData(table_id_,
|
||||
metadata_,
|
||||
@ -315,11 +303,10 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
|
||||
true, /// require_part_metadata
|
||||
mode,
|
||||
[this] (const std::string & name) { enqueuePartForCheck(name); })
|
||||
, full_zookeeper_path(zookeeper_path_)
|
||||
, zookeeper_name(zkutil::extractZooKeeperName(full_zookeeper_path))
|
||||
, zookeeper_path(zkutil::extractZooKeeperPath(full_zookeeper_path, /* check_starts_with_slash */ mode <= LoadingStrictnessLevel::CREATE, log.load()))
|
||||
, replica_name(replica_name_)
|
||||
, replica_path(fs::path(zookeeper_path) / "replicas" / replica_name_)
|
||||
, zookeeper_info(zookeeper_info_)
|
||||
, zookeeper_path(zookeeper_info.path)
|
||||
, replica_name(zookeeper_info.replica_name)
|
||||
, replica_path(fs::path(zookeeper_path) / "replicas" / replica_name)
|
||||
, reader(*this)
|
||||
, writer(*this)
|
||||
, merger_mutator(*this)
|
||||
@ -331,7 +318,6 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
|
||||
, part_check_thread(*this)
|
||||
, restarting_thread(*this)
|
||||
, part_moves_between_shards_orchestrator(*this)
|
||||
, renaming_restrictions(renaming_restrictions_)
|
||||
, replicated_fetches_throttler(std::make_shared<Throttler>(getSettings()->max_replicated_fetches_network_bandwidth, getContext()->getReplicatedFetchesThrottler()))
|
||||
, replicated_sends_throttler(std::make_shared<Throttler>(getSettings()->max_replicated_sends_network_bandwidth, getContext()->getReplicatedSendsThrottler()))
|
||||
{
|
||||
@ -365,7 +351,7 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
|
||||
/// Will be activated by restarting thread.
|
||||
mutations_finalizing_task->deactivate();
|
||||
|
||||
bool has_zookeeper = getContext()->hasZooKeeper() || getContext()->hasAuxiliaryZooKeeper(zookeeper_name);
|
||||
bool has_zookeeper = getContext()->hasZooKeeper() || getContext()->hasAuxiliaryZooKeeper(zookeeper_info.zookeeper_name);
|
||||
if (has_zookeeper)
|
||||
{
|
||||
/// It's possible for getZooKeeper() to timeout if zookeeper host(s) can't
|
||||
@ -845,7 +831,7 @@ bool StorageReplicatedMergeTree::createTableIfNotExists(const StorageMetadataPtr
|
||||
else
|
||||
{
|
||||
auto metadata_drop_lock = zkutil::EphemeralNodeHolder::existing(drop_lock_path, *zookeeper);
|
||||
if (!removeTableNodesFromZooKeeper(zookeeper, zookeeper_path, metadata_drop_lock, log.load()))
|
||||
if (!removeTableNodesFromZooKeeper(zookeeper, zookeeper_info, metadata_drop_lock, log.load()))
|
||||
{
|
||||
/// Someone is recursively removing table right now, we cannot create new table until old one is removed
|
||||
continue;
|
||||
@ -1107,11 +1093,7 @@ void StorageReplicatedMergeTree::createReplica(const StorageMetadataPtr & metada
|
||||
|
||||
zkutil::ZooKeeperPtr StorageReplicatedMergeTree::getZooKeeperIfTableShutDown() const
|
||||
{
|
||||
zkutil::ZooKeeperPtr maybe_new_zookeeper;
|
||||
if (zookeeper_name == default_zookeeper_name)
|
||||
maybe_new_zookeeper = getContext()->getZooKeeper();
|
||||
else
|
||||
maybe_new_zookeeper = getContext()->getAuxiliaryZooKeeper(zookeeper_name);
|
||||
zkutil::ZooKeeperPtr maybe_new_zookeeper = getContext()->getDefaultOrAuxiliaryZooKeeper(zookeeper_info.zookeeper_name);
|
||||
maybe_new_zookeeper->sync(zookeeper_path);
|
||||
return maybe_new_zookeeper;
|
||||
}
|
||||
@ -1226,7 +1208,7 @@ void StorageReplicatedMergeTree::drop()
|
||||
LOG_INFO(log, "Dropping table with non-zero lost_part_count equal to {}", lost_part_count);
|
||||
}
|
||||
|
||||
bool last_replica_dropped = dropReplica(zookeeper, zookeeper_path, replica_name, log.load(), getSettings(), &has_metadata_in_zookeeper);
|
||||
bool last_replica_dropped = dropReplica(zookeeper, zookeeper_info, log.load(), getSettings(), &has_metadata_in_zookeeper);
|
||||
if (last_replica_dropped)
|
||||
{
|
||||
dropZookeeperZeroCopyLockPaths(zookeeper, zero_copy_locks_paths, log.load());
|
||||
@ -1235,13 +1217,15 @@ void StorageReplicatedMergeTree::drop()
|
||||
}
|
||||
|
||||
|
||||
bool StorageReplicatedMergeTree::dropReplica(zkutil::ZooKeeperPtr zookeeper, const String & zookeeper_path, const String & replica,
|
||||
LoggerPtr logger, MergeTreeSettingsPtr table_settings, std::optional<bool> * has_metadata_out)
|
||||
bool StorageReplicatedMergeTree::dropReplica(
|
||||
zkutil::ZooKeeperPtr zookeeper, const TableZnodeInfo & zookeeper_info, LoggerPtr logger,
|
||||
MergeTreeSettingsPtr table_settings, std::optional<bool> * has_metadata_out)
|
||||
{
|
||||
if (zookeeper->expired())
|
||||
throw Exception(ErrorCodes::TABLE_WAS_NOT_DROPPED, "Table was not dropped because ZooKeeper session has expired.");
|
||||
|
||||
auto remote_replica_path = zookeeper_path + "/replicas/" + replica;
|
||||
const String & zookeeper_path = zookeeper_info.path;
|
||||
auto remote_replica_path = zookeeper_path + "/replicas/" + zookeeper_info.replica_name;
|
||||
|
||||
LOG_INFO(logger, "Removing replica {}, marking it as lost", remote_replica_path);
|
||||
/// Mark itself lost before removing, because the following recursive removal may fail
|
||||
@ -1352,30 +1336,33 @@ bool StorageReplicatedMergeTree::dropReplica(zkutil::ZooKeeperPtr zookeeper, con
|
||||
{
|
||||
auto metadata_drop_lock = zkutil::EphemeralNodeHolder::existing(drop_lock_path, *zookeeper);
|
||||
LOG_INFO(logger, "Removing table {} (this might take several minutes)", zookeeper_path);
|
||||
removeTableNodesFromZooKeeper(zookeeper, zookeeper_path, metadata_drop_lock, logger);
|
||||
removeTableNodesFromZooKeeper(zookeeper, zookeeper_info, metadata_drop_lock, logger);
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
bool StorageReplicatedMergeTree::dropReplica(const String & drop_zookeeper_path, const String & drop_replica, LoggerPtr logger)
|
||||
bool StorageReplicatedMergeTree::dropReplica(const String & drop_replica, LoggerPtr logger)
|
||||
{
|
||||
zkutil::ZooKeeperPtr zookeeper = getZooKeeperIfTableShutDown();
|
||||
|
||||
/// NOTE it's not atomic: replica may become active after this check, but before dropReplica(...)
|
||||
/// However, the main use case is to drop dead replica, which cannot become active.
|
||||
/// This check prevents only from accidental drop of some other replica.
|
||||
if (zookeeper->exists(drop_zookeeper_path + "/replicas/" + drop_replica + "/is_active"))
|
||||
if (zookeeper->exists(zookeeper_info.path + "/replicas/" + drop_replica + "/is_active"))
|
||||
throw Exception(ErrorCodes::TABLE_WAS_NOT_DROPPED, "Can't drop replica: {}, because it's active", drop_replica);
|
||||
|
||||
return dropReplica(zookeeper, drop_zookeeper_path, drop_replica, logger);
|
||||
TableZnodeInfo info = zookeeper_info;
|
||||
info.replica_name = drop_replica;
|
||||
return dropReplica(zookeeper, info, logger);
|
||||
}
|
||||
|
||||
|
||||
bool StorageReplicatedMergeTree::removeTableNodesFromZooKeeper(zkutil::ZooKeeperPtr zookeeper,
|
||||
const String & zookeeper_path, const zkutil::EphemeralNodeHolder::Ptr & metadata_drop_lock, LoggerPtr logger)
|
||||
const TableZnodeInfo & zookeeper_info2, const zkutil::EphemeralNodeHolder::Ptr & metadata_drop_lock, LoggerPtr logger)
|
||||
{
|
||||
const String & zookeeper_path = zookeeper_info2.path;
|
||||
bool completely_removed = false;
|
||||
|
||||
/// NOTE /block_numbers/ actually is not flat, because /block_numbers/<partition_id>/ may have ephemeral children,
|
||||
@ -1443,6 +1430,15 @@ bool StorageReplicatedMergeTree::removeTableNodesFromZooKeeper(zkutil::ZooKeeper
|
||||
metadata_drop_lock->setAlreadyRemoved();
|
||||
completely_removed = true;
|
||||
LOG_INFO(logger, "Table {} was successfully removed from ZooKeeper", zookeeper_path);
|
||||
|
||||
try
|
||||
{
|
||||
zookeeper_info2.dropAncestorZnodesIfNeeded(zookeeper);
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
LOG_WARNING(logger, "Failed to drop ancestor znodes {} - {} after dropping table: {}", zookeeper_info2.path_prefix_for_drop, zookeeper_info2.path, getCurrentExceptionMessage(false));
|
||||
}
|
||||
}
|
||||
|
||||
return completely_removed;
|
||||
@ -2295,7 +2291,7 @@ bool StorageReplicatedMergeTree::executeFetch(LogEntry & entry, bool need_to_che
|
||||
String source_replica_path = fs::path(zookeeper_path) / "replicas" / replica;
|
||||
if (!fetchPart(part_name,
|
||||
metadata_snapshot,
|
||||
zookeeper_name,
|
||||
zookeeper_info.zookeeper_name,
|
||||
source_replica_path,
|
||||
/* to_detached= */ false,
|
||||
entry.quorum,
|
||||
@ -2858,7 +2854,7 @@ bool StorageReplicatedMergeTree::executeReplaceRange(LogEntry & entry)
|
||||
interserver_scheme, address.scheme, address.host);
|
||||
|
||||
auto [fetched_part, lock] = fetcher.fetchSelectedPart(
|
||||
metadata_snapshot, getContext(), part_desc->found_new_part_name, zookeeper_name, source_replica_path,
|
||||
metadata_snapshot, getContext(), part_desc->found_new_part_name, zookeeper_info.zookeeper_name, source_replica_path,
|
||||
address.host, address.replication_port, timeouts, credentials->getUser(), credentials->getPassword(),
|
||||
interserver_scheme, replicated_fetches_throttler, false, TMP_PREFIX + "fetch_");
|
||||
part_desc->res_part = fetched_part;
|
||||
@ -2980,7 +2976,7 @@ void StorageReplicatedMergeTree::executeClonePartFromShard(const LogEntry & entr
|
||||
interserver_scheme, address.scheme, address.host);
|
||||
|
||||
auto [fetched_part, lock] = fetcher.fetchSelectedPart(
|
||||
metadata_snapshot, getContext(), entry.new_part_name, zookeeper_name, source_replica_path,
|
||||
metadata_snapshot, getContext(), entry.new_part_name, zookeeper_info.zookeeper_name, source_replica_path,
|
||||
address.host, address.replication_port,
|
||||
timeouts, credentials->getUser(), credentials->getPassword(), interserver_scheme,
|
||||
replicated_fetches_throttler, true);
|
||||
@ -5076,7 +5072,7 @@ MergeTreeData::MutableDataPartPtr StorageReplicatedMergeTree::fetchExistsPart(
|
||||
currently_fetching_parts.erase(part_name);
|
||||
});
|
||||
|
||||
LOG_DEBUG(log, "Fetching already known part {} from {}:{}", part_name, zookeeper_name, source_replica_path);
|
||||
LOG_DEBUG(log, "Fetching already known part {} from {}:{}", part_name, zookeeper_info.zookeeper_name, source_replica_path);
|
||||
|
||||
TableLockHolder table_lock_holder = lockForShare(RWLockImpl::NO_QUERY, getSettings()->lock_acquire_timeout_for_background_operations);
|
||||
|
||||
@ -5109,7 +5105,7 @@ MergeTreeData::MutableDataPartPtr StorageReplicatedMergeTree::fetchExistsPart(
|
||||
"'{}' != '{}', can't fetch part from {}", interserver_scheme, address.scheme, address.host);
|
||||
|
||||
auto [fetched_part, lock] = fetcher.fetchSelectedPart(
|
||||
metadata_snapshot, getContext(), part_name, zookeeper_name, source_replica_path,
|
||||
metadata_snapshot, getContext(), part_name, zookeeper_info.zookeeper_name, source_replica_path,
|
||||
address.host, address.replication_port,
|
||||
timeouts, credentials->getUser(), credentials->getPassword(),
|
||||
interserver_scheme, replicated_fetches_throttler, false, "", nullptr, true,
|
||||
@ -5148,7 +5144,7 @@ MergeTreeData::MutableDataPartPtr StorageReplicatedMergeTree::fetchExistsPart(
|
||||
|
||||
ProfileEvents::increment(ProfileEvents::ReplicatedPartFetches);
|
||||
|
||||
LOG_DEBUG(log, "Fetched part {} from {}:{}", part_name, zookeeper_name, source_replica_path);
|
||||
LOG_DEBUG(log, "Fetched part {} from {}:{}", part_name, zookeeper_info.zookeeper_name, source_replica_path);
|
||||
return part;
|
||||
}
|
||||
|
||||
@ -6653,10 +6649,10 @@ void StorageReplicatedMergeTree::checkTableCanBeDropped(ContextPtr query_context
|
||||
|
||||
void StorageReplicatedMergeTree::checkTableCanBeRenamed(const StorageID & new_name) const
|
||||
{
|
||||
if (renaming_restrictions == RenamingRestrictions::ALLOW_ANY)
|
||||
if (zookeeper_info.renaming_restrictions == RenamingRestrictions::ALLOW_ANY)
|
||||
return;
|
||||
|
||||
if (renaming_restrictions == RenamingRestrictions::DO_NOT_ALLOW)
|
||||
if (zookeeper_info.renaming_restrictions == RenamingRestrictions::DO_NOT_ALLOW)
|
||||
{
|
||||
auto old_name = getStorageID();
|
||||
bool is_server_startup = Context::getGlobalContextInstance()->getApplicationType() == Context::ApplicationType::SERVER
|
||||
@ -6680,7 +6676,7 @@ void StorageReplicatedMergeTree::checkTableCanBeRenamed(const StorageID & new_na
|
||||
"If you really want to rename table, you should edit metadata file first and restart server or reattach the table.");
|
||||
}
|
||||
|
||||
assert(renaming_restrictions == RenamingRestrictions::ALLOW_PRESERVING_UUID);
|
||||
assert(zookeeper_info.renaming_restrictions == RenamingRestrictions::ALLOW_PRESERVING_UUID);
|
||||
if (!new_name.hasUUID() && getStorageID().hasUUID())
|
||||
throw Exception(ErrorCodes::NOT_IMPLEMENTED,
|
||||
"Cannot move Replicated table to Ordinary database, because zookeeper_path contains implicit "
|
||||
@ -7039,9 +7035,7 @@ void StorageReplicatedMergeTree::getStatus(ReplicatedTableStatus & res, bool wit
|
||||
/// NOTE: consider converting to UInt64
|
||||
res.parts_to_check = static_cast<UInt32>(part_check_thread.size());
|
||||
|
||||
res.zookeeper_name = zookeeper_name;
|
||||
res.zookeeper_path = zookeeper_path;
|
||||
res.replica_name = replica_name;
|
||||
res.zookeeper_info = zookeeper_info;
|
||||
res.replica_path = replica_path;
|
||||
res.columns_version = -1;
|
||||
|
||||
@ -7250,11 +7244,7 @@ void StorageReplicatedMergeTree::fetchPartition(
|
||||
}
|
||||
}
|
||||
|
||||
zkutil::ZooKeeperPtr zookeeper;
|
||||
if (from_zookeeper_name != default_zookeeper_name)
|
||||
zookeeper = getContext()->getAuxiliaryZooKeeper(from_zookeeper_name);
|
||||
else
|
||||
zookeeper = getZooKeeper();
|
||||
zkutil::ZooKeeperPtr zookeeper = getContext()->getDefaultOrAuxiliaryZooKeeper(from_zookeeper_name);
|
||||
|
||||
if (from.back() == '/')
|
||||
from.resize(from.size() - 1);
|
||||
@ -10540,7 +10530,7 @@ void StorageReplicatedMergeTree::backupData(
|
||||
|
||||
auto coordination = backup_entries_collector.getBackupCoordination();
|
||||
|
||||
coordination->addReplicatedDataPath(full_zookeeper_path, data_path_in_backup);
|
||||
coordination->addReplicatedDataPath(zookeeper_info.full_path, data_path_in_backup);
|
||||
|
||||
using PartNameAndChecksum = IBackupCoordination::PartNameAndChecksum;
|
||||
std::vector<PartNameAndChecksum> part_names_with_hashes;
|
||||
@ -10549,7 +10539,7 @@ void StorageReplicatedMergeTree::backupData(
|
||||
part_names_with_hashes.emplace_back(PartNameAndChecksum{part_backup_entries.part_name, part_backup_entries.part_checksum});
|
||||
|
||||
/// Send our list of part names to the coordination (to compare with other replicas).
|
||||
coordination->addReplicatedPartNames(full_zookeeper_path, getStorageID().getFullTableName(), getReplicaName(), part_names_with_hashes);
|
||||
coordination->addReplicatedPartNames(zookeeper_info.full_path, getStorageID().getFullTableName(), getReplicaName(), part_names_with_hashes);
|
||||
|
||||
/// Send a list of mutations to the coordination too (we need to find the mutations which are not finished for added part names).
|
||||
{
|
||||
@ -10591,13 +10581,13 @@ void StorageReplicatedMergeTree::backupData(
|
||||
}
|
||||
|
||||
if (!mutation_infos.empty())
|
||||
coordination->addReplicatedMutations(full_zookeeper_path, getStorageID().getFullTableName(), getReplicaName(), mutation_infos);
|
||||
coordination->addReplicatedMutations(zookeeper_info.full_path, getStorageID().getFullTableName(), getReplicaName(), mutation_infos);
|
||||
}
|
||||
}
|
||||
|
||||
/// This task will be executed after all replicas have collected their parts and the coordination is ready to
|
||||
/// give us the final list of parts to add to the BackupEntriesCollector.
|
||||
auto post_collecting_task = [my_full_zookeeper_path = full_zookeeper_path,
|
||||
auto post_collecting_task = [my_full_zookeeper_path = zookeeper_info.full_path,
|
||||
my_replica_name = getReplicaName(),
|
||||
coordination,
|
||||
my_parts_backup_entries = std::move(parts_backup_entries),
|
||||
@ -10636,7 +10626,7 @@ void StorageReplicatedMergeTree::backupData(
|
||||
|
||||
void StorageReplicatedMergeTree::restoreDataFromBackup(RestorerFromBackup & restorer, const String & data_path_in_backup, const std::optional<ASTs> & partitions)
|
||||
{
|
||||
if (!restorer.getRestoreCoordination()->acquireInsertingDataIntoReplicatedTable(full_zookeeper_path))
|
||||
if (!restorer.getRestoreCoordination()->acquireInsertingDataIntoReplicatedTable(zookeeper_info.full_path))
|
||||
{
|
||||
/// Other replica is already restoring the data of this table.
|
||||
/// We'll get them later due to replication, it's not necessary to read it from the backup.
|
||||
|
@ -27,6 +27,7 @@
|
||||
#include <Storages/MergeTree/ReplicatedMergeTreeTableMetadata.h>
|
||||
#include <Storages/MergeTree/ReplicatedTableStatus.h>
|
||||
#include <Storages/RenamingRestrictions.h>
|
||||
#include <Storages/TableZnodeInfo.h>
|
||||
#include <DataTypes/DataTypesNumber.h>
|
||||
#include <Interpreters/Cluster.h>
|
||||
#include <Interpreters/PartLog.h>
|
||||
@ -98,8 +99,7 @@ public:
|
||||
/** If not 'attach', either creates a new table in ZK, or adds a replica to an existing table.
|
||||
*/
|
||||
StorageReplicatedMergeTree(
|
||||
const String & zookeeper_path_,
|
||||
const String & replica_name_,
|
||||
const TableZnodeInfo & zookeeper_info_,
|
||||
LoadingStrictnessLevel mode,
|
||||
const StorageID & table_id_,
|
||||
const String & relative_data_path_,
|
||||
@ -108,7 +108,6 @@ public:
|
||||
const String & date_column_name,
|
||||
const MergingParams & merging_params_,
|
||||
std::unique_ptr<MergeTreeSettings> settings_,
|
||||
RenamingRestrictions renaming_restrictions_,
|
||||
bool need_check_structure);
|
||||
|
||||
void startup() override;
|
||||
@ -244,14 +243,15 @@ public:
|
||||
/** Remove a specific replica from zookeeper.
|
||||
* returns true if there are no replicas left
|
||||
*/
|
||||
static bool dropReplica(zkutil::ZooKeeperPtr zookeeper, const String & zookeeper_path, const String & replica,
|
||||
static bool dropReplica(zkutil::ZooKeeperPtr zookeeper, const TableZnodeInfo & zookeeper_info,
|
||||
LoggerPtr logger, MergeTreeSettingsPtr table_settings = nullptr, std::optional<bool> * has_metadata_out = nullptr);
|
||||
|
||||
bool dropReplica(const String & drop_zookeeper_path, const String & drop_replica, LoggerPtr logger);
|
||||
bool dropReplica(const String & drop_replica, LoggerPtr logger);
|
||||
|
||||
/// Removes table from ZooKeeper after the last replica was dropped
|
||||
static bool removeTableNodesFromZooKeeper(zkutil::ZooKeeperPtr zookeeper, const String & zookeeper_path,
|
||||
const zkutil::EphemeralNodeHolder::Ptr & metadata_drop_lock, LoggerPtr logger);
|
||||
static bool removeTableNodesFromZooKeeper(
|
||||
zkutil::ZooKeeperPtr zookeeper, const TableZnodeInfo & zookeeper_info2,
|
||||
const zkutil::EphemeralNodeHolder::Ptr & metadata_drop_lock, LoggerPtr logger);
|
||||
|
||||
/// Schedules job to execute in background pool (merge, mutate, drop range and so on)
|
||||
bool scheduleDataProcessingJob(BackgroundJobsAssignee & assignee) override;
|
||||
@ -330,17 +330,15 @@ public:
|
||||
bool createEmptyPartInsteadOfLost(zkutil::ZooKeeperPtr zookeeper, const String & lost_part_name);
|
||||
|
||||
// Return default or custom zookeeper name for table
|
||||
const String & getZooKeeperName() const { return zookeeper_name; }
|
||||
const String & getZooKeeperPath() const { return zookeeper_path; }
|
||||
const String & getFullZooKeeperPath() const { return full_zookeeper_path; }
|
||||
const String & getZooKeeperName() const { return zookeeper_info.zookeeper_name; }
|
||||
const String & getZooKeeperPath() const { return zookeeper_info.path; }
|
||||
const String & getFullZooKeeperPath() const { return zookeeper_info.full_path; }
|
||||
|
||||
// Return table id, common for different replicas
|
||||
String getTableSharedID() const override;
|
||||
|
||||
std::map<std::string, MutationCommands> getUnfinishedMutationCommands() const override;
|
||||
|
||||
static const String & getDefaultZooKeeperName() { return default_zookeeper_name; }
|
||||
|
||||
/// Check if there are new broken disks and enqueue part recovery tasks.
|
||||
void checkBrokenDisks();
|
||||
|
||||
@ -418,12 +416,10 @@ private:
|
||||
|
||||
bool is_readonly_metric_set = false;
|
||||
|
||||
const String full_zookeeper_path;
|
||||
static const String default_zookeeper_name;
|
||||
const String zookeeper_name;
|
||||
const String zookeeper_path;
|
||||
const TableZnodeInfo zookeeper_info;
|
||||
const String zookeeper_path; // shorthand for zookeeper_info.path
|
||||
|
||||
const String replica_name;
|
||||
const String replica_name; // shorthand for zookeeper_info.replica_name
|
||||
const String replica_path;
|
||||
|
||||
/** /replicas/me/is_active.
|
||||
@ -519,9 +515,6 @@ private:
|
||||
/// True if replica was created for existing table with fixed granularity
|
||||
bool other_replicas_fixed_granularity = false;
|
||||
|
||||
/// Do not allow RENAME TABLE if zookeeper_path contains {database} or {table} macro
|
||||
const RenamingRestrictions renaming_restrictions;
|
||||
|
||||
/// Throttlers used in DataPartsExchange to lower maximum fetch/sends
|
||||
/// speed.
|
||||
ThrottlerPtr replicated_fetches_throttler;
|
||||
|
@ -530,9 +530,9 @@ Chunk SystemReplicasSource::generate()
|
||||
res_columns[col_num++]->insert(status.is_session_expired);
|
||||
res_columns[col_num++]->insert(status.queue.future_parts);
|
||||
res_columns[col_num++]->insert(status.parts_to_check);
|
||||
res_columns[col_num++]->insert(status.zookeeper_name);
|
||||
res_columns[col_num++]->insert(status.zookeeper_path);
|
||||
res_columns[col_num++]->insert(status.replica_name);
|
||||
res_columns[col_num++]->insert(status.zookeeper_info.zookeeper_name);
|
||||
res_columns[col_num++]->insert(status.zookeeper_info.path);
|
||||
res_columns[col_num++]->insert(status.zookeeper_info.replica_name);
|
||||
res_columns[col_num++]->insert(status.replica_path);
|
||||
res_columns[col_num++]->insert(status.columns_version);
|
||||
res_columns[col_num++]->insert(status.queue.queue_size);
|
||||
|
135
src/Storages/TableZnodeInfo.cpp
Normal file
135
src/Storages/TableZnodeInfo.cpp
Normal file
@ -0,0 +1,135 @@
|
||||
#include <Storages/TableZnodeInfo.h>
|
||||
|
||||
#include <Common/Macros.h>
|
||||
#include <Common/ZooKeeper/ZooKeeper.h>
|
||||
#include <Databases/DatabaseReplicatedHelpers.h>
|
||||
#include <Databases/IDatabase.h>
|
||||
#include <Interpreters/Context.h>
|
||||
#include <Interpreters/DatabaseCatalog.h>
|
||||
#include <Interpreters/StorageID.h>
|
||||
#include <Parsers/ASTCreateQuery.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int LOGICAL_ERROR;
|
||||
}
|
||||
|
||||
/// Builds a TableZnodeInfo from the zookeeper path and replica name requested for a table.
///
/// Macro expansion happens in two passes:
///  1. (only when `mode` is below ATTACH) special macros are expanded and the result is remembered in
///     `full_path_for_metadata` / `replica_name_for_metadata`;
///  2. the remaining macros are expanded to obtain the values actually used by this server.
/// Afterwards the function fills in the renaming restrictions, the zookeeper name, the path without the
/// zookeeper-name prefix, and the ancestor prefix that may be cleaned up on DROP.
///
/// Throws LOGICAL_ERROR if {uuid} was expanded in the path but the uuid cannot be found in the resulting path.
TableZnodeInfo TableZnodeInfo::resolve(
    const String & requested_path,
    const String & requested_replica_name,
    const StorageID & table_id,
    const ASTCreateQuery & query,
    LoadingStrictnessLevel mode,
    const ContextPtr & context)
{
    const bool is_on_cluster = context->getClientInfo().query_kind == ClientInfo::QueryKind::SECONDARY_QUERY;
    const bool is_replicated_database = is_on_cluster
        && DatabaseCatalog::instance().getDatabase(table_id.database_name)->getEngineName() == "Replicated";

    /// Implicit {uuid} macro is allowed in zookeeper_path only for ON CLUSTER queries
    /// or when the UUID was explicitly passed in CREATE TABLE (like for ATTACH).
    const bool allow_uuid_macro = is_on_cluster || is_replicated_database || query.attach || query.has_uuid;

    TableZnodeInfo result;
    result.full_path = requested_path;
    result.replica_name = requested_replica_name;

    /// On table creation unfold the {database} and {table} macros, so that the table can be renamed later.
    if (mode < LoadingStrictnessLevel::ATTACH)
    {
        Macros::MacroExpansionInfo special_only;
        /// NOTE: it's not recursive
        special_only.expand_special_macros_only = true;
        special_only.table_id = table_id;
        /// Do not unfold {uuid} on this step.
        /// Previous versions did unfold it to make moving a table from Atomic to Ordinary database work,
        /// but that move is not allowed anymore (and it was the only reason to unfold {uuid} here).
        special_only.table_id.uuid = UUIDHelpers::Nil;
        result.full_path = context->getMacros()->expand(result.full_path, special_only);

        special_only.level = 0;
        result.replica_name = context->getMacros()->expand(result.replica_name, special_only);
    }

    /// What has been expanded so far is what goes into the table metadata.
    result.full_path_for_metadata = result.full_path;
    result.replica_name_for_metadata = result.replica_name;

    /// Now expand the other macros (such as {shard} and {replica}). They were left untouched above
    /// so that metadata files can be copied between replicas.
    Macros::MacroExpansionInfo expansion;
    expansion.table_id = table_id;
    if (is_replicated_database)
    {
        auto database = DatabaseCatalog::instance().getDatabase(table_id.database_name);
        expansion.shard = getReplicatedDatabaseShardName(database);
        expansion.replica = getReplicatedDatabaseReplicaName(database);
    }
    if (!allow_uuid_macro)
        expansion.table_id.uuid = UUIDHelpers::Nil;
    result.full_path = context->getMacros()->expand(result.full_path, expansion);
    /// Remember this before the same `expansion` object is reused for the replica name.
    const bool expanded_uuid_in_path = expansion.expanded_uuid;

    expansion.level = 0;
    expansion.table_id.uuid = UUIDHelpers::Nil;
    result.replica_name = context->getMacros()->expand(result.replica_name, expansion);

    /// Renaming a table whose metadata still contains these macros is not allowed, because zookeeper_path
    /// would be broken after RENAME TABLE.
    /// NOTE: this may happen if the table was created by an older version of ClickHouse (< 20.10), where
    /// macros were not unfolded on table creation, or if one of these macros is recursively expanded
    /// from some other macro.
    /// Also do not allow moving a table from Atomic to Ordinary database if there's a {uuid} macro.
    if (expansion.expanded_database || expansion.expanded_table)
        result.renaming_restrictions = RenamingRestrictions::DO_NOT_ALLOW;
    else if (expansion.expanded_uuid)
        result.renaming_restrictions = RenamingRestrictions::ALLOW_PRESERVING_UUID;

    result.zookeeper_name = zkutil::extractZooKeeperName(result.full_path);
    result.path = zkutil::extractZooKeeperPath(
        result.full_path,
        /* check_starts_with_slash */ mode <= LoadingStrictnessLevel::CREATE,
        getLogger(table_id.getNameForLogs()));
    result.path_prefix_for_drop = result.path;

    if (!expanded_uuid_in_path)
        return result;

    /// When dropping a table with znode path "/foo/{uuid}/bar/baz", delete not only
    /// "/foo/{uuid}/bar/baz" but also "/foo/{uuid}/bar" and "/foo/{uuid}" if they became empty.
    ///
    /// (The uuid substring is located by searching rather than by tracking its position through macro
    /// expansion. So in principle we may find a uuid substring that wasn't expanded from a macro.
    /// This should be ok because we take the *last* occurrence, which gives a prefix at least as long
    /// as the correct one, so znodes outside the {uuid} path component are never deleted. Propagating
    /// string indices through macro expansion passes would be more complex and error-prone.)
    const String uuid_str = toString(table_id.uuid);
    const size_t uuid_begin = result.path.rfind(uuid_str);
    if (uuid_begin == String::npos)
        /// Possible if the macro is in the "<auxiliary_zookeeper_name>:/" prefix, but we probably
        /// don't want to allow that.
        throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't find uuid in zookeeper path after expanding {{uuid}} macro: {} (uuid {})", result.path, uuid_str);

    /// Extend to the end of the path component, in case the path is "/foo/pika{uuid}chu/bar"
    /// (or "/foo/{uuid}{replica}/bar"). If there is no further '/', the whole path is the prefix.
    const size_t component_end = result.path.find('/', uuid_begin + uuid_str.size());
    result.path_prefix_for_drop = result.path.substr(0, component_end);

    return result;
}
|
||||
|
||||
/// Walks from the parent of `path` up to (and including) `path_prefix_for_drop`, trying to remove each
/// znode on the way. Stops at the first znode that cannot be removed.
/// Does nothing if the prefix is empty or equal to `path` itself.
void TableZnodeInfo::dropAncestorZnodesIfNeeded(const zkutil::ZooKeeperPtr & zookeeper) const
{
    chassert(path.starts_with(path_prefix_for_drop));

    const size_t prefix_size = path_prefix_for_drop.size();
    if (prefix_size == 0 || prefix_size == path.size())
        return;

    /// The prefix must end exactly at a path component boundary.
    chassert(path[prefix_size] == '/');

    String ancestor = path;
    while (ancestor.size() > prefix_size)
    {
        /// Cut off the last path component to get the next ancestor.
        const size_t last_slash = ancestor.rfind('/');
        chassert(last_slash != String::npos && last_slash >= prefix_size);
        ancestor = ancestor.substr(0, last_slash);

        /// Any failure (znode not empty or already removed by someone else) ends the walk.
        if (zookeeper->tryRemove(ancestor) != Coordination::Error::ZOK)
            break;
    }
}
|
||||
|
||||
}
|
62
src/Storages/TableZnodeInfo.h
Normal file
62
src/Storages/TableZnodeInfo.h
Normal file
@ -0,0 +1,62 @@
|
||||
#pragma once
|
||||
|
||||
#include <base/types.h>
|
||||
#include <Storages/RenamingRestrictions.h>
|
||||
#include <Databases/LoadingStrictnessLevel.h>
|
||||
|
||||
namespace zkutil
|
||||
{
|
||||
class ZooKeeper;
|
||||
using ZooKeeperPtr = std::shared_ptr<ZooKeeper>;
|
||||
}
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
struct StorageID;
|
||||
class ASTCreateQuery;
|
||||
class Context;
|
||||
using ContextPtr = std::shared_ptr<const Context>;
|
||||
|
||||
/// Helper for replicated tables that use zookeeper for coordination among replicas.
|
||||
/// Handles things like:
|
||||
/// * Expanding macros like {table} and {uuid} in zookeeper path. Some macros are expanded+saved once
|
||||
/// on table creation (e.g. {table}, to avoid changing the path if the table is later renamed),
|
||||
/// others are expanded on each server startup and each replica (e.g. {replica} because it's
|
||||
/// different on different replicas).
|
||||
/// * When dropping table with znode path (say) "/clickhouse/tables/{uuid}/{shard}", delete not only
|
||||
/// the znode at this path but also the parent znode "/clickhouse/tables/{uuid}" if it became empty.
|
||||
/// Otherwise each created+dropped table would leave behind an empty znode.
|
||||
|
||||
struct TableZnodeInfo
|
||||
{
|
||||
String path;
|
||||
String replica_name;
|
||||
/// Which zookeeper cluster to use ("default" or one of auxiliary zookeepers listed in config).
|
||||
String zookeeper_name = "default";
|
||||
|
||||
/// Path with optional zookeeper_name prefix: "<auxiliary_zookeeper_name>:<path>".
|
||||
String full_path;
|
||||
|
||||
/// Do not allow RENAME TABLE if zookeeper_path contains {database} or {table} macro.
|
||||
RenamingRestrictions renaming_restrictions = RenamingRestrictions::ALLOW_ANY;
|
||||
|
||||
/// Information to save in table metadata and send to replicas (if ON CLUSTER or DatabaseReplicated).
|
||||
/// Has some macros expanded (e.g. {table}), others left unexpanded (e.g. {replica}).
|
||||
String full_path_for_metadata;
|
||||
String replica_name_for_metadata;
|
||||
|
||||
/// Path to an ancestor of `path` that should be considered "owned" by this table (shared among
|
||||
/// replicas of the table). When table is dropped, this znode will be removed if it became empty.
|
||||
/// E.g. path = "/clickhouse/tables/{uuid}/{shard}", path_prefix_for_drop = "/clickhouse/tables/{uuid}".
|
||||
String path_prefix_for_drop;
|
||||
|
||||
static TableZnodeInfo resolve(
|
||||
const String & requested_path, const String & requested_replica_name,
|
||||
const StorageID & table_id, const ASTCreateQuery & query, LoadingStrictnessLevel mode,
|
||||
const ContextPtr & context);
|
||||
|
||||
void dropAncestorZnodesIfNeeded(const zkutil::ZooKeeperPtr & zookeeper) const;
|
||||
};
|
||||
|
||||
}
|
@ -42,6 +42,7 @@
|
||||
<multi_read>1</multi_read>
|
||||
<check_not_exists>1</check_not_exists>
|
||||
<create_if_not_exists>1</create_if_not_exists>
|
||||
<remove_recursive>1</remove_recursive>
|
||||
</feature_flags>
|
||||
</keeper_server>
|
||||
</clickhouse>
|
||||
|
@ -64,6 +64,7 @@ function configure()
|
||||
randomize_config_boolean_value multi_read keeper_port
|
||||
randomize_config_boolean_value check_not_exists keeper_port
|
||||
randomize_config_boolean_value create_if_not_exists keeper_port
|
||||
randomize_config_boolean_value remove_recursive keeper_port
|
||||
fi
|
||||
|
||||
sudo chown clickhouse /etc/clickhouse-server/config.d/keeper_port.xml
|
||||
|
@ -393,6 +393,7 @@ def test_table_functions():
|
||||
f"azureBlobStorageCluster('test_shard_localhost', named_collection_2, connection_string = '{azure_conn_string}', container = 'cont', blob_path = 'test_simple_16.csv', format = 'CSV')",
|
||||
f"azureBlobStorageCluster('test_shard_localhost', named_collection_2, storage_account_url = '{azure_storage_account_url}', container = 'cont', blob_path = 'test_simple_17.csv', account_name = '{azure_account_name}', account_key = '{azure_account_key}')",
|
||||
f"iceberg('http://minio1:9001/root/data/test11.csv.gz', 'minio', '{password}')",
|
||||
f"gcs('http://minio1:9001/root/data/test11.csv.gz', 'minio', '{password}')",
|
||||
]
|
||||
|
||||
def make_test_case(i):
|
||||
|
46
tests/integration/test_remove_stale_moving_parts/config.xml
Normal file
46
tests/integration/test_remove_stale_moving_parts/config.xml
Normal file
@ -0,0 +1,46 @@
|
||||
<clickhouse>
|
||||
<remote_servers>
|
||||
<cluster>
|
||||
<shard>
|
||||
<replica>
|
||||
<host>ch1</host>
|
||||
<port>9000</port>
|
||||
</replica>
|
||||
</shard>
|
||||
</cluster>
|
||||
</remote_servers>
|
||||
<macros>
|
||||
<shard>01</shard>
|
||||
</macros>
|
||||
<storage_configuration>
|
||||
<disks>
|
||||
<s3>
|
||||
<type>s3</type>
|
||||
<endpoint>http://minio1:9001/root/data/</endpoint>
|
||||
<access_key_id>minio</access_key_id>
|
||||
<secret_access_key>minio123</secret_access_key>
|
||||
</s3>
|
||||
</disks>
|
||||
<policies>
|
||||
<s3>
|
||||
<volumes>
|
||||
<default>
|
||||
<disk>default</disk>
|
||||
<perform_ttl_move_on_insert>False</perform_ttl_move_on_insert>
|
||||
</default>
|
||||
<s3>
|
||||
<disk>s3</disk>
|
||||
<perform_ttl_move_on_insert>False</perform_ttl_move_on_insert>
|
||||
</s3>
|
||||
</volumes>
|
||||
<move_factor>0.0</move_factor>
|
||||
</s3>
|
||||
</policies>
|
||||
</storage_configuration>
|
||||
|
||||
<merge_tree>
|
||||
<allow_remote_fs_zero_copy_replication>true</allow_remote_fs_zero_copy_replication>
|
||||
<storage_policy>s3</storage_policy>
|
||||
</merge_tree>
|
||||
<allow_remove_stale_moving_parts>true</allow_remove_stale_moving_parts>
|
||||
</clickhouse>
|
117
tests/integration/test_remove_stale_moving_parts/test.py
Normal file
117
tests/integration/test_remove_stale_moving_parts/test.py
Normal file
@ -0,0 +1,117 @@
|
||||
from pathlib import Path
|
||||
import time
|
||||
import pytest
|
||||
from helpers.cluster import ClickHouseCluster
|
||||
|
||||
# Single-instance cluster used by this module; the instance is started with ZooKeeper and MinIO.
cluster = ClickHouseCluster(__file__)
ch1 = cluster.add_instance(
    "ch1",
    main_configs=["config.xml"],
    macros={"replica": "node1"},
    with_zookeeper=True,
    with_minio=True,
)

# Database in which every helper below runs its queries.
DATABASE_NAME = "stale_moving_parts"
|
||||
|
||||
|
||||
@pytest.fixture(scope="module")
def started_cluster():
    """Start the module-level cluster, yield it to the tests, and always shut it down afterwards."""
    try:
        cluster.start()
        yield cluster
    finally:
        # Runs even if start-up or a test failed.
        cluster.shutdown()
|
||||
|
||||
|
||||
def q(node, query):
    """Run `query` on `node` with DATABASE_NAME as the database and return what `node.query` returns."""
    response = node.query(database=DATABASE_NAME, sql=query)
    return response
|
||||
|
||||
|
||||
# .../disks/s3/store/
def get_table_path(node, table):
    """Return the second entry of `data_paths` from system.tables for `table` in DATABASE_NAME.

    The query output is an array literal; the surrounding quotes, newline and brackets are stripped,
    the list is split on commas, and the quotes around the selected entry are removed.
    """
    raw_paths = node.query(
        sql=f"SELECT data_paths FROM system.tables WHERE table = '{table}' and database = '{DATABASE_NAME}' LIMIT 1"
    )
    path_entries = raw_paths.strip('"\n[]').split(",")
    second_path = path_entries[1]
    return second_path.strip("'")
|
||||
|
||||
|
||||
def exec(node, cmd, path):
    """Run `cmd path` through `bash -c` inside the node's container and return the result."""
    shell_command = f"{cmd} {path}"
    return node.exec_in_container(["bash", "-c", shell_command])
|
||||
|
||||
|
||||
def wait_part_is_stuck(node, table_moving_path, moving_part):
    """Wait until `moving_part` is the only row in system.moves and the only entry in `table_moving_path`.

    Each condition is checked up to six times with a one-second pause between checks;
    an Exception is raised if a condition is still not met on the last check.
    """
    max_retries = 5

    # Phase 1: the move must be registered in system.moves.
    for retries_left in range(max_retries, -1, -1):
        if q(node, "SELECT part_name FROM system.moves").strip() == moving_part:
            break
        if retries_left == 0:
            raise Exception("Part has not started to move")
        time.sleep(1)

    # Phase 2: the part must be sitting in the table's moving directory.
    for retries_left in range(max_retries, -1, -1):
        if exec(node, "ls", table_moving_path).strip() == moving_part:
            break
        if retries_left == 0:
            raise Exception("Part is not stuck in the moving directory")
        time.sleep(1)
|
||||
|
||||
|
||||
def wait_zookeeper_node_to_start(zk_nodes, timeout=60):
    """Block until every ZooKeeper instance in `zk_nodes` answers a `get_children("/")` request.

    Args:
        zk_nodes: names of the ZooKeeper instances to probe via `cluster.get_kazoo_client`.
        timeout: maximum number of seconds to keep retrying.

    Raises:
        Exception: if the instances did not all respond within `timeout` seconds.
            (Previously the function returned silently in that case, letting the caller
            continue as if ZooKeeper were available.)
    """
    start = time.time()
    while time.time() - start < timeout:
        try:
            for instance in zk_nodes:
                conn = cluster.get_kazoo_client(instance)
                conn.get_children("/")
            print("All instances of ZooKeeper started")
            return
        except Exception as ex:
            # Any failure means at least one node is not ready yet: report it and retry shortly.
            print(("Can't connect to ZooKeeper " + str(ex)))
            time.sleep(0.5)

    raise Exception(
        f"ZooKeeper nodes {zk_nodes} did not start within {timeout} seconds"
    )
|
||||
|
||||
|
||||
def test_remove_stale_moving_parts_without_zookeeper(started_cluster):
    """Check that a cancelled part move leaves nothing in the table's `moving` directory
    while ZooKeeper is stopped.

    Steps: create a replicated table, start `MOVE PART ... TO DISK 's3'` with the
    `stop_moving_part_before_swap_with_active` failpoint enabled, wait until the part shows up
    in the `moving` directory, stop ZooKeeper, stop moves and disable the failpoint, then assert
    that the move reported cancellation and that the `moving` directory is empty.
    """
    zk_nodes = ["zoo1", "zoo2", "zoo3"]

    ch1.query(f"CREATE DATABASE IF NOT EXISTS {DATABASE_NAME}")

    q(
        ch1,
        "CREATE TABLE test_remove ON CLUSTER cluster ( id UInt32 ) ENGINE ReplicatedMergeTree() ORDER BY id;",
    )

    table_moving_path = Path(get_table_path(ch1, "test_remove")) / "moving"

    q(ch1, "SYSTEM ENABLE FAILPOINT stop_moving_part_before_swap_with_active")
    q(ch1, "INSERT INTO test_remove SELECT number FROM numbers(100);")
    moving_part = "all_0_0_0"
    # Issued as a background request: the result is only read after the move has been cancelled.
    move_response = ch1.get_query_request(
        sql=f"ALTER TABLE test_remove MOVE PART '{moving_part}' TO DISK 's3'",
        database=DATABASE_NAME,
    )

    wait_part_is_stuck(ch1, table_moving_path, moving_part)

    cluster.stop_zookeeper_nodes(zk_nodes)
    # Stop moves in case table is not read-only yet
    q(ch1, "SYSTEM STOP MOVES")
    q(ch1, "SYSTEM DISABLE FAILPOINT stop_moving_part_before_swap_with_active")

    assert "Cancelled moving parts" in move_response.get_error()
    assert exec(ch1, "ls", table_moving_path).strip() == ""

    # Bring ZooKeeper back so that the table can be dropped.
    cluster.start_zookeeper_nodes(zk_nodes)
    wait_zookeeper_node_to_start(zk_nodes)
    q(ch1, "SYSTEM START MOVES")

    q(ch1, "DROP TABLE test_remove")
|
@ -560,7 +560,6 @@ positionCaseInsensitive
|
||||
positionCaseInsensitiveUTF8
|
||||
positionUTF8
|
||||
pow
|
||||
printf
|
||||
proportionsZTest
|
||||
protocol
|
||||
queryID
|
||||
|
@ -1,2 +1,2 @@
|
||||
default 127.0.0.1 9181 0 0 0 1 1 ['FILTERED_LIST','MULTI_READ','CHECK_NOT_EXISTS','CREATE_IF_NOT_EXISTS']
|
||||
default 127.0.0.1 9181 0 0 0 1 1 ['FILTERED_LIST','MULTI_READ','CHECK_NOT_EXISTS','CREATE_IF_NOT_EXISTS','REMOVE_RECURSIVE']
|
||||
zookeeper2 localhost 9181 0 0 0 1
|
||||
|
@ -0,0 +1,2 @@
|
||||
1
|
||||
0
|
16
tests/queries/0_stateless/03234_replicated_table_parent_znode_cleanup.sh
Executable file
16
tests/queries/0_stateless/03234_replicated_table_parent_znode_cleanup.sh
Executable file
@ -0,0 +1,16 @@
|
||||
#!/usr/bin/env bash
# Creates a ReplicatedMergeTree table inside a Replicated database, then counts the znodes named after
# the table's uuid under /clickhouse/tables before and after `drop table ... sync`.
# The reference output is 1 followed by 0.

CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh

db="rdb_$CLICKHOUSE_DATABASE"

$CLICKHOUSE_CLIENT --distributed_ddl_output_mode=none -nq "
    create database $db engine=Replicated('/test/$CLICKHOUSE_DATABASE/rdb', 's1', 'r1');
    create table $db.a (x Int8) engine ReplicatedMergeTree order by x;"

# The uuid is needed to look the table's znode up by name.
uuid=$($CLICKHOUSE_CLIENT -q "select uuid from system.tables where database = '$db' and name = 'a'")

$CLICKHOUSE_CLIENT --distributed_ddl_output_mode=none -nq "
    select count() from system.zookeeper where path = '/clickhouse/tables' and name = '$uuid';
    drop table $db.a sync;
    select count() from system.zookeeper where path = '/clickhouse/tables' and name = '$uuid';"

$CLICKHOUSE_CLIENT -q "drop database $db"
|
Loading…
Reference in New Issue
Block a user