Merge branch 'master' into fix-segfault-deltalake

Kseniia Sumarokova 2024-09-06 16:57:32 +02:00 committed by GitHub
commit 0c18c38cac
27 changed files with 302 additions and 53 deletions


@ -3,6 +3,8 @@
FROM alpine:3.18
RUN apk add --no-cache -U iproute2 \
&& for bin in iptables iptables-restore iptables-save; \
&& for bin in \
iptables iptables-restore iptables-save \
ip6tables ip6tables-restore ip6tables-save; \
do ln -sf xtables-nft-multi "/sbin/$bin"; \
done


@ -24,9 +24,11 @@ DELETE FROM hits WHERE Title LIKE '%hello%';
## Lightweight `DELETE` does not delete data immediately
Lightweight `DELETE` is implemented as a [mutation](/en/sql-reference/statements/alter#mutations), which is executed asynchronously in the background by default. The statement is going to return almost immediately, but the data can still be visible to queries until the mutation is finished.
Lightweight `DELETE` is implemented as a [mutation](/en/sql-reference/statements/alter#mutations) that marks rows as deleted but does not immediately physically delete them.
The mutation marks rows as deleted, and at that point they no longer show up in query results. It does not physically delete the data; that happens during the next merge. As a result, it is possible that for an unspecified period, data is not actually deleted from storage and is only marked as deleted.
By default, `DELETE` statements wait until marking the rows as deleted has completed before returning. This can take a long time if the amount of data is large. Alternatively, you can run the mutation asynchronously in the background using the setting [`lightweight_deletes_sync`](/en/operations/settings/settings#lightweight_deletes_sync). If that setting is disabled, the `DELETE` statement returns immediately, but the data can still be visible to queries until the background mutation is finished.
The mutation does not physically delete the rows that have been marked as deleted; this only happens during the next merge. As a result, it is possible that for an unspecified period, data is not actually deleted from storage and is only marked as deleted.
If you need to guarantee that your data is deleted from storage in a predictable time, consider using the table setting [`min_age_to_force_merge_seconds`](https://clickhouse.com/docs/en/operations/settings/merge-tree-settings#min_age_to_force_merge_seconds). Or you can use the [ALTER TABLE ... DELETE](/en/sql-reference/statements/alter/delete) command. Note that deleting data using `ALTER TABLE ... DELETE` may consume significant resources as it recreates all affected parts.
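A minimal SQL sketch of the behaviour described above, reusing the `hits` table from the example at the top of this page (the concrete setting values are illustrative, not recommendations):
```sql
-- Default: wait until the rows are marked as deleted before returning
DELETE FROM hits WHERE Title LIKE '%hello%';

-- Asynchronous: return immediately; rows disappear from results once the background mutation finishes
SET lightweight_deletes_sync = 0;
DELETE FROM hits WHERE Title LIKE '%hello%';

-- Encourage timely physical removal of marked rows by forcing merges of sufficiently old parts
ALTER TABLE hits MODIFY SETTING min_age_to_force_merge_seconds = 3600;

-- Heavyweight alternative that rewrites all affected parts
ALTER TABLE hits DELETE WHERE Title LIKE '%hello%';
```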


@ -1896,6 +1896,21 @@ void ClientBase::processParsedSingleQuery(const String & full_query, const Strin
/// Temporarily apply query settings to context.
std::optional<Settings> old_settings;
SCOPE_EXIT_SAFE({
try
{
/// We need to park ParallelFormatting threads,
/// because they can use settings from the global context,
/// which can lead to a data race with `setSettings`
resetOutput();
}
catch (...)
{
if (!have_error)
{
client_exception = std::make_unique<Exception>(getCurrentExceptionMessageAndPattern(print_stack_trace), getCurrentExceptionCode());
have_error = true;
}
}
if (old_settings)
client_context->setSettings(*old_settings);
});


@ -42,7 +42,7 @@ public:
size_t max_error_cap = DBMS_CONNECTION_POOL_WITH_FAILOVER_MAX_ERROR_COUNT);
using Entry = IConnectionPool::Entry;
using PoolWithFailoverBase<IConnectionPool>::isTryResultInvalid;
using PoolWithFailoverBase<IConnectionPool>::checkTryResultIsValid;
/** Allocates connection to work. */
Entry get(const ConnectionTimeouts & timeouts) override;


@ -122,6 +122,14 @@ public:
return result.entry.isNull() || !result.is_usable || (skip_read_only_replicas && result.is_readonly);
}
void checkTryResultIsValid(const TryResult & result, bool skip_read_only_replicas) const
{
if (isTryResultInvalid(result, skip_read_only_replicas))
throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR,
"Got an invalid connection result: entry.isNull {}, is_usable {}, is_up_to_date {}, delay {}, is_readonly {}, skip_read_only_replicas {}",
result.entry.isNull(), result.is_usable, result.is_up_to_date, result.delay, result.is_readonly, skip_read_only_replicas);
}
size_t getPoolSize() const { return nested_pools.size(); }
protected:


@ -781,14 +781,14 @@ InterpreterCreateQuery::TableProperties InterpreterCreateQuery::getTableProperti
const auto & settings = getContext()->getSettingsRef();
if (index_desc.type == FULL_TEXT_INDEX_NAME && !settings.allow_experimental_full_text_index)
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "Experimental full-text index feature is not enabled (the setting 'allow_experimental_full_text_index')");
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "Experimental full-text index feature is disabled. Turn on setting 'allow_experimental_full_text_index'");
/// ----
/// Temporary check during a transition period. Please remove at the end of 2024.
if (index_desc.type == INVERTED_INDEX_NAME && !settings.allow_experimental_inverted_index)
throw Exception(ErrorCodes::ILLEGAL_INDEX, "Please use index type 'full_text' instead of 'inverted'");
/// ----
if (index_desc.type == "vector_similarity" && !settings.allow_experimental_vector_similarity_index)
throw Exception(ErrorCodes::INCORRECT_QUERY, "Vector similarity index is disabled. Turn on allow_experimental_vector_similarity_index");
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "Experimental vector similarity index is disabled. Turn on setting 'allow_experimental_vector_similarity_index'");
properties.indices.push_back(index_desc);
}


@ -1142,6 +1142,16 @@ bool AlterCommands::hasFullTextIndex(const StorageInMemoryMetadata & metadata)
return false;
}
bool AlterCommands::hasVectorSimilarityIndex(const StorageInMemoryMetadata & metadata)
{
for (const auto & index : metadata.secondary_indices)
{
if (index.type == "vector_similarity")
return true;
}
return false;
}
void AlterCommands::apply(StorageInMemoryMetadata & metadata, ContextPtr context) const
{
if (!prepared)


@ -237,6 +237,9 @@ public:
/// Check if commands have any full-text index
static bool hasFullTextIndex(const StorageInMemoryMetadata & metadata);
/// Check if commands have any vector similarity index
static bool hasVectorSimilarityIndex(const StorageInMemoryMetadata & metadata);
};
}


@ -28,7 +28,6 @@ namespace ErrorCodes
extern const int TOO_MANY_PARTITIONS;
extern const int DISTRIBUTED_TOO_MANY_PENDING_BYTES;
extern const int ARGUMENT_OUT_OF_BOUND;
extern const int LOGICAL_ERROR;
}
/// Can the batch be split and its files sent one-by-one instead?
@ -244,9 +243,7 @@ void DistributedAsyncInsertBatch::sendBatch(const SettingsChanges & settings_cha
auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(insert_settings);
auto results = parent.pool->getManyCheckedForInsert(timeouts, insert_settings, PoolMode::GET_ONE, parent.storage.remote_storage.getQualifiedName());
auto result = results.front();
if (parent.pool->isTryResultInvalid(result, insert_settings.distributed_insert_skip_read_only_replicas))
throw Exception(ErrorCodes::LOGICAL_ERROR, "Got an invalid connection result");
parent.pool->checkTryResultIsValid(result, insert_settings.distributed_insert_skip_read_only_replicas);
connection = std::move(result.entry);
compression_expected = connection->getCompression() == Protocol::Compression::Enable;
@ -306,9 +303,7 @@ void DistributedAsyncInsertBatch::sendSeparateFiles(const SettingsChanges & sett
auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(insert_settings);
auto results = parent.pool->getManyCheckedForInsert(timeouts, insert_settings, PoolMode::GET_ONE, parent.storage.remote_storage.getQualifiedName());
auto result = results.front();
if (parent.pool->isTryResultInvalid(result, insert_settings.distributed_insert_skip_read_only_replicas))
throw Exception(ErrorCodes::LOGICAL_ERROR, "Got an invalid connection result");
parent.pool->checkTryResultIsValid(result, insert_settings.distributed_insert_skip_read_only_replicas);
auto connection = std::move(result.entry);
bool compression_expected = connection->getCompression() == Protocol::Compression::Enable;


@ -416,9 +416,7 @@ void DistributedAsyncInsertDirectoryQueue::processFile(std::string & file_path,
auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(insert_settings);
auto results = pool->getManyCheckedForInsert(timeouts, insert_settings, PoolMode::GET_ONE, storage.remote_storage.getQualifiedName());
auto result = results.front();
if (pool->isTryResultInvalid(result, insert_settings.distributed_insert_skip_read_only_replicas))
throw Exception(ErrorCodes::LOGICAL_ERROR, "Got an invalid connection result");
pool->checkTryResultIsValid(result, insert_settings.distributed_insert_skip_read_only_replicas);
auto connection = std::move(result.entry);
LOG_DEBUG(log, "Sending `{}` to {} ({} rows, {} bytes)",


@ -347,7 +347,7 @@ DistributedSink::runWritingJob(JobReplica & job, const Block & current_block, si
}
const Block & shard_block = (num_shards > 1) ? job.current_shard_block : current_block;
const Settings & settings = context->getSettingsRef();
const Settings settings = context->getSettingsCopy();
size_t rows = shard_block.rows();
@ -378,9 +378,7 @@ DistributedSink::runWritingJob(JobReplica & job, const Block & current_block, si
/// (anyway fallback_to_stale_replicas_for_distributed_queries=true by default)
auto results = shard_info.pool->getManyCheckedForInsert(timeouts, settings, PoolMode::GET_ONE, storage.remote_storage.getQualifiedName());
auto result = results.front();
if (shard_info.pool->isTryResultInvalid(result, settings.distributed_insert_skip_read_only_replicas))
throw Exception(ErrorCodes::LOGICAL_ERROR, "Got an invalid connection result");
shard_info.pool->checkTryResultIsValid(result, settings.distributed_insert_skip_read_only_replicas);
job.connection_entry = std::move(result.entry);
}
else


@ -3230,6 +3230,10 @@ void MergeTreeData::checkAlterIsPossible(const AlterCommands & commands, Context
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED,
"Experimental full-text index feature is not enabled (turn on setting 'allow_experimental_full_text_index')");
if (AlterCommands::hasVectorSimilarityIndex(new_metadata) && !settings.allow_experimental_vector_similarity_index)
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED,
"Experimental vector similarity index is disabled (turn on setting 'allow_experimental_vector_similarity_index')");
for (const auto & disk : getDisks())
if (!disk->supportsHardLinks() && !commands.isSettingsAlter() && !commands.isCommentAlter())
throw Exception(


@ -195,7 +195,7 @@ void MergeTreeIndexGranuleVectorSimilarity::serializeBinary(WriteBuffer & ostr)
LOG_TRACE(logger, "Start writing vector similarity index");
if (empty())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Attempt to write empty minmax index {}", backQuote(index_name));
throw Exception(ErrorCodes::LOGICAL_ERROR, "Attempt to write empty vector similarity index {}", backQuote(index_name));
writeIntBinary(FILE_FORMAT_VERSION, ostr);


@ -40,10 +40,12 @@ void extractReferenceVectorFromLiteral(std::vector<Float64> & reference_vector,
}
}
VectorSimilarityCondition::Info::DistanceFunction stringToDistanceFunction(std::string_view distance_function)
VectorSimilarityCondition::Info::DistanceFunction stringToDistanceFunction(const String & distance_function)
{
if (distance_function == "L2Distance")
return VectorSimilarityCondition::Info::DistanceFunction::L2;
else if (distance_function == "cosineDistance")
return VectorSimilarityCondition::Info::DistanceFunction::Cosine;
else
return VectorSimilarityCondition::Info::DistanceFunction::Unknown;
}
@ -57,7 +59,7 @@ VectorSimilarityCondition::VectorSimilarityCondition(const SelectQueryInfo & que
, index_is_useful(checkQueryStructure(query_info))
{}
bool VectorSimilarityCondition::alwaysUnknownOrTrue(String distance_function) const
bool VectorSimilarityCondition::alwaysUnknownOrTrue(const String & distance_function) const
{
if (!index_is_useful)
return true; /// query isn't supported


@ -57,7 +57,8 @@ public:
enum class DistanceFunction : uint8_t
{
Unknown,
L2
L2,
Cosine
};
std::vector<Float64> reference_vector;
@ -68,7 +69,7 @@ public:
};
/// Returns false if the query can be sped up by an ANN index, true otherwise.
bool alwaysUnknownOrTrue(String distance_function) const;
bool alwaysUnknownOrTrue(const String & distance_function) const;
std::vector<Float64> getReferenceVector() const;
size_t getDimensions() const;
@ -141,18 +142,12 @@ private:
/// Traverses the AST of the ORDER BY section
void traverseOrderByAST(const ASTPtr & node, RPN & rpn);
/// Returns true and stores ANNExpr if the query has valid WHERE section
static bool matchRPNWhere(RPN & rpn, Info & info);
/// Returns true and stores ANNExpr if the query has valid ORDERBY section
static bool matchRPNOrderBy(RPN & rpn, Info & info);
/// Returns true and stores Length if we have valid LIMIT clause in query
static bool matchRPNLimit(RPNElement & rpn, UInt64 & limit);
/// Matches dist function, reference vector, column name
static bool matchMainParts(RPN::iterator & iter, const RPN::iterator & end, Info & info);
/// Gets float or int from AST node
static float getFloatOrIntLiteralOrPanic(const RPN::iterator& iter);


@ -2112,6 +2112,7 @@ class ClickHouseCluster:
self.base_cmd + ["up", "--force-recreate", "--no-deps", "-d", node.name]
)
node.ip_address = self.get_instance_ip(node.name)
node.ipv6_address = self.get_instance_global_ipv6(node.name)
node.client = Client(node.ip_address, command=self.client_bin_path)
logging.info("Restart node with ip change")
@ -3182,6 +3183,7 @@ class ClickHouseCluster:
for instance in self.instances.values():
instance.docker_client = self.docker_client
instance.ip_address = self.get_instance_ip(instance.name)
instance.ipv6_address = self.get_instance_global_ipv6(instance.name)
logging.debug(
f"Waiting for ClickHouse start in {instance.name}, ip: {instance.ip_address}..."


@ -3,6 +3,7 @@ import subprocess
import time
import logging
import docker
import ipaddress
class PartitionManager:
@ -26,23 +27,74 @@ class PartitionManager:
self._check_instance(instance)
self._add_rule(
{"source": instance.ip_address, "destination_port": 2181, "action": action}
{
"source": instance.ip_address,
"destination_port": 2181,
"action": action,
}
)
self._add_rule(
{"destination": instance.ip_address, "source_port": 2181, "action": action}
{
"destination": instance.ip_address,
"source_port": 2181,
"action": action,
}
)
if instance.ipv6_address:
self._add_rule(
{
"source": instance.ipv6_address,
"destination_port": 2181,
"action": action,
}
)
self._add_rule(
{
"destination": instance.ipv6_address,
"source_port": 2181,
"action": action,
}
)
def dump_rules(self):
return _NetworkManager.get().dump_rules()
v4 = _NetworkManager.get().dump_rules()
v6 = _NetworkManager.get().dump_v6_rules()
return v4 + v6
def restore_instance_zk_connections(self, instance, action="DROP"):
self._check_instance(instance)
self._delete_rule(
{"source": instance.ip_address, "destination_port": 2181, "action": action}
{
"source": instance.ip_address,
"destination_port": 2181,
"action": action,
}
)
self._delete_rule(
{"destination": instance.ip_address, "source_port": 2181, "action": action}
{
"destination": instance.ip_address,
"source_port": 2181,
"action": action,
}
)
if instance.ipv6_address:
self._delete_rule(
{
"source": instance.ipv6_address,
"destination_port": 2181,
"action": action,
}
)
self._delete_rule(
{
"destination": instance.ipv6_address,
"source_port": 2181,
"action": action,
}
)
def partition_instances(self, left, right, port=None, action="DROP"):
@ -59,15 +111,33 @@ class PartitionManager:
rule["destination_port"] = port
return rule
def create_rule_v6(src, dst):
rule = {
"source": src.ipv6_address,
"destination": dst.ipv6_address,
"action": action,
}
if port is not None:
rule["destination_port"] = port
return rule
self._add_rule(create_rule(left, right))
self._add_rule(create_rule(right, left))
if left.ipv6_address and right.ipv6_address:
self._add_rule(create_rule_v6(left, right))
self._add_rule(create_rule_v6(right, left))
def add_network_delay(self, instance, delay_ms):
self._add_tc_netem_delay(instance, delay_ms)
def heal_all(self):
while self._iptables_rules:
rule = self._iptables_rules.pop()
if self._is_ipv6_rule(rule):
_NetworkManager.get().delete_ip6tables_rule(**rule)
else:
_NetworkManager.get().delete_iptables_rule(**rule)
while self._netem_delayed_instances:
@ -90,11 +160,26 @@ class PartitionManager:
if instance.ip_address is None:
raise Exception("Instance + " + instance.name + " is not launched!")
@staticmethod
def _is_ipv6_rule(rule):
if rule.get("source"):
return ipaddress.ip_address(rule["source"]).version == 6
if rule.get("destination"):
return ipaddress.ip_address(rule["destination"]).version == 6
return False
def _add_rule(self, rule):
if self._is_ipv6_rule(rule):
_NetworkManager.get().add_ip6tables_rule(**rule)
else:
_NetworkManager.get().add_iptables_rule(**rule)
self._iptables_rules.append(rule)
def _delete_rule(self, rule):
if self._is_ipv6_rule(rule):
_NetworkManager.get().delete_ip6tables_rule(**rule)
else:
_NetworkManager.get().delete_iptables_rule(**rule)
self._iptables_rules.remove(rule)
@ -150,35 +235,65 @@ class _NetworkManager:
cls._instance = cls(**kwargs)
return cls._instance
def setup_ip6tables_docker_user_chain(self):
_rules = subprocess.check_output(f"ip6tables-save", shell=True)
if "DOCKER-USER" in _rules.decode("utf-8"):
return
setup_cmds = [
["ip6tables", "--wait", "-N", "DOCKER-USER"],
["ip6tables", "--wait", "-I", "FORWARD", "-j", "DOCKER-USER"],
["ip6tables", "--wait", "-A", "DOCKER-USER", "-j", "RETURN"],
]
for cmd in setup_cmds:
self._exec_run(cmd, privileged=True)
def add_iptables_rule(self, **kwargs):
cmd = ["iptables", "--wait", "-I", "DOCKER-USER", "1"]
cmd.extend(self._iptables_cmd_suffix(**kwargs))
self._exec_run(cmd, privileged=True)
def add_ip6tables_rule(self, **kwargs):
self.setup_ip6tables_docker_user_chain()
cmd = ["ip6tables", "--wait", "-I", "DOCKER-USER", "1"]
cmd.extend(self._iptables_cmd_suffix(**kwargs))
self._exec_run(cmd, privileged=True)
def delete_iptables_rule(self, **kwargs):
cmd = ["iptables", "--wait", "-D", "DOCKER-USER"]
cmd.extend(self._iptables_cmd_suffix(**kwargs))
self._exec_run(cmd, privileged=True)
def delete_ip6tables_rule(self, **kwargs):
cmd = ["ip6tables", "--wait", "-D", "DOCKER-USER"]
cmd.extend(self._iptables_cmd_suffix(**kwargs))
self._exec_run(cmd, privileged=True)
def dump_rules(self):
cmd = ["iptables", "-L", "DOCKER-USER"]
return self._exec_run(cmd, privileged=True)
def dump_v6_rules(self):
cmd = ["ip6tables", "-L", "DOCKER-USER"]
return self._exec_run(cmd, privileged=True)
@staticmethod
def clean_all_user_iptables_rules():
for iptables in ("iptables", "ip6tables"):
for i in range(1000):
iptables_iter = i
# when the rules are empty, this will return an error
res = subprocess.run("iptables --wait -D DOCKER-USER 1", shell=True)
res = subprocess.run(f"{iptables} --wait -D DOCKER-USER 1", shell=True)
if res.returncode != 0:
logging.info(
"All iptables rules cleared, "
f"All {iptables} rules cleared, "
+ str(iptables_iter)
+ " iterations, last error: "
+ str(res.stderr)
)
return
break
@staticmethod
def _iptables_cmd_suffix(


@ -5,6 +5,7 @@
<access_key_id>minio</access_key_id>
<secret_access_key>minio123</secret_access_key>
</s3_snapshot>
<use_cluster>false</use_cluster>
<tcp_port>9181</tcp_port>
<server_id>1</server_id>
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>


@ -5,6 +5,7 @@
<access_key_id>minio</access_key_id>
<secret_access_key>minio123</secret_access_key>
</s3_snapshot>
<use_cluster>false</use_cluster>
<tcp_port>9181</tcp_port>
<server_id>2</server_id>
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>


@ -5,6 +5,7 @@
<access_key_id>minio</access_key_id>
<secret_access_key>minio123</secret_access_key>
</s3_snapshot>
<use_cluster>false</use_cluster>
<tcp_port>9181</tcp_port>
<server_id>3</server_id>
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>


@ -2,6 +2,9 @@ import pytest
from helpers.cluster import ClickHouseCluster
from time import sleep
from retry import retry
from multiprocessing.dummy import Pool
import helpers.keeper_utils as keeper_utils
from minio.deleteobjects import DeleteObject
from kazoo.client import KazooClient
@ -75,7 +78,18 @@ def wait_node(node):
raise Exception("Can't wait node", node.name, "to become ready")
def delete_keeper_snapshots_logs(nodex):
nodex.exec_in_container(
[
"bash",
"-c",
"rm -rf /var/lib/clickhouse/coordination/log /var/lib/clickhouse/coordination/snapshots",
]
)
def test_s3_upload(started_cluster):
node1_zk = get_fake_zk(node1.name)
# snapshot_distance is set to 50 in the configs
@ -89,6 +103,11 @@ def test_s3_upload(started_cluster):
for obj in list(cluster.minio_client.list_objects("snapshots"))
]
def delete_s3_snapshots():
snapshots = cluster.minio_client.list_objects("snapshots")
for s in snapshots:
cluster.minio_client.remove_object("snapshots", s.object_name)
# Keeper sends snapshots asynchronously, hence we need to retry.
@retry(AssertionError, tries=10, delay=2)
def _check_snapshots():
@ -125,3 +144,26 @@ def test_s3_upload(started_cluster):
)
destroy_zk_client(node2_zk)
node2.stop_clickhouse()
delete_keeper_snapshots_logs(node2)
node3.stop_clickhouse()
delete_keeper_snapshots_logs(node3)
delete_keeper_snapshots_logs(node1)
p = Pool(3)
waiters = []
def start_clickhouse(node):
node.start_clickhouse()
waiters.append(p.apply_async(start_clickhouse, args=(node1,)))
waiters.append(p.apply_async(start_clickhouse, args=(node2,)))
waiters.append(p.apply_async(start_clickhouse, args=(node3,)))
delete_s3_snapshots() # for next iteration
for waiter in waiters:
waiter.wait()
keeper_utils.wait_until_connected(cluster, node1)
keeper_utils.wait_until_connected(cluster, node2)
keeper_utils.wait_until_connected(cluster, node3)


@ -629,5 +629,6 @@ def test_roles_cache():
check()
instance.query("DROP USER " + ", ".join(users))
if roles:
instance.query("DROP ROLE " + ", ".join(roles))
instance.query("DROP TABLE tbl")


@ -1,4 +1,4 @@
-- Tags: no-fasttest
-- Tags: no-fasttest, no-debug, no-tsan, no-msan, no-asan
SET min_execution_speed = 100000000000, timeout_before_checking_execution_speed = 0;
SELECT count() FROM system.numbers; -- { serverError TOO_SLOW }


@ -0,0 +1,32 @@
-- Tags: no-fasttest, no-ordinary-database
-- Tests that CREATE TABLE and ADD INDEX respect setting 'allow_experimental_vector_similarity_index'.
DROP TABLE IF EXISTS tab;
-- Test CREATE TABLE
SET allow_experimental_vector_similarity_index = 0;
CREATE TABLE tab (id UInt32, vec Array(Float32), INDEX idx vec TYPE vector_similarity('hnsw', 'L2Distance')) ENGINE = MergeTree ORDER BY tuple(); -- { serverError SUPPORT_IS_DISABLED }
SET allow_experimental_vector_similarity_index = 1;
CREATE TABLE tab (id UInt32, vec Array(Float32), INDEX idx vec TYPE vector_similarity('hnsw', 'L2Distance')) ENGINE = MergeTree ORDER BY tuple();
DROP TABLE tab;
-- Test ADD INDEX
CREATE TABLE tab (id UInt32, vec Array(Float32)) ENGINE = MergeTree ORDER BY tuple();
SET allow_experimental_vector_similarity_index = 0;
ALTER TABLE tab ADD INDEX idx vec TYPE vector_similarity('hnsw', 'L2Distance'); -- { serverError SUPPORT_IS_DISABLED }
SET allow_experimental_vector_similarity_index = 1;
ALTER TABLE tab ADD INDEX idx vec TYPE vector_similarity('hnsw', 'L2Distance');
-- Other index DDL must work regardless of the setting
SET allow_experimental_vector_similarity_index = 0;
ALTER TABLE tab MATERIALIZE INDEX idx;
-- ALTER TABLE tab CLEAR INDEX idx; -- <-- Should work but doesn't w/o enabled setting. Unexpected but not terrible.
ALTER TABLE tab DROP INDEX idx;
DROP TABLE tab;


@ -41,6 +41,21 @@ Special cases
6 [1,9.3] 0.005731362878640178
1 [2,3.2] 0.15200169244542905
7 [5.5,4.7] 0.3503476876550442
Expression (Projection)
Limit (preliminary LIMIT (without OFFSET))
Sorting (Sorting for ORDER BY)
Expression (Before ORDER BY)
ReadFromMergeTree (default.tab)
Indexes:
PrimaryKey
Condition: true
Parts: 1/1
Granules: 4/4
Skip
Name: idx
Description: vector_similarity GRANULARITY 2
Parts: 1/1
Granules: 2/4
-- Setting "max_limit_for_ann_queries"
Expression (Projection)
Limit (preliminary LIMIT (without OFFSET))


@ -63,6 +63,13 @@ FROM tab
ORDER BY cosineDistance(vec, reference_vec)
LIMIT 3;
EXPLAIN indexes = 1
WITH [0.0, 2.0] AS reference_vec
SELECT id, vec, cosineDistance(vec, reference_vec)
FROM tab
ORDER BY cosineDistance(vec, reference_vec)
LIMIT 3;
SELECT '-- Setting "max_limit_for_ann_queries"';
EXPLAIN indexes=1
WITH [0.0, 2.0] as reference_vec