Compare commits

...

37 Commits

Author | SHA1 | Message | Date
Michael Kolupaev | 47c6132d4e | Merge 9102a6f119 into d793e06860 | 2024-09-16 21:26:55 +08:00
Rich Raposa | d793e06860 | Merge pull request #69622 from Olexandr88/patch-1 (Docs: Update index) | 2024-09-16 13:12:30 +00:00
vdimir | 1986fb1418 | Merge pull request #68595 from ClickHouse/vdimir/fix_function_printf_style (Fix style in Functions/printf.cpp) | 2024-09-16 12:34:31 +00:00
Mikhail f. Shiryaev | f36408a666 | Merge pull request #69599 from ClickHouse/local-debug (Improve debug step in actions) | 2024-09-16 10:20:12 +00:00
Yarik Briukhovetskyi | de85f5f251 | empty commit (I've changed the changelog entry) | 2024-09-16 12:15:11 +02:00
Oleksandr | 85af661b9c | Docs: Update index | 2024-09-16 12:57:24 +03:00
Antonio Andelic | b42c6491e4 | Merge pull request #69578 from ClickHouse/issues/68932/enable_in_ci (enable removeRecursive in CI) | 2024-09-16 09:03:11 +00:00
Robert Schulze | 1a4c7b7c61 | Merge pull request #69493 from ucasfl/vector-index-insert (Speedup insert performance of vector similarity index by parallelization) | 2024-09-16 08:54:02 +00:00
Oleksandr | 14feba8443 | Docs: Update index | 2024-09-16 11:42:09 +03:00
vdimir | 4c4a051d5e | Merge pull request #69075 from kirillgarbar/remove-stale-moving-parts (Remove stale moving parts without zookeeper) | 2024-09-16 08:02:05 +00:00
Vitaly Baranov | a55cc03973 | Merge pull request #69611 from vitlibar/masking-sensitive-info-in-gcs-table-function (Masking sensitive info in gcs() table function) | 2024-09-16 07:58:17 +00:00
Robert Schulze | 37411bf240 | Fix sizing with unconstrained thread pool size | 2024-09-15 15:06:14 +00:00
Michael Kolupaev | 9102a6f119 | Merge remote-tracking branch 'origin/master' into zznode | 2024-09-14 00:57:56 +00:00
Vitaly Baranov | a461d20af9 | Masking sensitive info in gcs() table function. | 2024-09-13 23:03:56 +02:00
Michael Kolupaev | 59d1f9a6b1 | Fix test | 2024-09-13 20:04:28 +00:00
Mikhail f. Shiryaev | b55d0b54ea | Merge steps together to minimize grouping | 2024-09-13 17:35:09 +02:00
Mikhail f. Shiryaev | 418ef3f8bc | Use local debug action in every action | 2024-09-13 17:20:49 +02:00
Mikhail f. Shiryaev | b420bbf855 | Improve debug action | 2024-09-13 17:17:10 +02:00
Кирилл Гарбар | 6a7cfd13f7 | Set PRESERVE_BLOBS if part is fetched from another replica | 2024-09-13 15:25:17 +03:00
Mikhail Artemenko | baf6aaef1d | fix tests | 2024-09-13 11:32:33 +00:00
Robert Schulze | 9ca149a487 | Fix GWP-asan crash | 2024-09-13 11:07:09 +00:00
Mikhail Artemenko | 042194e3f6 | enable removeRecursive in CI | 2024-09-13 08:50:28 +00:00
Michael Kolupaev | adb905a692 | Small improvement | 2024-09-13 04:12:28 +00:00
Michael Kolupaev | e6ec9eaad3 | Conflict | 2024-09-13 02:18:24 +00:00
Michael Kolupaev | 9a3adc70bd | Don't leave an empty znode when replicated table is dropped | 2024-09-12 21:20:25 +00:00
Кирилл Гарбар | 120e38c72a | Merge remote-tracking branch 'kirillgarbar/master' into remove-stale-moving-parts | 2024-09-12 19:26:58 +03:00
Robert Schulze | 38b5ea9066 | Fix docs | 2024-09-12 12:43:27 +00:00
Robert Schulze | fe5e061fff | Some fixups | 2024-09-12 10:38:14 +00:00
flynn | f6b965872f | Merge branch 'master' of github.com:ClickHouse/ClickHouse into vector-index-insert | 2024-09-12 06:40:33 +00:00
flynn | 22c3b71196 | Make vector similarity index creation thread pool globally | 2024-09-12 03:54:25 +00:00
flynn | 7425d4aa1a | remove blank line | 2024-09-11 10:12:42 +00:00
flynn | cf12e3924f | Speedup insert data with vector similarity index by add data to index parallel | 2024-09-11 09:31:46 +00:00
vdimir | cfc931160d | Merge branch 'master' into vdimir/fix_function_printf_style | 2024-09-02 16:05:02 +02:00
Кирилл Гарбар | b2c4b771d8 | Minor fixes | 2024-08-29 19:33:04 +03:00
Кирилл Гарбар | edf4e09fb2 | Remove stale moving parts without zookeeper | 2024-08-29 18:46:06 +03:00
vdimir | 07f44fdb89 | Merge branch 'master' into vdimir/fix_function_printf_style | 2024-08-22 11:23:50 +02:00
vdimir | 2fcbe2465a | Fix style in Functions/printf.cpp | 2024-08-20 09:07:15 +00:00
45 changed files with 669 additions and 259 deletions

View File

@ -4,15 +4,31 @@ description: Prints workflow debug info
runs:
using: "composite"
steps:
- name: Print envs
- name: Envs, event.json and contexts
shell: bash
run: |
echo "::group::Envs"
env
echo "::endgroup::"
- name: Print Event.json
shell: bash
run: |
echo "::group::Event.json"
echo '::group::Environment variables'
env | sort
echo '::endgroup::'
echo '::group::event.json'
python3 -m json.tool "$GITHUB_EVENT_PATH"
echo "::endgroup::"
echo '::endgroup::'
cat << 'EOF'
::group::github context
${{ toJSON(github) }}
::endgroup::
::group::env context
${{ toJSON(env) }}
::endgroup::
::group::runner context
${{ toJSON(runner) }}
::endgroup::
::group::job context
${{ toJSON(job) }}
::endgroup::
EOF

View File

@ -27,6 +27,8 @@ jobs:
clear-repository: true # to ensure correct digests
fetch-depth: 0 # to get version
filter: tree:0
- name: Debug Info
uses: ./.github/actions/debug
- name: Labels check
run: |
cd "$GITHUB_WORKSPACE/tests/ci"

View File

@ -33,6 +33,8 @@ jobs:
clear-repository: true
token: ${{secrets.ROBOT_CLICKHOUSE_COMMIT_TOKEN}}
fetch-depth: 0
- name: Debug Info
uses: ./.github/actions/debug
- name: Cherry pick
run: |
cd "$GITHUB_WORKSPACE/tests/ci"

View File

@ -56,13 +56,13 @@ jobs:
GH_TOKEN: ${{ secrets.ROBOT_CLICKHOUSE_COMMIT_TOKEN }}
runs-on: [self-hosted, release-maker]
steps:
- name: DebugInfo
uses: hmarr/debug-action@f7318c783045ac39ed9bb497e22ce835fdafbfe6
- name: Check out repository code
uses: ClickHouse/checkout@v1
with:
token: ${{secrets.ROBOT_CLICKHOUSE_COMMIT_TOKEN}}
fetch-depth: 0
- name: Debug Info
uses: ./.github/actions/debug
- name: Prepare Release Info
shell: bash
run: |

View File

@ -11,6 +11,7 @@ name: Build docker images
required: false
type: boolean
default: false
jobs:
DockerBuildAarch64:
runs-on: [self-hosted, style-checker-aarch64]

View File

@ -8,20 +8,21 @@ on: # yamllint disable-line rule:truthy
schedule:
- cron: '0 */6 * * *'
workflow_dispatch:
jobs:
RunConfig:
runs-on: [self-hosted, style-checker-aarch64]
outputs:
data: ${{ steps.runconfig.outputs.CI_DATA }}
steps:
- name: DebugInfo
uses: hmarr/debug-action@f7318c783045ac39ed9bb497e22ce835fdafbfe6
- name: Check out repository code
uses: ClickHouse/checkout@v1
with:
clear-repository: true # to ensure correct digests
fetch-depth: 0 # to get version
filter: tree:0
- name: Debug Info
uses: ./.github/actions/debug
- name: PrepareRunConfig
id: runconfig
run: |

View File

@ -15,14 +15,14 @@ jobs:
outputs:
data: ${{ steps.runconfig.outputs.CI_DATA }}
steps:
- name: DebugInfo
uses: hmarr/debug-action@f7318c783045ac39ed9bb497e22ce835fdafbfe6
- name: Check out repository code
uses: ClickHouse/checkout@v1
with:
clear-repository: true # to ensure correct digests
fetch-depth: 0 # to get version
filter: tree:0
- name: Debug Info
uses: ./.github/actions/debug
- name: Merge sync PR
run: |
cd "$GITHUB_WORKSPACE/tests/ci"

View File

@ -14,14 +14,14 @@ jobs:
outputs:
data: ${{ steps.runconfig.outputs.CI_DATA }}
steps:
- name: DebugInfo
uses: hmarr/debug-action@f7318c783045ac39ed9bb497e22ce835fdafbfe6
- name: Check out repository code
uses: ClickHouse/checkout@v1
with:
clear-repository: true # to ensure correct digests
fetch-depth: 0 # to get a version
filter: tree:0
- name: Debug Info
uses: ./.github/actions/debug
- name: Cancel PR workflow
run: |
python3 "$GITHUB_WORKSPACE/tests/ci/ci.py" --cancel-previous-run

View File

@ -15,14 +15,14 @@ jobs:
outputs:
data: ${{ steps.runconfig.outputs.CI_DATA }}
steps:
- name: DebugInfo
uses: hmarr/debug-action@f7318c783045ac39ed9bb497e22ce835fdafbfe6
- name: Check out repository code
uses: ClickHouse/checkout@v1
with:
clear-repository: true # to ensure correct digests
fetch-depth: 0 # to get version
filter: tree:0
- name: Debug Info
uses: ./.github/actions/debug
- name: PrepareRunConfig
id: runconfig
run: |

View File

@ -25,14 +25,14 @@ jobs:
outputs:
data: ${{ steps.runconfig.outputs.CI_DATA }}
steps:
- name: DebugInfo
uses: hmarr/debug-action@f7318c783045ac39ed9bb497e22ce835fdafbfe6
- name: Check out repository code
uses: ClickHouse/checkout@v1
with:
clear-repository: true # to ensure correct digests
fetch-depth: 0 # to get a version
filter: tree:0
- name: Debug Info
uses: ./.github/actions/debug
- name: Cancel previous Sync PR workflow
run: |
python3 "$GITHUB_WORKSPACE/tests/ci/ci.py" --cancel-previous-run

View File

@ -24,6 +24,8 @@ jobs:
clear-repository: true # to ensure correct digests
fetch-depth: 0 # to get version
filter: tree:0
- name: Debug Info
uses: ./.github/actions/debug
- name: Labels check
run: |
cd "$GITHUB_WORKSPACE/tests/ci"

View File

@ -62,8 +62,6 @@ jobs:
env:
GITHUB_JOB_OVERRIDDEN: ${{inputs.test_name}}
steps:
- name: DebugInfo
uses: hmarr/debug-action@f7318c783045ac39ed9bb497e22ce835fdafbfe6
- name: Check out repository code
uses: ClickHouse/checkout@v1
with:
@ -72,6 +70,8 @@ jobs:
submodules: ${{inputs.submodules}}
fetch-depth: ${{inputs.checkout_depth}}
filter: tree:0
- name: Debug Info
uses: ./.github/actions/debug
- name: Set build envs
run: |
cat >> "$GITHUB_ENV" << 'EOF'

View File

@ -13,16 +13,17 @@ Here is a complete list of available database engines. Follow the links for more
- [Atomic](../../engines/database-engines/atomic.md)
- [MySQL](../../engines/database-engines/mysql.md)
- [Lazy](../../engines/database-engines/lazy.md)
- [MaterializedPostgreSQL](../../engines/database-engines/materialized-postgresql.md)
- [MaterializedMySQL](../../engines/database-engines/materialized-mysql.md)
- [Lazy](../../engines/database-engines/lazy.md)
- [MySQL](../../engines/database-engines/mysql.md)
- [PostgreSQL](../../engines/database-engines/postgresql.md)
- [MaterializedPostgreSQL](../../engines/database-engines/materialized-postgresql.md)
- [Replicated](../../engines/database-engines/replicated.md)
- [SQLite](../../engines/database-engines/sqlite.md)

View File

@ -107,6 +107,10 @@ The vector similarity index currently does not work with per-table, non-default
[here](https://github.com/ClickHouse/ClickHouse/pull/51325#issuecomment-1605920475)). If necessary, the value must be changed in config.xml.
:::
Vector index creation is known to be slow. To speed the process up, index creation can be parallelized. The maximum number of threads can be
configured using the server configuration
setting [max_build_vector_similarity_index_thread_pool_size](../../../operations/server-configuration-parameters/settings.md#server_configuration_parameters_max_build_vector_similarity_index_thread_pool_size).
ANN indexes are built during column insertion and merge. As a result, `INSERT` and `OPTIMIZE` statements will be slower than for ordinary
tables. ANN indexes are ideally used only with immutable or rarely changed data, i.e. tables where there are far more read requests than write
requests.

View File

@ -491,6 +491,14 @@ Type: Double
Default: 0.9
## max_build_vector_similarity_index_thread_pool_size {#server_configuration_parameters_max_build_vector_similarity_index_thread_pool_size}
The maximum number of threads to use for building vector indexes. 0 means all cores.
Type: UInt64
Default: 16
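The "0 means all cores" fallback corresponds to the resolution logic added to Context.cpp later in this diff (`max_build_vector_similarity_index_thread_pool_size > 0 ? ... : getNumberOfPhysicalCPUCores()`). A minimal standalone sketch of that rule, using `std::thread::hardware_concurrency()` as a stand-in for ClickHouse's physical-core helper:

```cpp
#include <cstddef>
#include <iostream>
#include <thread>

// Resolve the effective pool size: an explicit positive setting wins,
// 0 falls back to the number of cores (hardware_concurrency() counts
// logical cores, so this is only an approximation of the real helper).
static std::size_t resolvePoolSize(std::size_t configured)
{
    return configured > 0 ? configured
                          : static_cast<std::size_t>(std::thread::hardware_concurrency());
}

int main()
{
    std::cout << resolvePoolSize(16) << '\n'; // -> 16
    std::cout << resolvePoolSize(0) << '\n';  // -> core count of this machine
}
```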
## cgroups_memory_usage_observer_wait_time
Interval in seconds during which the server's maximum allowed memory consumption is adjusted by the corresponding threshold in cgroups. (see

View File

@ -178,6 +178,9 @@
M(ObjectStorageAzureThreads, "Number of threads in the AzureObjectStorage thread pool.") \
M(ObjectStorageAzureThreadsActive, "Number of threads in the AzureObjectStorage thread pool running a task.") \
M(ObjectStorageAzureThreadsScheduled, "Number of queued or active jobs in the AzureObjectStorage thread pool.") \
M(BuildVectorSimilarityIndexThreads, "Number of threads in the build vector similarity index thread pool.") \
M(BuildVectorSimilarityIndexThreadsActive, "Number of threads in the build vector similarity index thread pool running a task.") \
M(BuildVectorSimilarityIndexThreadsScheduled, "Number of queued or active jobs in the build vector similarity index thread pool.") \
\
M(DiskPlainRewritableAzureDirectoryMapSize, "Number of local-to-remote path entries in the 'plain_rewritable' in-memory map for AzureObjectStorage.") \
M(DiskPlainRewritableLocalDirectoryMapSize, "Number of local-to-remote path entries in the 'plain_rewritable' in-memory map for LocalObjectStorage.") \

View File

@ -63,6 +63,7 @@ static struct InitFiu
REGULAR(keepermap_fail_drop_data) \
REGULAR(lazy_pipe_fds_fail_close) \
PAUSEABLE(infinite_sleep) \
PAUSEABLE(stop_moving_part_before_swap_with_active) \
namespace FailPoints

View File

@ -1723,11 +1723,10 @@ std::string normalizeZooKeeperPath(std::string zookeeper_path, bool check_starts
String extractZooKeeperName(const String & path)
{
static constexpr auto default_zookeeper_name = "default";
if (path.empty())
throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "ZooKeeper path should not be empty");
if (path[0] == '/')
return default_zookeeper_name;
return String(DEFAULT_ZOOKEEPER_NAME);
auto pos = path.find(":/");
if (pos != String::npos && pos < path.find('/'))
{
@ -1736,7 +1735,7 @@ String extractZooKeeperName(const String & path)
throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Zookeeper path should start with '/' or '<auxiliary_zookeeper_name>:/'");
return zookeeper_name;
}
return default_zookeeper_name;
return String(DEFAULT_ZOOKEEPER_NAME);
}
String extractZooKeeperPath(const String & path, bool check_starts_with_slash, LoggerPtr log)
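For reference, the `name:/path` convention handled here is spelled out in the ZooKeeper header change below: "default:/foo" is znode "/foo" in the default ZooKeeper, "other:/foo" is the same znode in an auxiliary ZooKeeper named "other". A self-contained sketch of the parsing rule from this hunk, with exceptions reduced to `std::nullopt` (illustration only, not the ClickHouse API):

```cpp
#include <iostream>
#include <optional>
#include <string>
#include <string_view>

constexpr std::string_view DEFAULT_ZOOKEEPER_NAME = "default";

// Mirror of extractZooKeeperName's logic: return the ZooKeeper name
// encoded in `path`, or nullopt where the original throws.
std::optional<std::string> extractName(std::string_view path)
{
    if (path.empty())
        return std::nullopt;                        // empty path is rejected
    if (path.front() == '/')
        return std::string(DEFAULT_ZOOKEEPER_NAME); // plain "/foo" -> default
    auto pos = path.find(":/");
    if (pos != std::string_view::npos && pos < path.find('/'))
    {
        if (pos == 0)
            return std::nullopt;                    // "<empty name>:/foo" is invalid
        return std::string(path.substr(0, pos));    // "other:/foo" -> "other"
    }
    return std::string(DEFAULT_ZOOKEEPER_NAME);
}

int main()
{
    std::cout << *extractName("/clickhouse/tables/t1") << '\n';       // default
    std::cout << *extractName("other:/clickhouse/tables/t1") << '\n'; // other
}
```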

View File

@ -47,6 +47,10 @@ namespace zkutil
/// Preferred size of multi command (in the number of operations)
constexpr size_t MULTI_BATCH_SIZE = 100;
/// Path "default:/foo" refers to znode "/foo" in the default zookeeper,
/// path "other:/foo" refers to znode "/foo" in auxiliary zookeeper named "other".
constexpr std::string_view DEFAULT_ZOOKEEPER_NAME = "default";
struct ShuffleHost
{
enum AvailabilityZoneInfo

View File

@ -50,7 +50,7 @@ namespace DB
M(UInt32, asynchronous_heavy_metrics_update_period_s, 120, "Period in seconds for updating heavy asynchronous metrics.", 0) \
M(String, default_database, "default", "Default database name.", 0) \
M(String, tmp_policy, "", "Policy for storage with temporary data.", 0) \
M(UInt64, max_temporary_data_on_disk_size, 0, "The maximum amount of storage that could be used for external aggregation, joins or sorting., ", 0) \
M(UInt64, max_temporary_data_on_disk_size, 0, "The maximum amount of storage that could be used for external aggregation, joins or sorting.", 0) \
M(String, temporary_data_in_cache, "", "Cache disk name for temporary data.", 0) \
M(UInt64, aggregate_function_group_array_max_element_size, 0xFFFFFF, "Max array element size in bytes for groupArray function. This limit is checked at serialization and help to avoid large state size.", 0) \
M(GroupArrayActionWhenLimitReached, aggregate_function_group_array_action_when_limit_is_reached, GroupArrayActionWhenLimitReached::THROW, "Action to execute when max array element size is exceeded in groupArray: `throw` exception, or `discard` extra values", 0) \
@ -65,6 +65,7 @@ namespace DB
M(UInt64, async_insert_threads, 16, "Maximum number of threads to actually parse and insert data in background. Zero means asynchronous mode is disabled", 0) \
M(Bool, async_insert_queue_flush_on_shutdown, true, "If true queue of asynchronous inserts is flushed on graceful shutdown", 0) \
M(Bool, ignore_empty_sql_security_in_create_view_query, true, "If true, ClickHouse doesn't write defaults for empty SQL security statement in CREATE VIEW queries. This setting is only necessary for the migration period and will become obsolete in 24.4", 0) \
M(UInt64, max_build_vector_similarity_index_thread_pool_size, 16, "The maximum number of threads to use to build vector similarity indexes. 0 means all cores.", 0) \
\
/* Database Catalog */ \
M(UInt64, database_atomic_delay_before_drop_table_sec, 8 * 60, "The delay during which a dropped table can be restored using the UNDROP statement. If DROP TABLE ran with a SYNC modifier, the setting is ignored.", 0) \

View File

@ -50,13 +50,6 @@ private:
return executeNonconstant(input);
}
[[maybe_unused]] String toString() const
{
WriteBufferFromOwnString buf;
buf << "format:" << format << ", rows:" << rows << ", is_literal:" << is_literal << ", input:" << input.dumpStructure() << "\n";
return buf.str();
}
private:
ColumnWithTypeAndName executeLiteral(std::string_view literal) const
{
@ -231,9 +224,7 @@ public:
const auto & instruction = instructions[i];
try
{
// std::cout << "instruction[" << i << "]:" << instructions[i].toString() << std::endl;
concat_args[i] = instruction.execute();
// std::cout << "concat_args[" << i << "]:" << concat_args[i].dumpStructure() << std::endl;
}
catch (const fmt::v9::format_error & e)
{
@ -358,7 +349,14 @@ private:
REGISTER_FUNCTION(Printf)
{
factory.registerFunction<FunctionPrintf>();
factory.registerFunction<FunctionPrintf>(
FunctionDocumentation{.description=R"(
The `printf` function formats the given string with the values (strings, integers, floating points, etc.) listed in the arguments, similar to the printf function in C++.
The format string can contain format specifiers starting with the `%` character.
Anything not part of `%` and the following format specifier is considered literal text and is copied verbatim into the output.
A literal `%` character can be escaped as `%%`.)", .examples{{"sum", "select printf('%%%s %s %d', 'Hello', 'World', 2024);", "%Hello World 2024"}}, .categories{"String"}
});
}
}

View File

@ -10,6 +10,7 @@
#include <Common/SensitiveDataMasker.h>
#include <Common/Macros.h>
#include <Common/EventNotifier.h>
#include <Common/getNumberOfPhysicalCPUCores.h>
#include <Common/Stopwatch.h>
#include <Common/formatReadable.h>
#include <Common/Throttler.h>
@ -121,7 +122,6 @@
#include <Interpreters/InterpreterSelectWithUnionQuery.h>
#include <base/defines.h>
namespace fs = std::filesystem;
namespace ProfileEvents
@ -164,6 +164,9 @@ namespace CurrentMetrics
extern const Metric TablesLoaderForegroundThreadsActive;
extern const Metric TablesLoaderForegroundThreadsScheduled;
extern const Metric IOWriterThreadsScheduled;
extern const Metric BuildVectorSimilarityIndexThreads;
extern const Metric BuildVectorSimilarityIndexThreadsActive;
extern const Metric BuildVectorSimilarityIndexThreadsScheduled;
extern const Metric AttachedTable;
extern const Metric AttachedView;
extern const Metric AttachedDictionary;
@ -297,6 +300,8 @@ struct ContextSharedPart : boost::noncopyable
mutable std::unique_ptr<ThreadPool> load_marks_threadpool; /// Threadpool for loading marks cache.
mutable OnceFlag prefetch_threadpool_initialized;
mutable std::unique_ptr<ThreadPool> prefetch_threadpool; /// Threadpool for loading marks cache.
mutable OnceFlag build_vector_similarity_index_threadpool_initialized;
mutable std::unique_ptr<ThreadPool> build_vector_similarity_index_threadpool; /// Threadpool for vector-similarity index creation.
mutable UncompressedCachePtr index_uncompressed_cache TSA_GUARDED_BY(mutex); /// The cache of decompressed blocks for MergeTree indices.
mutable QueryCachePtr query_cache TSA_GUARDED_BY(mutex); /// Cache of query results.
mutable MarkCachePtr index_mark_cache TSA_GUARDED_BY(mutex); /// Cache of marks in compressed files of MergeTree indices.
@ -3297,6 +3302,21 @@ size_t Context::getPrefetchThreadpoolSize() const
return config.getUInt(".prefetch_threadpool_pool_size", 100);
}
ThreadPool & Context::getBuildVectorSimilarityIndexThreadPool() const
{
callOnce(shared->build_vector_similarity_index_threadpool_initialized, [&] {
size_t pool_size = shared->server_settings.max_build_vector_similarity_index_thread_pool_size > 0
? shared->server_settings.max_build_vector_similarity_index_thread_pool_size
: getNumberOfPhysicalCPUCores();
shared->build_vector_similarity_index_threadpool = std::make_unique<ThreadPool>(
CurrentMetrics::BuildVectorSimilarityIndexThreads,
CurrentMetrics::BuildVectorSimilarityIndexThreadsActive,
CurrentMetrics::BuildVectorSimilarityIndexThreadsScheduled,
pool_size);
});
return *shared->build_vector_similarity_index_threadpool;
}
BackgroundSchedulePool & Context::getBufferFlushSchedulePool() const
{
callOnce(shared->buffer_flush_schedule_pool_initialized, [&] {
@ -3743,6 +3763,11 @@ zkutil::ZooKeeperPtr Context::getAuxiliaryZooKeeper(const String & name) const
return zookeeper->second;
}
std::shared_ptr<zkutil::ZooKeeper> Context::getDefaultOrAuxiliaryZooKeeper(const String & name) const
{
return name == zkutil::DEFAULT_ZOOKEEPER_NAME ? getZooKeeper() : getAuxiliaryZooKeeper(name);
}
std::map<String, zkutil::ZooKeeperPtr> Context::getAuxiliaryZooKeepers() const
{
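The `getBuildVectorSimilarityIndexThreadPool()` addition above follows the callOnce/OnceFlag idiom used throughout Context.cpp: the pool is created lazily, exactly once, on first use. A rough standard-library equivalent of that idiom (`std::call_once` standing in for ClickHouse's OnceFlag helpers; the pool type is a placeholder):

```cpp
#include <cstddef>
#include <memory>
#include <mutex>

struct ThreadPool { explicit ThreadPool(std::size_t) {} }; // placeholder pool type

class Context
{
    mutable std::once_flag pool_initialized;
    mutable std::unique_ptr<ThreadPool> pool;
    std::size_t configured_size = 16;

public:
    // Lazily create the pool on first call; std::call_once guarantees the
    // initializer runs exactly once even with concurrent callers.
    ThreadPool & getPool() const
    {
        std::call_once(pool_initialized, [this] {
            pool = std::make_unique<ThreadPool>(configured_size);
        });
        return *pool;
    }
};

int main()
{
    Context ctx;
    ThreadPool & a = ctx.getPool();
    ThreadPool & b = ctx.getPool();
    return &a == &b ? 0 : 1; // same instance on every call
}
```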

View File

@ -1004,6 +1004,8 @@ public:
std::shared_ptr<zkutil::ZooKeeper> getZooKeeper() const;
/// Same as above but return a zookeeper connection from auxiliary_zookeepers configuration entry.
std::shared_ptr<zkutil::ZooKeeper> getAuxiliaryZooKeeper(const String & name) const;
/// If name == "default", same as getZooKeeper(), otherwise same as getAuxiliaryZooKeeper().
std::shared_ptr<zkutil::ZooKeeper> getDefaultOrAuxiliaryZooKeeper(const String & name) const;
/// return Auxiliary Zookeeper map
std::map<String, zkutil::ZooKeeperPtr> getAuxiliaryZooKeepers() const;
@ -1097,6 +1099,8 @@ public:
/// and make a prefetch by putting a read task to threadpoolReader.
size_t getPrefetchThreadpoolSize() const;
ThreadPool & getBuildVectorSimilarityIndexThreadPool() const;
/// Settings for MergeTree background tasks stored in config.xml
BackgroundTaskSchedulingSettings getBackgroundProcessingTaskSchedulingSettings() const;
BackgroundTaskSchedulingSettings getBackgroundMoveTaskSchedulingSettings() const;

View File

@ -1469,7 +1469,7 @@ BlockIO InterpreterCreateQuery::createTable(ASTCreateQuery & create)
{
if (!getContext()->getSettingsRef().allow_experimental_refreshable_materialized_view)
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED,
"Refreshable materialized views are experimental. Enable allow_experimental_refreshable_materialized_view to use.");
"Refreshable materialized views are experimental. Enable allow_experimental_refreshable_materialized_view to use");
AddDefaultDatabaseVisitor visitor(getContext(), current_database);
visitor.visit(*create.refresh_strategy);

View File

@ -1005,7 +1005,7 @@ void InterpreterSystemQuery::dropReplica(ASTSystemQuery & query)
{
ReplicatedTableStatus status;
storage_replicated->getStatus(status);
if (status.zookeeper_path == query.replica_zk_path)
if (status.zookeeper_info.path == query.replica_zk_path)
throw Exception(ErrorCodes::TABLE_WAS_NOT_DROPPED,
"There is a local table {}, which has the same table path in ZooKeeper. "
"Please check the path in query. "
@ -1028,7 +1028,10 @@ void InterpreterSystemQuery::dropReplica(ASTSystemQuery & query)
if (zookeeper->exists(remote_replica_path + "/is_active"))
throw Exception(ErrorCodes::TABLE_WAS_NOT_DROPPED, "Can't remove replica: {}, because it's active", query.replica);
StorageReplicatedMergeTree::dropReplica(zookeeper, query.replica_zk_path, query.replica, log);
TableZnodeInfo info;
info.path = query.replica_zk_path;
info.replica_name = query.replica;
StorageReplicatedMergeTree::dropReplica(zookeeper, info, log);
LOG_INFO(log, "Dropped replica {}", remote_replica_path);
}
else
@ -1045,12 +1048,12 @@ bool InterpreterSystemQuery::dropReplicaImpl(ASTSystemQuery & query, const Stora
storage_replicated->getStatus(status);
/// Do not allow to drop local replicas and active remote replicas
if (query.replica == status.replica_name)
if (query.replica == status.zookeeper_info.replica_name)
throw Exception(ErrorCodes::TABLE_WAS_NOT_DROPPED,
"We can't drop local replica, please use `DROP TABLE` if you want "
"to clean the data and drop this replica");
storage_replicated->dropReplica(status.zookeeper_path, query.replica, log);
storage_replicated->dropReplica(query.replica, log);
LOG_TRACE(log, "Dropped replica {} of {}", query.replica, table->getStorageID().getNameForLogs());
return true;

View File

@ -74,7 +74,8 @@ private:
findMySQLFunctionSecretArguments();
}
else if ((function.name == "s3") || (function.name == "cosn") || (function.name == "oss") ||
(function.name == "deltaLake") || (function.name == "hudi") || (function.name == "iceberg"))
(function.name == "deltaLake") || (function.name == "hudi") || (function.name == "iceberg") ||
(function.name == "gcs"))
{
/// s3('url', 'aws_access_key_id', 'aws_secret_access_key', ...)
findS3FunctionSecretArguments(/* is_cluster_function= */ false);

View File

@ -5,9 +5,11 @@
#include <Columns/ColumnArray.h>
#include <Common/BitHelpers.h>
#include <Common/formatReadable.h>
#include <Common/getNumberOfPhysicalCPUCores.h>
#include <Common/logger_useful.h>
#include <Common/typeid_cast.h>
#include <Core/Field.h>
#include <Core/ServerSettings.h>
#include <DataTypes/DataTypeArray.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
@ -29,7 +31,6 @@ namespace DB
namespace ErrorCodes
{
extern const int CANNOT_ALLOCATE_MEMORY;
extern const int FORMAT_VERSION_TOO_OLD;
extern const int ILLEGAL_COLUMN;
extern const int INCORRECT_DATA;
@ -131,8 +132,7 @@ void USearchIndexWithSerialization::deserialize(ReadBuffer & istr)
/// See the comment in MergeTreeIndexGranuleVectorSimilarity::deserializeBinary why we throw here
throw Exception(ErrorCodes::INCORRECT_DATA, "Could not load vector similarity index. Please drop the index and create it again. Error: {}", String(result.error.release()));
if (!try_reserve(limits()))
throw Exception(ErrorCodes::CANNOT_ALLOCATE_MEMORY, "Could not reserve memory for usearch index");
try_reserve(limits());
}
USearchIndexWithSerialization::Statistics USearchIndexWithSerialization::getStatistics() const
@ -270,20 +270,49 @@ void updateImpl(const ColumnArray * column_array, const ColumnArray::Offsets & c
throw Exception(ErrorCodes::INCORRECT_DATA, "All arrays in column with vector similarity index must have equal length");
/// Reserving space is mandatory
if (!index->try_reserve(roundUpToPowerOfTwoOrZero(index->size() + rows)))
throw Exception(ErrorCodes::CANNOT_ALLOCATE_MEMORY, "Could not reserve memory for vector similarity index");
size_t max_thread_pool_size = Context::getGlobalContextInstance()->getServerSettings().max_build_vector_similarity_index_thread_pool_size;
if (max_thread_pool_size == 0)
max_thread_pool_size = getNumberOfPhysicalCPUCores();
unum::usearch::index_limits_t limits(roundUpToPowerOfTwoOrZero(index->size() + rows), max_thread_pool_size);
index->reserve(limits);
for (size_t row = 0; row < rows; ++row)
/// Vector index creation is slooooow. Add the new rows in parallel. The thread pool is global to avoid oversubscription when multiple
/// indexes are built simultaneously (e.g. multiple merges run at the same time).
auto & thread_pool = Context::getGlobalContextInstance()->getBuildVectorSimilarityIndexThreadPool();
auto add_vector_to_index = [&](USearchIndex::vector_key_t key, size_t row, ThreadGroupPtr thread_group)
{
if (auto result = index->add(static_cast<USearchIndex::vector_key_t>(index->size()), &column_array_data_float_data[column_array_offsets[row - 1]]); !result)
SCOPE_EXIT_SAFE(
if (thread_group)
CurrentThread::detachFromGroupIfNotDetached();
);
if (thread_group)
CurrentThread::attachToGroupIfDetached(thread_group);
/// add is thread-safe
if (auto result = index->add(key, &column_array_data_float_data[column_array_offsets[row - 1]]); !result)
{
throw Exception(ErrorCodes::INCORRECT_DATA, "Could not add data to vector similarity index. Error: {}", String(result.error.release()));
}
else
{
ProfileEvents::increment(ProfileEvents::USearchAddCount);
ProfileEvents::increment(ProfileEvents::USearchAddVisitedMembers, result.visited_members);
ProfileEvents::increment(ProfileEvents::USearchAddComputedDistances, result.computed_distances);
}
};
size_t index_size = index->size();
for (size_t row = 0; row < rows; ++row)
{
auto key = static_cast<USearchIndex::vector_key_t>(index_size + row);
auto task = [group = CurrentThread::getGroup(), &add_vector_to_index, key, row] { add_vector_to_index(key, row, group); };
thread_pool.scheduleOrThrowOnError(task);
}
thread_pool.wait();
}
}
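The loop above is a plain fan-out/join: one task per row is scheduled on the shared pool via `scheduleOrThrowOnError`, keys are derived from a size snapshot taken before scheduling so they do not depend on completion order, and `thread_pool.wait()` joins everything before the function returns. A compact standard-library illustration of the same shape, with `std::async` standing in for ClickHouse's ThreadPool:

```cpp
#include <atomic>
#include <cstddef>
#include <future>
#include <vector>

int main()
{
    std::atomic<std::size_t> added{0};
    const std::size_t index_size = 100; // snapshot of index->size() before scheduling
    const std::size_t rows = 8;

    std::vector<std::future<void>> tasks;
    for (std::size_t row = 0; row < rows; ++row)
    {
        const std::size_t key = index_size + row; // stable key per row
        tasks.push_back(std::async(std::launch::async, [&added, key] {
            (void)key;            // stands in for the thread-safe index->add(key, ...)
            added.fetch_add(1);
        }));
    }
    for (auto & task : tasks)
        task.get();               // join: the equivalent of thread_pool.wait()

    return added.load() == rows ? 0 : 1;
}
```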

View File

@ -1,6 +1,7 @@
#include <Storages/MergeTree/MergeTreeData.h>
#include <Storages/MergeTree/MergeTreePartsMover.h>
#include <Storages/MergeTree/MergeTreeSettings.h>
#include <Common/FailPoint.h>
#include <Common/logger_useful.h>
#include <set>
@ -15,6 +16,11 @@ namespace ErrorCodes
extern const int DIRECTORY_ALREADY_EXISTS;
}
namespace FailPoints
{
extern const char stop_moving_part_before_swap_with_active[];
}
namespace
{
@ -226,6 +232,7 @@ MergeTreePartsMover::TemporaryClonedPart MergeTreePartsMover::clonePart(const Me
cloned_part.temporary_directory_lock = data->getTemporaryPartDirectoryHolder(part->name);
MutableDataPartStoragePtr cloned_part_storage;
bool preserve_blobs = false;
if (disk->supportZeroCopyReplication() && settings->allow_remote_fs_zero_copy_replication)
{
/// Try zero-copy replication and fallback to default copy if it's not possible
@ -253,6 +260,7 @@ MergeTreePartsMover::TemporaryClonedPart MergeTreePartsMover::clonePart(const Me
if (zero_copy_part)
{
/// FIXME for some reason we cannot just use this part, we have to re-create it through MergeTreeDataPartBuilder
preserve_blobs = true;
zero_copy_part->is_temp = false; /// Do not remove it in dtor
cloned_part_storage = zero_copy_part->getDataPartStoragePtr();
}
@ -272,7 +280,17 @@ MergeTreePartsMover::TemporaryClonedPart MergeTreePartsMover::clonePart(const Me
cloned_part.part = std::move(builder).withPartFormatFromDisk().build();
LOG_TRACE(log, "Part {} was cloned to {}", part->name, cloned_part.part->getDataPartStorage().getFullPath());
cloned_part.part->is_temp = data->allowRemoveStaleMovingParts();
cloned_part.part->is_temp = false;
if (data->allowRemoveStaleMovingParts())
{
cloned_part.part->is_temp = true;
/// Setting it in case connection to zookeeper is lost while moving
/// Otherwise part might be stuck in the moving directory due to the KEEPER_EXCEPTION in part's destructor
if (preserve_blobs)
cloned_part.part->remove_tmp_policy = IMergeTreeDataPart::BlobsRemovalPolicyForTemporaryParts::PRESERVE_BLOBS;
else
cloned_part.part->remove_tmp_policy = IMergeTreeDataPart::BlobsRemovalPolicyForTemporaryParts::REMOVE_BLOBS;
}
cloned_part.part->loadColumnsChecksumsIndexes(true, true);
cloned_part.part->loadVersionMetadata();
cloned_part.part->modification_time = cloned_part.part->getDataPartStorage().getLastModified().epochTime();
@ -282,6 +300,8 @@ MergeTreePartsMover::TemporaryClonedPart MergeTreePartsMover::clonePart(const Me
void MergeTreePartsMover::swapClonedPart(TemporaryClonedPart & cloned_part) const
{
/// Used to get some stuck parts in the moving directory by stopping moves while pause is active
FailPointInjection::pauseFailPoint(FailPoints::stop_moving_part_before_swap_with_active);
if (moves_blocker.isCancelled())
throw Exception(ErrorCodes::ABORTED, "Cancelled moving parts.");

View File

@ -1,6 +1,7 @@
#pragma once
#include <Storages/MergeTree/ReplicatedMergeTreeQueue.h>
#include <Storages/TableZnodeInfo.h>
#include <Core/Types.h>
namespace DB
@ -16,9 +17,7 @@ struct ReplicatedTableStatus
ReplicatedMergeTreeQueue::Status queue;
UInt32 parts_to_check;
String zookeeper_name;
String zookeeper_path;
String replica_name;
TableZnodeInfo zookeeper_info;
String replica_path;
Int32 columns_version;
UInt64 log_max_index;

View File

@ -6,6 +6,7 @@
#include <Storages/StorageFactory.h>
#include <Storages/StorageMergeTree.h>
#include <Storages/StorageReplicatedMergeTree.h>
#include <Storages/TableZnodeInfo.h>
#include <Core/ServerSettings.h>
#include <Core/Settings.h>
@ -88,32 +89,23 @@ See details in documentation: https://clickhouse.com/docs/en/engines/table-engin
If you use the Replicated version of engines, see https://clickhouse.com/docs/en/engines/table-engines/mergetree-family/replication/.
)";
static ColumnsDescription getColumnsDescriptionFromZookeeper(const String & raw_zookeeper_path, ContextMutablePtr context)
static ColumnsDescription getColumnsDescriptionFromZookeeper(const TableZnodeInfo & zookeeper_info, ContextMutablePtr context)
{
String zookeeper_name = zkutil::extractZooKeeperName(raw_zookeeper_path);
String zookeeper_path = zkutil::extractZooKeeperPath(raw_zookeeper_path, true);
if (!context->hasZooKeeper() && !context->hasAuxiliaryZooKeeper(zookeeper_name))
throw Exception{ErrorCodes::CANNOT_EXTRACT_TABLE_STRUCTURE, "Cannot get replica structure without zookeeper, you must specify the structure manually"};
zkutil::ZooKeeperPtr zookeeper;
try
{
if (zookeeper_name == StorageReplicatedMergeTree::getDefaultZooKeeperName())
zookeeper = context->getZooKeeper();
else
zookeeper = context->getAuxiliaryZooKeeper(zookeeper_name);
zookeeper = context->getDefaultOrAuxiliaryZooKeeper(zookeeper_info.zookeeper_name);
}
catch (...)
{
throw Exception{ErrorCodes::CANNOT_EXTRACT_TABLE_STRUCTURE, "Cannot get replica structure from zookeeper, because cannot get zookeeper: {}. You must specify structure manually", getCurrentExceptionMessage(false)};
}
if (!zookeeper->exists(zookeeper_path + "/replicas"))
if (!zookeeper->exists(zookeeper_info.path + "/replicas"))
throw Exception{ErrorCodes::CANNOT_EXTRACT_TABLE_STRUCTURE, "Cannot get replica structure, because there are no other replicas in zookeeper. You must specify the structure manually"};
Coordination::Stat columns_stat;
return ColumnsDescription::parse(zookeeper->get(fs::path(zookeeper_path) / "columns", &columns_stat));
return ColumnsDescription::parse(zookeeper->get(fs::path(zookeeper_info.path) / "columns", &columns_stat));
}
/// Returns whether a new syntax is used to define a table engine, i.e. MergeTree() PRIMARY KEY ... PARTITION BY ... SETTINGS ...
@ -184,23 +176,16 @@ static std::string_view getNamePart(const String & engine_name)
/// Extracts zookeeper path and replica name from the table engine's arguments.
/// The function can modify those arguments (that's why they're passed separately in `engine_args`) and also determines RenamingRestrictions.
/// The function assumes the table engine is Replicated.
static void extractZooKeeperPathAndReplicaNameFromEngineArgs(
static TableZnodeInfo extractZooKeeperPathAndReplicaNameFromEngineArgs(
const ASTCreateQuery & query,
const StorageID & table_id,
const String & engine_name,
ASTs & engine_args,
LoadingStrictnessLevel mode,
const ContextPtr & local_context,
String & zookeeper_path,
String & replica_name,
RenamingRestrictions & renaming_restrictions)
const ContextPtr & local_context)
{
chassert(isReplicated(engine_name));
zookeeper_path = "";
replica_name = "";
renaming_restrictions = RenamingRestrictions::ALLOW_ANY;
bool is_extended_storage_def = isExtendedStorageDef(query);
if (is_extended_storage_def)
@ -210,62 +195,12 @@ static void extractZooKeeperPathAndReplicaNameFromEngineArgs(
evaluateEngineArgs(engine_args, local_context);
}
bool is_on_cluster = local_context->getClientInfo().query_kind == ClientInfo::QueryKind::SECONDARY_QUERY;
bool is_replicated_database = local_context->getClientInfo().query_kind == ClientInfo::QueryKind::SECONDARY_QUERY &&
DatabaseCatalog::instance().getDatabase(table_id.database_name)->getEngineName() == "Replicated";
/// Allow implicit {uuid} macros only for zookeeper_path in ON CLUSTER queries
/// and if UUID was explicitly passed in CREATE TABLE (like for ATTACH)
bool allow_uuid_macro = is_on_cluster || is_replicated_database || query.attach || query.has_uuid;
auto expand_macro = [&] (ASTLiteral * ast_zk_path, ASTLiteral * ast_replica_name)
auto expand_macro = [&] (ASTLiteral * ast_zk_path, ASTLiteral * ast_replica_name, String zookeeper_path, String replica_name) -> TableZnodeInfo
{
/// Unfold {database} and {table} macro on table creation, so table can be renamed.
if (mode < LoadingStrictnessLevel::ATTACH)
{
Macros::MacroExpansionInfo info;
/// NOTE: it's not recursive
info.expand_special_macros_only = true;
info.table_id = table_id;
/// Avoid unfolding {uuid} macro on this step.
/// We did unfold it in previous versions to make moving table from Atomic to Ordinary database work correctly,
/// but now it's not allowed (and it was the only reason to unfold {uuid} macro).
info.table_id.uuid = UUIDHelpers::Nil;
zookeeper_path = local_context->getMacros()->expand(zookeeper_path, info);
info.level = 0;
replica_name = local_context->getMacros()->expand(replica_name, info);
}
ast_zk_path->value = zookeeper_path;
ast_replica_name->value = replica_name;
/// Expand other macros (such as {shard} and {replica}). We do not expand them on previous step
/// to make possible copying metadata files between replicas.
Macros::MacroExpansionInfo info;
info.table_id = table_id;
if (is_replicated_database)
{
auto database = DatabaseCatalog::instance().getDatabase(table_id.database_name);
info.shard = getReplicatedDatabaseShardName(database);
info.replica = getReplicatedDatabaseReplicaName(database);
}
if (!allow_uuid_macro)
info.table_id.uuid = UUIDHelpers::Nil;
zookeeper_path = local_context->getMacros()->expand(zookeeper_path, info);
info.level = 0;
info.table_id.uuid = UUIDHelpers::Nil;
replica_name = local_context->getMacros()->expand(replica_name, info);
/// We do not allow renaming table with these macros in metadata, because zookeeper_path will be broken after RENAME TABLE.
/// NOTE: it may happen if table was created by older version of ClickHouse (< 20.10) and macros was not unfolded on table creation
/// or if one of these macros is recursively expanded from some other macro.
/// Also do not allow to move table from Atomic to Ordinary database if there's {uuid} macro
if (info.expanded_database || info.expanded_table)
renaming_restrictions = RenamingRestrictions::DO_NOT_ALLOW;
else if (info.expanded_uuid)
renaming_restrictions = RenamingRestrictions::ALLOW_PRESERVING_UUID;
TableZnodeInfo res = TableZnodeInfo::resolve(zookeeper_path, replica_name, table_id, query, mode, local_context);
ast_zk_path->value = res.full_path_for_metadata;
ast_replica_name->value = res.replica_name_for_metadata;
return res;
};
size_t arg_num = 0;
@ -277,6 +212,9 @@ static void extractZooKeeperPathAndReplicaNameFromEngineArgs(
if (has_valid_arguments)
{
bool is_replicated_database = local_context->getClientInfo().query_kind == ClientInfo::QueryKind::SECONDARY_QUERY &&
DatabaseCatalog::instance().getDatabase(table_id.database_name)->getEngineName() == "Replicated";
if (!query.attach && is_replicated_database && local_context->getSettingsRef().database_replicated_allow_replicated_engine_arguments == 0)
{
throw Exception(ErrorCodes::BAD_ARGUMENTS,
@ -293,27 +231,22 @@ static void extractZooKeeperPathAndReplicaNameFromEngineArgs(
/// Get path and name from engine arguments
auto * ast_zk_path = engine_args[arg_num]->as<ASTLiteral>();
if (ast_zk_path && ast_zk_path->value.getType() == Field::Types::String)
zookeeper_path = ast_zk_path->value.safeGet<String>();
else
if (!ast_zk_path || ast_zk_path->value.getType() != Field::Types::String)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Path in ZooKeeper must be a string literal{}", verbose_help_message);
auto * ast_replica_name = engine_args[arg_num + 1]->as<ASTLiteral>();
if (ast_replica_name && ast_replica_name->value.getType() == Field::Types::String)
replica_name = ast_replica_name->value.safeGet<String>();
else
if (!ast_replica_name || ast_replica_name->value.getType() != Field::Types::String)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Replica name must be a string literal{}", verbose_help_message);
if (!query.attach && is_replicated_database && local_context->getSettingsRef().database_replicated_allow_replicated_engine_arguments == 2)
{
LOG_WARNING(&Poco::Logger::get("registerStorageMergeTree"), "Replacing user-provided ZooKeeper path and replica name ({}, {}) "
"with default arguments", zookeeper_path, replica_name);
engine_args[arg_num]->as<ASTLiteral>()->value = zookeeper_path = server_settings.default_replica_path;
engine_args[arg_num + 1]->as<ASTLiteral>()->value = replica_name = server_settings.default_replica_name;
"with default arguments", ast_zk_path->value.safeGet<String>(), ast_replica_name->value.safeGet<String>());
ast_zk_path->value = server_settings.default_replica_path;
ast_replica_name->value = server_settings.default_replica_name;
}
expand_macro(ast_zk_path, ast_replica_name);
return expand_macro(ast_zk_path, ast_replica_name, ast_zk_path->value.safeGet<String>(), ast_replica_name->value.safeGet<String>());
}
else if (is_extended_storage_def
&& (arg_cnt == 0
@ -322,24 +255,24 @@ static void extractZooKeeperPathAndReplicaNameFromEngineArgs(
{
/// Try use default values if arguments are not specified.
/// Note: {uuid} macro works for ON CLUSTER queries when database engine is Atomic.
zookeeper_path = server_settings.default_replica_path;
/// TODO maybe use hostname if {replica} is not defined?
replica_name = server_settings.default_replica_name;
/// Modify query, so default values will be written to metadata
assert(arg_num == 0);
ASTs old_args;
std::swap(engine_args, old_args);
auto path_arg = std::make_shared<ASTLiteral>(zookeeper_path);
auto name_arg = std::make_shared<ASTLiteral>(replica_name);
auto path_arg = std::make_shared<ASTLiteral>("");
auto name_arg = std::make_shared<ASTLiteral>("");
auto * ast_zk_path = path_arg.get();
auto * ast_replica_name = name_arg.get();
expand_macro(ast_zk_path, ast_replica_name);
auto res = expand_macro(ast_zk_path, ast_replica_name, server_settings.default_replica_path, server_settings.default_replica_name);
engine_args.emplace_back(std::move(path_arg));
engine_args.emplace_back(std::move(name_arg));
std::move(std::begin(old_args), std::end(old_args), std::back_inserter(engine_args));
return res;
}
else
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Expected two string literal arguments: zookeeper_path and replica_name");
@ -363,15 +296,11 @@ std::optional<String> extractZooKeeperPathFromReplicatedTableDef(const ASTCreate
for (auto & engine_arg : engine_args)
engine_arg = engine_arg->clone();
LoadingStrictnessLevel mode = LoadingStrictnessLevel::CREATE;
String zookeeper_path;
String replica_name;
RenamingRestrictions renaming_restrictions;
try
{
extractZooKeeperPathAndReplicaNameFromEngineArgs(query, table_id, engine_name, engine_args, mode, local_context,
zookeeper_path, replica_name, renaming_restrictions);
auto res = extractZooKeeperPathAndReplicaNameFromEngineArgs(
query, table_id, engine_name, engine_args, LoadingStrictnessLevel::CREATE, local_context);
return res.full_path;
}
catch (Exception & e)
{
@ -382,8 +311,6 @@ std::optional<String> extractZooKeeperPathFromReplicatedTableDef(const ASTCreate
}
throw;
}
return zookeeper_path;
}
static StoragePtr create(const StorageFactory::Arguments & args)
@ -551,19 +478,17 @@ static StoragePtr create(const StorageFactory::Arguments & args)
}
/// Extract zookeeper path and replica name from engine arguments.
String zookeeper_path;
String replica_name;
RenamingRestrictions renaming_restrictions = RenamingRestrictions::ALLOW_ANY;
TableZnodeInfo zookeeper_info;
if (replicated)
{
extractZooKeeperPathAndReplicaNameFromEngineArgs(args.query, args.table_id, args.engine_name, args.engine_args, args.mode,
args.getLocalContext(), zookeeper_path, replica_name, renaming_restrictions);
zookeeper_info = extractZooKeeperPathAndReplicaNameFromEngineArgs(
args.query, args.table_id, args.engine_name, args.engine_args, args.mode, args.getLocalContext());
if (replica_name.empty())
if (zookeeper_info.replica_name.empty())
throw Exception(ErrorCodes::NO_REPLICA_NAME_GIVEN, "No replica name in config{}", verbose_help_message);
// '\t' and '\n' will interrupt parsing 'source replica' in ReplicatedMergeTreeLogEntryData::readText
if (replica_name.find('\t') != String::npos || replica_name.find('\n') != String::npos)
if (zookeeper_info.replica_name.find('\t') != String::npos || zookeeper_info.replica_name.find('\n') != String::npos)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Replica name must not contain '\\t' or '\\n'");
arg_cnt = engine_args.size(); /// Update `arg_cnt` here because extractZooKeeperPathAndReplicaNameFromEngineArgs() could add arguments.
@ -649,7 +574,7 @@ static StoragePtr create(const StorageFactory::Arguments & args)
ColumnsDescription columns;
if (args.columns.empty() && replicated)
columns = getColumnsDescriptionFromZookeeper(zookeeper_path, context);
columns = getColumnsDescriptionFromZookeeper(zookeeper_info, context);
else
columns = args.columns;
@ -879,8 +804,7 @@ static StoragePtr create(const StorageFactory::Arguments & args)
need_check_table_structure = txn->isInitialQuery();
return std::make_shared<StorageReplicatedMergeTree>(
zookeeper_path,
replica_name,
zookeeper_info,
args.mode,
args.table_id,
args.relative_data_path,
@ -889,7 +813,6 @@ static StoragePtr create(const StorageFactory::Arguments & args)
date_column_name,
merging_params,
std::move(storage_settings),
renaming_restrictions,
need_check_table_structure);
}
else

View File

@ -211,7 +211,6 @@ namespace ActionLocks
static const auto QUEUE_UPDATE_ERROR_SLEEP_MS = 1 * 1000;
static const auto MUTATIONS_FINALIZING_SLEEP_MS = 1 * 1000;
static const auto MUTATIONS_FINALIZING_IDLE_SLEEP_MS = 5 * 1000;
const String StorageReplicatedMergeTree::default_zookeeper_name = "default";
void StorageReplicatedMergeTree::setZooKeeper()
{
@ -221,18 +220,9 @@ void StorageReplicatedMergeTree::setZooKeeper()
/// strange effects. So we always use only one session for all tables.
/// (excluding auxiliary zookeepers)
if (zookeeper_name == default_zookeeper_name)
{
auto new_keeper = getContext()->getZooKeeper();
std::lock_guard lock(current_zookeeper_mutex);
current_zookeeper = new_keeper;
}
else
{
auto new_keeper = getContext()->getAuxiliaryZooKeeper(zookeeper_name);
std::lock_guard lock(current_zookeeper_mutex);
current_zookeeper = new_keeper;
}
auto new_keeper = getContext()->getDefaultOrAuxiliaryZooKeeper(zookeeper_info.zookeeper_name);
std::lock_guard lock(current_zookeeper_mutex);
current_zookeeper = new_keeper;
}
zkutil::ZooKeeperPtr StorageReplicatedMergeTree::tryGetZooKeeper() const
@ -263,7 +253,7 @@ String StorageReplicatedMergeTree::getEndpointName() const
{
const MergeTreeSettings & settings = getContext()->getReplicatedMergeTreeSettings();
if (settings.enable_the_endpoint_id_with_zookeeper_name_prefix)
return zookeeper_name + ":" + replica_path;
return zookeeper_info.zookeeper_name + ":" + replica_path;
return replica_path;
}
@ -294,8 +284,7 @@ static MergeTreePartInfo makeDummyDropRangeForMovePartitionOrAttachPartitionFrom
}
StorageReplicatedMergeTree::StorageReplicatedMergeTree(
const String & zookeeper_path_,
const String & replica_name_,
const TableZnodeInfo & zookeeper_info_,
LoadingStrictnessLevel mode,
const StorageID & table_id_,
const String & relative_data_path_,
@ -304,7 +293,6 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
const String & date_column_name,
const MergingParams & merging_params_,
std::unique_ptr<MergeTreeSettings> settings_,
RenamingRestrictions renaming_restrictions_,
bool need_check_structure)
: MergeTreeData(table_id_,
metadata_,
@ -315,11 +303,10 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
true, /// require_part_metadata
mode,
[this] (const std::string & name) { enqueuePartForCheck(name); })
, full_zookeeper_path(zookeeper_path_)
, zookeeper_name(zkutil::extractZooKeeperName(full_zookeeper_path))
, zookeeper_path(zkutil::extractZooKeeperPath(full_zookeeper_path, /* check_starts_with_slash */ mode <= LoadingStrictnessLevel::CREATE, log.load()))
, replica_name(replica_name_)
, replica_path(fs::path(zookeeper_path) / "replicas" / replica_name_)
, zookeeper_info(zookeeper_info_)
, zookeeper_path(zookeeper_info.path)
, replica_name(zookeeper_info.replica_name)
, replica_path(fs::path(zookeeper_path) / "replicas" / replica_name)
, reader(*this)
, writer(*this)
, merger_mutator(*this)
@ -331,7 +318,6 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
, part_check_thread(*this)
, restarting_thread(*this)
, part_moves_between_shards_orchestrator(*this)
, renaming_restrictions(renaming_restrictions_)
, replicated_fetches_throttler(std::make_shared<Throttler>(getSettings()->max_replicated_fetches_network_bandwidth, getContext()->getReplicatedFetchesThrottler()))
, replicated_sends_throttler(std::make_shared<Throttler>(getSettings()->max_replicated_sends_network_bandwidth, getContext()->getReplicatedSendsThrottler()))
{
@ -365,7 +351,7 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
/// Will be activated by restarting thread.
mutations_finalizing_task->deactivate();
bool has_zookeeper = getContext()->hasZooKeeper() || getContext()->hasAuxiliaryZooKeeper(zookeeper_name);
bool has_zookeeper = getContext()->hasZooKeeper() || getContext()->hasAuxiliaryZooKeeper(zookeeper_info.zookeeper_name);
if (has_zookeeper)
{
/// It's possible for getZooKeeper() to timeout if zookeeper host(s) can't
@ -845,7 +831,7 @@ bool StorageReplicatedMergeTree::createTableIfNotExists(const StorageMetadataPtr
else
{
auto metadata_drop_lock = zkutil::EphemeralNodeHolder::existing(drop_lock_path, *zookeeper);
if (!removeTableNodesFromZooKeeper(zookeeper, zookeeper_path, metadata_drop_lock, log.load()))
if (!removeTableNodesFromZooKeeper(zookeeper, zookeeper_info, metadata_drop_lock, log.load()))
{
/// Someone is recursively removing table right now, we cannot create new table until old one is removed
continue;
@ -1107,11 +1093,7 @@ void StorageReplicatedMergeTree::createReplica(const StorageMetadataPtr & metada
zkutil::ZooKeeperPtr StorageReplicatedMergeTree::getZooKeeperIfTableShutDown() const
{
zkutil::ZooKeeperPtr maybe_new_zookeeper;
if (zookeeper_name == default_zookeeper_name)
maybe_new_zookeeper = getContext()->getZooKeeper();
else
maybe_new_zookeeper = getContext()->getAuxiliaryZooKeeper(zookeeper_name);
zkutil::ZooKeeperPtr maybe_new_zookeeper = getContext()->getDefaultOrAuxiliaryZooKeeper(zookeeper_info.zookeeper_name);
maybe_new_zookeeper->sync(zookeeper_path);
return maybe_new_zookeeper;
}
@ -1226,7 +1208,7 @@ void StorageReplicatedMergeTree::drop()
LOG_INFO(log, "Dropping table with non-zero lost_part_count equal to {}", lost_part_count);
}
bool last_replica_dropped = dropReplica(zookeeper, zookeeper_path, replica_name, log.load(), getSettings(), &has_metadata_in_zookeeper);
bool last_replica_dropped = dropReplica(zookeeper, zookeeper_info, log.load(), getSettings(), &has_metadata_in_zookeeper);
if (last_replica_dropped)
{
dropZookeeperZeroCopyLockPaths(zookeeper, zero_copy_locks_paths, log.load());
@ -1235,13 +1217,15 @@ void StorageReplicatedMergeTree::drop()
}
bool StorageReplicatedMergeTree::dropReplica(zkutil::ZooKeeperPtr zookeeper, const String & zookeeper_path, const String & replica,
LoggerPtr logger, MergeTreeSettingsPtr table_settings, std::optional<bool> * has_metadata_out)
bool StorageReplicatedMergeTree::dropReplica(
zkutil::ZooKeeperPtr zookeeper, const TableZnodeInfo & zookeeper_info, LoggerPtr logger,
MergeTreeSettingsPtr table_settings, std::optional<bool> * has_metadata_out)
{
if (zookeeper->expired())
throw Exception(ErrorCodes::TABLE_WAS_NOT_DROPPED, "Table was not dropped because ZooKeeper session has expired.");
auto remote_replica_path = zookeeper_path + "/replicas/" + replica;
const String & zookeeper_path = zookeeper_info.path;
auto remote_replica_path = zookeeper_path + "/replicas/" + zookeeper_info.replica_name;
LOG_INFO(logger, "Removing replica {}, marking it as lost", remote_replica_path);
/// Mark itself lost before removing, because the following recursive removal may fail
@ -1352,30 +1336,33 @@ bool StorageReplicatedMergeTree::dropReplica(zkutil::ZooKeeperPtr zookeeper, con
{
auto metadata_drop_lock = zkutil::EphemeralNodeHolder::existing(drop_lock_path, *zookeeper);
LOG_INFO(logger, "Removing table {} (this might take several minutes)", zookeeper_path);
removeTableNodesFromZooKeeper(zookeeper, zookeeper_path, metadata_drop_lock, logger);
removeTableNodesFromZooKeeper(zookeeper, zookeeper_info, metadata_drop_lock, logger);
}
return true;
}
bool StorageReplicatedMergeTree::dropReplica(const String & drop_zookeeper_path, const String & drop_replica, LoggerPtr logger)
bool StorageReplicatedMergeTree::dropReplica(const String & drop_replica, LoggerPtr logger)
{
zkutil::ZooKeeperPtr zookeeper = getZooKeeperIfTableShutDown();
/// NOTE it's not atomic: replica may become active after this check, but before dropReplica(...)
/// However, the main use case is to drop dead replica, which cannot become active.
/// This check prevents only from accidental drop of some other replica.
if (zookeeper->exists(drop_zookeeper_path + "/replicas/" + drop_replica + "/is_active"))
if (zookeeper->exists(zookeeper_info.path + "/replicas/" + drop_replica + "/is_active"))
throw Exception(ErrorCodes::TABLE_WAS_NOT_DROPPED, "Can't drop replica: {}, because it's active", drop_replica);
return dropReplica(zookeeper, drop_zookeeper_path, drop_replica, logger);
TableZnodeInfo info = zookeeper_info;
info.replica_name = drop_replica;
return dropReplica(zookeeper, info, logger);
}
bool StorageReplicatedMergeTree::removeTableNodesFromZooKeeper(zkutil::ZooKeeperPtr zookeeper,
const String & zookeeper_path, const zkutil::EphemeralNodeHolder::Ptr & metadata_drop_lock, LoggerPtr logger)
const TableZnodeInfo & zookeeper_info2, const zkutil::EphemeralNodeHolder::Ptr & metadata_drop_lock, LoggerPtr logger)
{
const String & zookeeper_path = zookeeper_info2.path;
bool completely_removed = false;
/// NOTE /block_numbers/ actually is not flat, because /block_numbers/<partition_id>/ may have ephemeral children,
@ -1443,6 +1430,15 @@ bool StorageReplicatedMergeTree::removeTableNodesFromZooKeeper(zkutil::ZooKeeper
metadata_drop_lock->setAlreadyRemoved();
completely_removed = true;
LOG_INFO(logger, "Table {} was successfully removed from ZooKeeper", zookeeper_path);
try
{
zookeeper_info2.dropAncestorZnodesIfNeeded(zookeeper);
}
catch (...)
{
LOG_WARNING(logger, "Failed to drop ancestor znodes {} - {} after dropping table: {}", zookeeper_info2.path_prefix_for_drop, zookeeper_info2.path, getCurrentExceptionMessage(false));
}
}
return completely_removed;
@ -2295,7 +2291,7 @@ bool StorageReplicatedMergeTree::executeFetch(LogEntry & entry, bool need_to_che
String source_replica_path = fs::path(zookeeper_path) / "replicas" / replica;
if (!fetchPart(part_name,
metadata_snapshot,
zookeeper_name,
zookeeper_info.zookeeper_name,
source_replica_path,
/* to_detached= */ false,
entry.quorum,
@ -2858,7 +2854,7 @@ bool StorageReplicatedMergeTree::executeReplaceRange(LogEntry & entry)
interserver_scheme, address.scheme, address.host);
auto [fetched_part, lock] = fetcher.fetchSelectedPart(
metadata_snapshot, getContext(), part_desc->found_new_part_name, zookeeper_name, source_replica_path,
metadata_snapshot, getContext(), part_desc->found_new_part_name, zookeeper_info.zookeeper_name, source_replica_path,
address.host, address.replication_port, timeouts, credentials->getUser(), credentials->getPassword(),
interserver_scheme, replicated_fetches_throttler, false, TMP_PREFIX + "fetch_");
part_desc->res_part = fetched_part;
@ -2980,7 +2976,7 @@ void StorageReplicatedMergeTree::executeClonePartFromShard(const LogEntry & entr
interserver_scheme, address.scheme, address.host);
auto [fetched_part, lock] = fetcher.fetchSelectedPart(
metadata_snapshot, getContext(), entry.new_part_name, zookeeper_name, source_replica_path,
metadata_snapshot, getContext(), entry.new_part_name, zookeeper_info.zookeeper_name, source_replica_path,
address.host, address.replication_port,
timeouts, credentials->getUser(), credentials->getPassword(), interserver_scheme,
replicated_fetches_throttler, true);
@ -5076,7 +5072,7 @@ MergeTreeData::MutableDataPartPtr StorageReplicatedMergeTree::fetchExistsPart(
currently_fetching_parts.erase(part_name);
});
LOG_DEBUG(log, "Fetching already known part {} from {}:{}", part_name, zookeeper_name, source_replica_path);
LOG_DEBUG(log, "Fetching already known part {} from {}:{}", part_name, zookeeper_info.zookeeper_name, source_replica_path);
TableLockHolder table_lock_holder = lockForShare(RWLockImpl::NO_QUERY, getSettings()->lock_acquire_timeout_for_background_operations);
@ -5109,7 +5105,7 @@ MergeTreeData::MutableDataPartPtr StorageReplicatedMergeTree::fetchExistsPart(
"'{}' != '{}', can't fetch part from {}", interserver_scheme, address.scheme, address.host);
auto [fetched_part, lock] = fetcher.fetchSelectedPart(
metadata_snapshot, getContext(), part_name, zookeeper_name, source_replica_path,
metadata_snapshot, getContext(), part_name, zookeeper_info.zookeeper_name, source_replica_path,
address.host, address.replication_port,
timeouts, credentials->getUser(), credentials->getPassword(),
interserver_scheme, replicated_fetches_throttler, false, "", nullptr, true,
@ -5148,7 +5144,7 @@ MergeTreeData::MutableDataPartPtr StorageReplicatedMergeTree::fetchExistsPart(
ProfileEvents::increment(ProfileEvents::ReplicatedPartFetches);
LOG_DEBUG(log, "Fetched part {} from {}:{}", part_name, zookeeper_name, source_replica_path);
LOG_DEBUG(log, "Fetched part {} from {}:{}", part_name, zookeeper_info.zookeeper_name, source_replica_path);
return part;
}
@@ -6653,10 +6649,10 @@ void StorageReplicatedMergeTree::checkTableCanBeDropped(ContextPtr query_context
void StorageReplicatedMergeTree::checkTableCanBeRenamed(const StorageID & new_name) const
{
if (renaming_restrictions == RenamingRestrictions::ALLOW_ANY)
if (zookeeper_info.renaming_restrictions == RenamingRestrictions::ALLOW_ANY)
return;
if (renaming_restrictions == RenamingRestrictions::DO_NOT_ALLOW)
if (zookeeper_info.renaming_restrictions == RenamingRestrictions::DO_NOT_ALLOW)
{
auto old_name = getStorageID();
bool is_server_startup = Context::getGlobalContextInstance()->getApplicationType() == Context::ApplicationType::SERVER
@@ -6680,7 +6676,7 @@ void StorageReplicatedMergeTree::checkTableCanBeRenamed(const StorageID & new_na
"If you really want to rename table, you should edit metadata file first and restart server or reattach the table.");
}
assert(renaming_restrictions == RenamingRestrictions::ALLOW_PRESERVING_UUID);
assert(zookeeper_info.renaming_restrictions == RenamingRestrictions::ALLOW_PRESERVING_UUID);
if (!new_name.hasUUID() && getStorageID().hasUUID())
throw Exception(ErrorCodes::NOT_IMPLEMENTED,
"Cannot move Replicated table to Ordinary database, because zookeeper_path contains implicit "
@@ -7039,9 +7035,7 @@ void StorageReplicatedMergeTree::getStatus(ReplicatedTableStatus & res, bool wit
/// NOTE: consider converting to UInt64
res.parts_to_check = static_cast<UInt32>(part_check_thread.size());
res.zookeeper_name = zookeeper_name;
res.zookeeper_path = zookeeper_path;
res.replica_name = replica_name;
res.zookeeper_info = zookeeper_info;
res.replica_path = replica_path;
res.columns_version = -1;
@@ -7250,11 +7244,7 @@ void StorageReplicatedMergeTree::fetchPartition(
}
}
zkutil::ZooKeeperPtr zookeeper;
if (from_zookeeper_name != default_zookeeper_name)
zookeeper = getContext()->getAuxiliaryZooKeeper(from_zookeeper_name);
else
zookeeper = getZooKeeper();
zkutil::ZooKeeperPtr zookeeper = getContext()->getDefaultOrAuxiliaryZooKeeper(from_zookeeper_name);
if (from.back() == '/')
from.resize(from.size() - 1);
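The removed if/else above is the pattern the new Context helper replaces. As a rough sketch of what getDefaultOrAuxiliaryZooKeeper presumably does (its real definition is not part of this diff, so the free function below is a reconstruction, not the actual method):

#include <Common/ZooKeeper/ZooKeeper.h>
#include <Interpreters/Context.h>

namespace DB
{

/// Reconstruction only: inferred from the removed branch; the real method lives on Context.
static zkutil::ZooKeeperPtr getDefaultOrAuxiliaryZooKeeperSketch(const ContextPtr & context, const String & name)
{
    if (name == "default")                        /// the main <zookeeper> config section
        return context->getZooKeeper();
    return context->getAuxiliaryZooKeeper(name);  /// an <auxiliary_zookeepers> entry
}

}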
@@ -10540,7 +10530,7 @@ void StorageReplicatedMergeTree::backupData(
auto coordination = backup_entries_collector.getBackupCoordination();
coordination->addReplicatedDataPath(full_zookeeper_path, data_path_in_backup);
coordination->addReplicatedDataPath(zookeeper_info.full_path, data_path_in_backup);
using PartNameAndChecksum = IBackupCoordination::PartNameAndChecksum;
std::vector<PartNameAndChecksum> part_names_with_hashes;
@@ -10549,7 +10539,7 @@ void StorageReplicatedMergeTree::backupData(
part_names_with_hashes.emplace_back(PartNameAndChecksum{part_backup_entries.part_name, part_backup_entries.part_checksum});
/// Send our list of part names to the coordination (to compare with other replicas).
coordination->addReplicatedPartNames(full_zookeeper_path, getStorageID().getFullTableName(), getReplicaName(), part_names_with_hashes);
coordination->addReplicatedPartNames(zookeeper_info.full_path, getStorageID().getFullTableName(), getReplicaName(), part_names_with_hashes);
/// Send a list of mutations to the coordination too (we need to find the mutations which are not finished for added part names).
{
@@ -10591,13 +10581,13 @@ void StorageReplicatedMergeTree::backupData(
}
if (!mutation_infos.empty())
coordination->addReplicatedMutations(full_zookeeper_path, getStorageID().getFullTableName(), getReplicaName(), mutation_infos);
coordination->addReplicatedMutations(zookeeper_info.full_path, getStorageID().getFullTableName(), getReplicaName(), mutation_infos);
}
}
/// This task will be executed after all replicas have collected their parts and the coordination is ready to
/// give us the final list of parts to add to the BackupEntriesCollector.
auto post_collecting_task = [my_full_zookeeper_path = full_zookeeper_path,
auto post_collecting_task = [my_full_zookeeper_path = zookeeper_info.full_path,
my_replica_name = getReplicaName(),
coordination,
my_parts_backup_entries = std::move(parts_backup_entries),
@@ -10636,7 +10626,7 @@ void StorageReplicatedMergeTree::backupData(
void StorageReplicatedMergeTree::restoreDataFromBackup(RestorerFromBackup & restorer, const String & data_path_in_backup, const std::optional<ASTs> & partitions)
{
if (!restorer.getRestoreCoordination()->acquireInsertingDataIntoReplicatedTable(full_zookeeper_path))
if (!restorer.getRestoreCoordination()->acquireInsertingDataIntoReplicatedTable(zookeeper_info.full_path))
{
/// Other replica is already restoring the data of this table.
/// We'll get them later due to replication, it's not necessary to read it from the backup.

View File

@@ -27,6 +27,7 @@
#include <Storages/MergeTree/ReplicatedMergeTreeTableMetadata.h>
#include <Storages/MergeTree/ReplicatedTableStatus.h>
#include <Storages/RenamingRestrictions.h>
#include <Storages/TableZnodeInfo.h>
#include <DataTypes/DataTypesNumber.h>
#include <Interpreters/Cluster.h>
#include <Interpreters/PartLog.h>
@@ -98,8 +99,7 @@ public:
/** If not 'attach', either creates a new table in ZK, or adds a replica to an existing table.
*/
StorageReplicatedMergeTree(
const String & zookeeper_path_,
const String & replica_name_,
const TableZnodeInfo & zookeeper_info_,
LoadingStrictnessLevel mode,
const StorageID & table_id_,
const String & relative_data_path_,
@@ -108,7 +108,6 @@ public:
const String & date_column_name,
const MergingParams & merging_params_,
std::unique_ptr<MergeTreeSettings> settings_,
RenamingRestrictions renaming_restrictions_,
bool need_check_structure);
void startup() override;
@@ -244,14 +243,15 @@ public:
/** Remove a specific replica from zookeeper.
* returns true if there are no replicas left
*/
static bool dropReplica(zkutil::ZooKeeperPtr zookeeper, const String & zookeeper_path, const String & replica,
static bool dropReplica(zkutil::ZooKeeperPtr zookeeper, const TableZnodeInfo & zookeeper_info,
LoggerPtr logger, MergeTreeSettingsPtr table_settings = nullptr, std::optional<bool> * has_metadata_out = nullptr);
bool dropReplica(const String & drop_zookeeper_path, const String & drop_replica, LoggerPtr logger);
bool dropReplica(const String & drop_replica, LoggerPtr logger);
/// Removes table from ZooKeeper after the last replica was dropped
static bool removeTableNodesFromZooKeeper(zkutil::ZooKeeperPtr zookeeper, const String & zookeeper_path,
const zkutil::EphemeralNodeHolder::Ptr & metadata_drop_lock, LoggerPtr logger);
static bool removeTableNodesFromZooKeeper(
zkutil::ZooKeeperPtr zookeeper, const TableZnodeInfo & zookeeper_info2,
const zkutil::EphemeralNodeHolder::Ptr & metadata_drop_lock, LoggerPtr logger);
/// Schedules job to execute in background pool (merge, mutate, drop range and so on)
bool scheduleDataProcessingJob(BackgroundJobsAssignee & assignee) override;
@@ -330,17 +330,15 @@ public:
bool createEmptyPartInsteadOfLost(zkutil::ZooKeeperPtr zookeeper, const String & lost_part_name);
// Return default or custom zookeeper name for table
const String & getZooKeeperName() const { return zookeeper_name; }
const String & getZooKeeperPath() const { return zookeeper_path; }
const String & getFullZooKeeperPath() const { return full_zookeeper_path; }
const String & getZooKeeperName() const { return zookeeper_info.zookeeper_name; }
const String & getZooKeeperPath() const { return zookeeper_info.path; }
const String & getFullZooKeeperPath() const { return zookeeper_info.full_path; }
// Return table id, common for different replicas
String getTableSharedID() const override;
std::map<std::string, MutationCommands> getUnfinishedMutationCommands() const override;
static const String & getDefaultZooKeeperName() { return default_zookeeper_name; }
/// Check if there are new broken disks and enqueue part recovery tasks.
void checkBrokenDisks();
@@ -418,12 +416,10 @@ private:
bool is_readonly_metric_set = false;
const String full_zookeeper_path;
static const String default_zookeeper_name;
const String zookeeper_name;
const String zookeeper_path;
const TableZnodeInfo zookeeper_info;
const String zookeeper_path; // shorthand for zookeeper_info.path
const String replica_name;
const String replica_name; // shorthand for zookeeper_info.replica_name
const String replica_path;
/** /replicas/me/is_active.
@@ -519,9 +515,6 @@ private:
/// True if replica was created for existing table with fixed granularity
bool other_replicas_fixed_granularity = false;
/// Do not allow RENAME TABLE if zookeeper_path contains {database} or {table} macro
const RenamingRestrictions renaming_restrictions;
/// Throttlers used in DataPartsExchange to lower maximum fetch/sends
/// speed.
ThrottlerPtr replicated_fetches_throttler;

View File

@@ -530,9 +530,9 @@ Chunk SystemReplicasSource::generate()
res_columns[col_num++]->insert(status.is_session_expired);
res_columns[col_num++]->insert(status.queue.future_parts);
res_columns[col_num++]->insert(status.parts_to_check);
res_columns[col_num++]->insert(status.zookeeper_name);
res_columns[col_num++]->insert(status.zookeeper_path);
res_columns[col_num++]->insert(status.replica_name);
res_columns[col_num++]->insert(status.zookeeper_info.zookeeper_name);
res_columns[col_num++]->insert(status.zookeeper_info.path);
res_columns[col_num++]->insert(status.zookeeper_info.replica_name);
res_columns[col_num++]->insert(status.replica_path);
res_columns[col_num++]->insert(status.columns_version);
res_columns[col_num++]->insert(status.queue.queue_size);

View File

@@ -0,0 +1,135 @@
#include <Storages/TableZnodeInfo.h>
#include <Common/Macros.h>
#include <Common/ZooKeeper/ZooKeeper.h>
#include <Databases/DatabaseReplicatedHelpers.h>
#include <Databases/IDatabase.h>
#include <Interpreters/Context.h>
#include <Interpreters/DatabaseCatalog.h>
#include <Interpreters/StorageID.h>
#include <Parsers/ASTCreateQuery.h>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
TableZnodeInfo TableZnodeInfo::resolve(const String & requested_path, const String & requested_replica_name, const StorageID & table_id, const ASTCreateQuery & query, LoadingStrictnessLevel mode, const ContextPtr & context)
{
bool is_on_cluster = context->getClientInfo().query_kind == ClientInfo::QueryKind::SECONDARY_QUERY;
bool is_replicated_database = context->getClientInfo().query_kind == ClientInfo::QueryKind::SECONDARY_QUERY &&
DatabaseCatalog::instance().getDatabase(table_id.database_name)->getEngineName() == "Replicated";
/// Allow implicit {uuid} macros only for zookeeper_path in ON CLUSTER queries
/// and if UUID was explicitly passed in CREATE TABLE (like for ATTACH)
bool allow_uuid_macro = is_on_cluster || is_replicated_database || query.attach || query.has_uuid;
TableZnodeInfo res;
res.full_path = requested_path;
res.replica_name = requested_replica_name;
/// Unfold {database} and {table} macros on table creation, so the table can be renamed.
if (mode < LoadingStrictnessLevel::ATTACH)
{
Macros::MacroExpansionInfo info;
/// NOTE: it's not recursive
info.expand_special_macros_only = true;
info.table_id = table_id;
/// Avoid unfolding the {uuid} macro at this step.
/// We did unfold it in previous versions to make moving a table from an Atomic to an Ordinary database work correctly,
/// but that is no longer allowed (and it was the only reason to unfold the {uuid} macro).
info.table_id.uuid = UUIDHelpers::Nil;
res.full_path = context->getMacros()->expand(res.full_path, info);
info.level = 0;
res.replica_name = context->getMacros()->expand(res.replica_name, info);
}
res.full_path_for_metadata = res.full_path;
res.replica_name_for_metadata = res.replica_name;
/// Expand other macros (such as {shard} and {replica}). We did not expand them in the previous step
/// so that metadata files can be copied between replicas.
Macros::MacroExpansionInfo info;
info.table_id = table_id;
if (is_replicated_database)
{
auto database = DatabaseCatalog::instance().getDatabase(table_id.database_name);
info.shard = getReplicatedDatabaseShardName(database);
info.replica = getReplicatedDatabaseReplicaName(database);
}
if (!allow_uuid_macro)
info.table_id.uuid = UUIDHelpers::Nil;
res.full_path = context->getMacros()->expand(res.full_path, info);
bool expanded_uuid_in_path = info.expanded_uuid;
info.level = 0;
info.table_id.uuid = UUIDHelpers::Nil;
res.replica_name = context->getMacros()->expand(res.replica_name, info);
/// We do not allow renaming a table with these macros in metadata, because zookeeper_path would break after RENAME TABLE.
/// NOTE: this may happen if the table was created by an older version of ClickHouse (< 20.10) and the macros were not unfolded on table creation,
/// or if one of these macros is recursively expanded from some other macro.
/// Also do not allow moving a table from an Atomic to an Ordinary database if there's a {uuid} macro.
if (info.expanded_database || info.expanded_table)
res.renaming_restrictions = RenamingRestrictions::DO_NOT_ALLOW;
else if (info.expanded_uuid)
res.renaming_restrictions = RenamingRestrictions::ALLOW_PRESERVING_UUID;
res.zookeeper_name = zkutil::extractZooKeeperName(res.full_path);
res.path = zkutil::extractZooKeeperPath(res.full_path, /* check_starts_with_slash */ mode <= LoadingStrictnessLevel::CREATE, getLogger(table_id.getNameForLogs()));
res.path_prefix_for_drop = res.path;
if (expanded_uuid_in_path)
{
/// When dropping a table with znode path "/foo/{uuid}/bar/baz", delete not only
/// "/foo/{uuid}/bar/baz" but also "/foo/{uuid}/bar" and "/foo/{uuid}" if they become empty.
///
/// (We find the uuid substring by searching instead of keeping track of it when expanding
/// the macro. So in principle we may find a uuid substring that wasn't expanded from a
/// macro. This should be ok because we're searching for the *last* occurrence, so we'll get
/// a prefix at least as long as the correct one, so we won't delete znodes outside the
/// {uuid} path component. This sounds sketchy, but propagating string indices through macro
/// expansion passes is sketchy too (error-prone and more complex), and on balance this seems
/// better.)
String uuid_str = toString(table_id.uuid);
size_t i = res.path.rfind(uuid_str);
if (i == String::npos)
/// Possible if the macro is in the "<auxiliary_zookeeper_name>:/" prefix, but we probably
/// don't want to allow that.
throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't find uuid in zookeeper path after expanding {{uuid}} macro: {} (uuid {})", res.path, uuid_str);
i += uuid_str.size();
/// In case the path is "/foo/pika{uuid}chu/bar" (or "/foo/{uuid}{replica}/bar").
while (i < res.path.size() && res.path[i] != '/')
i += 1;
res.path_prefix_for_drop = res.path.substr(0, i);
}
return res;
}
void TableZnodeInfo::dropAncestorZnodesIfNeeded(const zkutil::ZooKeeperPtr & zookeeper) const
{
chassert(path.starts_with(path_prefix_for_drop));
if (path_prefix_for_drop.empty() || path_prefix_for_drop.size() == path.size())
return;
chassert(path[path_prefix_for_drop.size()] == '/');
String path_to_remove = path;
while (path_to_remove.size() > path_prefix_for_drop.size())
{
size_t i = path_to_remove.find_last_of('/');
chassert(i != String::npos && i >= path_prefix_for_drop.size());
path_to_remove = path_to_remove.substr(0, i);
Coordination::Error rc = zookeeper->tryRemove(path_to_remove);
if (rc != Coordination::Error::ZOK)
/// Znode not empty or already removed by someone else.
break;
}
}
}
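To make the path_prefix_for_drop computation above concrete, here is a small standalone sketch of the rfind-based trimming, using plain std::string and a made-up UUID:

#include <cassert>
#include <string>

/// Standalone illustration of the rfind-based prefix trimming above; all values are made up.
int main()
{
    const std::string uuid_str = "8a1f3c2e-0000-4000-8000-000000000000";
    std::string path = "/clickhouse/tables/" + uuid_str + "/01"; /// "{uuid}/{shard}" after expansion

    size_t i = path.rfind(uuid_str);
    assert(i != std::string::npos);
    i += uuid_str.size();
    /// Skip to the end of the path component, handling cases like "pika{uuid}chu".
    while (i < path.size() && path[i] != '/')
        i += 1;

    const std::string path_prefix_for_drop = path.substr(0, i);
    assert(path_prefix_for_drop == "/clickhouse/tables/" + uuid_str);
    /// On DROP, znodes between `path` and this prefix are then removed bottom-up while empty.
    return 0;
}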

View File

@@ -0,0 +1,62 @@
#pragma once
#include <base/types.h>
#include <Storages/RenamingRestrictions.h>
#include <Databases/LoadingStrictnessLevel.h>
namespace zkutil
{
class ZooKeeper;
using ZooKeeperPtr = std::shared_ptr<ZooKeeper>;
}
namespace DB
{
struct StorageID;
class ASTCreateQuery;
class Context;
using ContextPtr = std::shared_ptr<const Context>;
/// Helper for replicated tables that use zookeeper for coordination among replicas.
/// Handles things like:
/// * Expanding macros like {table} and {uuid} in the zookeeper path. Some macros are expanded and saved once
/// on table creation (e.g. {table}, to avoid changing the path if the table is later renamed);
/// others are expanded on each server startup, separately on each replica (e.g. {replica},
/// because its value differs between replicas).
/// * When dropping a table with znode path (say) "/clickhouse/tables/{uuid}/{shard}", deleting not only
/// the znode at this path but also the parent znode "/clickhouse/tables/{uuid}" if it becomes empty.
/// Otherwise every created-and-dropped table would leave behind an empty znode.
struct TableZnodeInfo
{
String path;
String replica_name;
/// Which zookeeper cluster to use ("default" or one of the auxiliary zookeepers listed in the config).
String zookeeper_name = "default";
/// Path with optional zookeeper_name prefix: "<auxiliary_zookeeper_name>:<path>".
String full_path;
/// Do not allow RENAME TABLE if zookeeper_path contains {database} or {table} macro.
RenamingRestrictions renaming_restrictions = RenamingRestrictions::ALLOW_ANY;
/// Information to save in table metadata and send to replicas (if ON CLUSTER or DatabaseReplicated).
/// Has some macros expanded (e.g. {table}), others left unexpanded (e.g. {replica}).
String full_path_for_metadata;
String replica_name_for_metadata;
/// Path to an ancestor of `path` that should be considered "owned" by this table (shared among
/// replicas of the table). When the table is dropped, this znode is removed if it becomes empty.
/// E.g. path = "/clickhouse/tables/{uuid}/{shard}", path_prefix_for_drop = "/clickhouse/tables/{uuid}".
String path_prefix_for_drop;
static TableZnodeInfo resolve(
const String & requested_path, const String & requested_replica_name,
const StorageID & table_id, const ASTCreateQuery & query, LoadingStrictnessLevel mode,
const ContextPtr & context);
void dropAncestorZnodesIfNeeded(const zkutil::ZooKeeperPtr & zookeeper) const;
};
}
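A worked example of the two expansion stages described above, with hypothetical values (UUID shortened to "8a1f..." for readability):

requested_path            = "/clickhouse/tables/{uuid}/{shard}"
full_path_for_metadata    = "/clickhouse/tables/{uuid}/{shard}"   (only special macros like {database}/{table} unfolded; {uuid} kept)
full_path                 = "/clickhouse/tables/8a1f.../01"       (everything expanded, as used by this server)
path                      = "/clickhouse/tables/8a1f.../01"       (same, minus any "<zookeeper_name>:" prefix)
path_prefix_for_drop      = "/clickhouse/tables/8a1f..."          (ancestor removed on DROP if it becomes empty)
replica_name_for_metadata = "{replica}"
replica_name              = "node1"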

View File

@@ -42,6 +42,7 @@
<multi_read>1</multi_read>
<check_not_exists>1</check_not_exists>
<create_if_not_exists>1</create_if_not_exists>
<remove_recursive>1</remove_recursive>
</feature_flags>
</keeper_server>
</clickhouse>
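The new remove_recursive feature flag advertises Keeper's RemoveRecursive capability to clients; the same flag is randomized in the stress-test configure() change below and appears as REMOVE_RECURSIVE in the updated test reference file near the end of this diff.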

View File

@@ -64,6 +64,7 @@ function configure()
randomize_config_boolean_value multi_read keeper_port
randomize_config_boolean_value check_not_exists keeper_port
randomize_config_boolean_value create_if_not_exists keeper_port
randomize_config_boolean_value remove_recursive keeper_port
fi
sudo chown clickhouse /etc/clickhouse-server/config.d/keeper_port.xml

View File

@@ -393,6 +393,7 @@ def test_table_functions():
f"azureBlobStorageCluster('test_shard_localhost', named_collection_2, connection_string = '{azure_conn_string}', container = 'cont', blob_path = 'test_simple_16.csv', format = 'CSV')",
f"azureBlobStorageCluster('test_shard_localhost', named_collection_2, storage_account_url = '{azure_storage_account_url}', container = 'cont', blob_path = 'test_simple_17.csv', account_name = '{azure_account_name}', account_key = '{azure_account_key}')",
f"iceberg('http://minio1:9001/root/data/test11.csv.gz', 'minio', '{password}')",
f"gcs('http://minio1:9001/root/data/test11.csv.gz', 'minio', '{password}')",
]
def make_test_case(i):

View File

@@ -0,0 +1,46 @@
<clickhouse>
<remote_servers>
<cluster>
<shard>
<replica>
<host>ch1</host>
<port>9000</port>
</replica>
</shard>
</cluster>
</remote_servers>
<macros>
<shard>01</shard>
</macros>
<storage_configuration>
<disks>
<s3>
<type>s3</type>
<endpoint>http://minio1:9001/root/data/</endpoint>
<access_key_id>minio</access_key_id>
<secret_access_key>minio123</secret_access_key>
</s3>
</disks>
<policies>
<s3>
<volumes>
<default>
<disk>default</disk>
<perform_ttl_move_on_insert>False</perform_ttl_move_on_insert>
</default>
<s3>
<disk>s3</disk>
<perform_ttl_move_on_insert>False</perform_ttl_move_on_insert>
</s3>
</volumes>
<move_factor>0.0</move_factor>
</s3>
</policies>
</storage_configuration>
<merge_tree>
<allow_remote_fs_zero_copy_replication>true</allow_remote_fs_zero_copy_replication>
<storage_policy>s3</storage_policy>
</merge_tree>
<allow_remove_stale_moving_parts>true</allow_remove_stale_moving_parts>
</clickhouse>

View File

@@ -0,0 +1,117 @@
from pathlib import Path
import time
import pytest
from helpers.cluster import ClickHouseCluster
cluster = ClickHouseCluster(__file__)
ch1 = cluster.add_instance(
"ch1",
main_configs=[
"config.xml",
],
macros={"replica": "node1"},
with_zookeeper=True,
with_minio=True,
)
DATABASE_NAME = "stale_moving_parts"
@pytest.fixture(scope="module")
def started_cluster():
try:
cluster.start()
yield cluster
finally:
cluster.shutdown()
def q(node, query):
return node.query(database=DATABASE_NAME, sql=query)
# .../disks/s3/store/
def get_table_path(node, table):
return (
node.query(
sql=f"SELECT data_paths FROM system.tables WHERE table = '{table}' and database = '{DATABASE_NAME}' LIMIT 1"
)
.strip('"\n[]')
.split(",")[1]
.strip("'")
)
def exec(node, cmd, path):
return node.exec_in_container(
[
"bash",
"-c",
f"{cmd} {path}",
]
)
def wait_part_is_stuck(node, table_moving_path, moving_part):
num_tries = 5
while q(node, "SELECT part_name FROM system.moves").strip() != moving_part:
if num_tries == 0:
raise Exception("Part has not started to move")
num_tries -= 1
time.sleep(1)
num_tries = 5
while exec(node, "ls", table_moving_path).strip() != moving_part:
if num_tries == 0:
raise Exception("Part is not stuck in the moving directory")
num_tries -= 1
time.sleep(1)
def wait_zookeeper_node_to_start(zk_nodes, timeout=60):
start = time.time()
while time.time() - start < timeout:
try:
for instance in zk_nodes:
conn = cluster.get_kazoo_client(instance)
conn.get_children("/")
print("All instances of ZooKeeper started")
return
except Exception as ex:
print(("Can't connect to ZooKeeper " + str(ex)))
time.sleep(0.5)
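# Scenario: a MOVE PART is frozen via failpoint, ZooKeeper is taken down so the move
# gets cancelled, and the stale part must be cleaned out of the table's "moving"
# directory even without ZooKeeper (allow_remove_stale_moving_parts=true in config.xml).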
def test_remove_stale_moving_parts_without_zookeeper(started_cluster):
ch1.query(f"CREATE DATABASE IF NOT EXISTS {DATABASE_NAME}")
q(
ch1,
"CREATE TABLE test_remove ON CLUSTER cluster ( id UInt32 ) ENGINE ReplicatedMergeTree() ORDER BY id;",
)
table_moving_path = Path(get_table_path(ch1, "test_remove")) / "moving"
q(ch1, "SYSTEM ENABLE FAILPOINT stop_moving_part_before_swap_with_active")
q(ch1, "INSERT INTO test_remove SELECT number FROM numbers(100);")
moving_part = "all_0_0_0"
move_response = ch1.get_query_request(
sql=f"ALTER TABLE test_remove MOVE PART '{moving_part}' TO DISK 's3'",
database=DATABASE_NAME,
)
wait_part_is_stuck(ch1, table_moving_path, moving_part)
cluster.stop_zookeeper_nodes(["zoo1", "zoo2", "zoo3"])
# Stop moves in case table is not read-only yet
q(ch1, "SYSTEM STOP MOVES")
q(ch1, "SYSTEM DISABLE FAILPOINT stop_moving_part_before_swap_with_active")
assert "Cancelled moving parts" in move_response.get_error()
assert exec(ch1, "ls", table_moving_path).strip() == ""
cluster.start_zookeeper_nodes(["zoo1", "zoo2", "zoo3"])
wait_zookeeper_node_to_start(["zoo1", "zoo2", "zoo3"])
q(ch1, "SYSTEM START MOVES")
q(ch1, f"DROP TABLE test_remove")

View File

@@ -560,7 +560,6 @@ positionCaseInsensitive
positionCaseInsensitiveUTF8
positionUTF8
pow
printf
proportionsZTest
protocol
queryID

View File

@@ -1,2 +1,2 @@
default 127.0.0.1 9181 0 0 0 1 1 ['FILTERED_LIST','MULTI_READ','CHECK_NOT_EXISTS','CREATE_IF_NOT_EXISTS']
default 127.0.0.1 9181 0 0 0 1 1 ['FILTERED_LIST','MULTI_READ','CHECK_NOT_EXISTS','CREATE_IF_NOT_EXISTS','REMOVE_RECURSIVE']
zookeeper2 localhost 9181 0 0 0 1

View File

@@ -0,0 +1,16 @@
#!/usr/bin/env bash
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
db="rdb_$CLICKHOUSE_DATABASE"
$CLICKHOUSE_CLIENT --distributed_ddl_output_mode=none -nq "
create database $db engine=Replicated('/test/$CLICKHOUSE_DATABASE/rdb', 's1', 'r1');
create table $db.a (x Int8) engine ReplicatedMergeTree order by x;"
uuid=`$CLICKHOUSE_CLIENT -q "select uuid from system.tables where database = '$db' and name = 'a'"`
$CLICKHOUSE_CLIENT --distributed_ddl_output_mode=none -nq "
select count() from system.zookeeper where path = '/clickhouse/tables' and name = '$uuid';
drop table $db.a sync;
select count() from system.zookeeper where path = '/clickhouse/tables' and name = '$uuid';"
$CLICKHOUSE_CLIENT -q "drop database $db"