Merge branch 'master' into replicas_status_api_optimize

This commit is contained in:
mateng915 2023-01-20 10:19:40 +08:00 committed by GitHub
commit 5dc81fac2b
41 changed files with 754 additions and 154 deletions


@ -147,6 +147,14 @@ hash cmake
ClickHouse is available as pre-built binaries and packages. Binaries are portable and can be run on any Linux flavour.
Binaries are built for stable and LTS releases, and also for every commit to `master` and for every pull request.
The CI checks build the binaries on each commit to [ClickHouse](https://github.com/clickhouse/clickhouse/). To download them:
1. Open the [commits list](https://github.com/ClickHouse/ClickHouse/commits/master)
1. Choose a **Merge pull request** commit that includes the new feature, or that was added after the new feature was merged
1. Click the status symbol (yellow dot, red x, green check) to open the CI check list
1. Scroll through the list until you find **ClickHouse build check x/x artifact groups are OK**
1. Click **Details**
1. Find the type of package you need for your operating system and download the files.
![build artifact check](images/find-build-artifact.png)
To find the freshest build from `master`, go to the [commits page](https://github.com/ClickHouse/ClickHouse/commits/master), click the first green check mark or red cross near the commit, and click the “Details” link right after “ClickHouse Build Check”.

Binary image file added (122 KiB); not shown.


@ -390,40 +390,46 @@ SELECT count() FROM table WHERE u64 * i32 == 10 AND u64 * length(s) >= 1234
### Available Types of Indices {#available-types-of-indices}
#### `minmax`
#### MinMax
Stores extremes of the specified expression (if the expression is `tuple`, then it stores extremes for each element of `tuple`), uses stored info for skipping blocks of data like the primary key.
#### `set(max_rows)`
Syntax: `minmax`
#### Set
Stores unique values of the specified expression (no more than `max_rows` rows, `max_rows=0` means “no limits”). Uses the values to check if the `WHERE` expression is not satisfiable on a block of data.
#### `ngrambf_v1(n, size_of_bloom_filter_in_bytes, number_of_hash_functions, random_seed)`
Syntax: `set(max_rows)`
Stores a [Bloom filter](https://en.wikipedia.org/wiki/Bloom_filter) that contains all ngrams from a block of data. Works only with datatypes: [String](/docs/en/sql-reference/data-types/string.md), [FixedString](/docs/en/sql-reference/data-types/fixedstring.md) and [Map](/docs/en/sql-reference/data-types/map.md). Can be used for optimization of `EQUALS`, `LIKE` and `IN` expressions.
#### Bloom Filter
Stores a [Bloom filter](https://en.wikipedia.org/wiki/Bloom_filter) for the specified columns. An optional `false_positive` parameter with possible values between 0 and 1 specifies the probability of receiving a false positive response from the filter. Default value: 0.025. Supported data types: `Int*`, `UInt*`, `Float*`, `Enum`, `Date`, `DateTime`, `String`, `FixedString`, `Array`, `LowCardinality`, `Nullable`, `UUID` and `Map`. For the `Map` data type, the client can specify if the index should be created for keys or values using [mapKeys](/docs/en/sql-reference/functions/tuple-map-functions.md/#mapkeys) or [mapValues](/docs/en/sql-reference/functions/tuple-map-functions.md/#mapvalues) function.
Syntax: `bloom_filter([false_positive])`
#### N-gram Bloom Filter
Stores a [Bloom filter](https://en.wikipedia.org/wiki/Bloom_filter) that contains all n-grams from a block of data. Only works with datatypes: [String](/docs/en/sql-reference/data-types/string.md), [FixedString](/docs/en/sql-reference/data-types/fixedstring.md) and [Map](/docs/en/sql-reference/data-types/map.md). Can be used for optimization of `EQUALS`, `LIKE` and `IN` expressions.
Syntax: `ngrambf_v1(n, size_of_bloom_filter_in_bytes, number_of_hash_functions, random_seed)`
- `n` — ngram size.
- `size_of_bloom_filter_in_bytes` — Bloom filter size in bytes (you can use large values here, for example, 256 or 512, because it can be compressed well).
- `number_of_hash_functions` — The number of hash functions used in the Bloom filter.
- `random_seed` — The seed for Bloom filter hash functions.
#### `tokenbf_v1(size_of_bloom_filter_in_bytes, number_of_hash_functions, random_seed)`
#### Token Bloom Filter
The same as `ngrambf_v1`, but stores tokens instead of ngrams. Tokens are sequences separated by non-alphanumeric characters.
#### `bloom_filter([false_positive])` — Stores a [Bloom filter](https://en.wikipedia.org/wiki/Bloom_filter) for the specified columns.
Syntax: `tokenbf_v1(size_of_bloom_filter_in_bytes, number_of_hash_functions, random_seed)`
The optional `false_positive` parameter is the probability of receiving a false positive response from the filter. Possible values: (0, 1). Default value: 0.025.
#### Special-purpose
Supported data types: `Int*`, `UInt*`, `Float*`, `Enum`, `Date`, `DateTime`, `String`, `FixedString`, `Array`, `LowCardinality`, `Nullable`, `UUID`, `Map`.
- An experimental index to support approximate nearest neighbor (ANN) search. See [here](annindexes.md) for details.
For `Map` data type client can specify if index should be created for keys or values using [mapKeys](/docs/en/sql-reference/functions/tuple-map-functions.md/#mapkeys) or [mapValues](/docs/en/sql-reference/functions/tuple-map-functions.md/#mapvalues) function.
There are also special-purpose and experimental indexes to support approximate nearest neighbor (ANN) queries. See [here](annindexes.md) for details.
The following functions can use the filter: [equals](/docs/en/sql-reference/functions/comparison-functions.md), [notEquals](/docs/en/sql-reference/functions/comparison-functions.md), [in](/docs/en/sql-reference/functions/in-functions), [notIn](/docs/en/sql-reference/functions/in-functions), [has](/docs/en/sql-reference/functions/array-functions#hasarr-elem), [hasAny](/docs/en/sql-reference/functions/array-functions#hasany), [hasAll](/docs/en/sql-reference/functions/array-functions#hasall).
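For orientation, here is a minimal sketch combining the skip index types described above in one table definition (the table name is hypothetical, the columns follow the earlier `u64`/`s` example, and the index parameters are illustrative, not recommendations):

```
CREATE TABLE skip_index_example
(
    u64 UInt64,
    i32 Int32,
    s String,
    INDEX idx_minmax u64 TYPE minmax GRANULARITY 4,
    INDEX idx_set i32 TYPE set(100) GRANULARITY 4,
    INDEX idx_ngram s TYPE ngrambf_v1(3, 256, 2, 0) GRANULARITY 4,
    INDEX idx_token s TYPE tokenbf_v1(256, 2, 0) GRANULARITY 4,
    INDEX idx_bloom s TYPE bloom_filter(0.025) GRANULARITY 4
)
ENGINE = MergeTree
ORDER BY u64;
```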
Example of index creation for `Map` data type
## Example of index creation for Map data type
```
INDEX map_key_index mapKeys(map_column) TYPE bloom_filter GRANULARITY 1
@ -484,9 +490,6 @@ For example:
:::
## Approximate Nearest Neighbor Search Indexes [experimental] {#table_engines-ANNIndex}
In addition to skip indices, there are also [Approximate Nearest Neighbor Search Indexes](/docs/en/engines/table-engines/mergetree-family/annindexes.md).
## Projections {#projections}
Projections are like [materialized views](/docs/en/sql-reference/statements/create/view.md/#materialized) but defined at the part level. They provide consistency guarantees along with automatic usage in queries.
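As a hedged illustration (the table and projection names below are hypothetical), a projection is added to an existing table and then materialized for parts that were already written:

```
ALTER TABLE example_table ADD PROJECTION s_counts
(
    SELECT s, count()
    GROUP BY s
);

ALTER TABLE example_table MATERIALIZE PROJECTION s_counts;
```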
@ -885,6 +888,10 @@ User can assign new big parts to different disks of a [JBOD](https://en.wikipedi
## Using S3 for Data Storage {#table_engine-mergetree-s3}
:::note
Google Cloud Storage (GCS) is also supported using the type `s3`. See [GCS backed MergeTree](/docs/en/integrations/data-ingestion/s3/gcs-merge-tree.md).
:::
`MergeTree` family table engines can store data to [S3](https://aws.amazon.com/s3/) using a disk with type `s3`.
Configuration markup:
@ -894,6 +901,7 @@ Configuration markup:
<disks>
<s3>
<type>s3</type>
<support_batch_delete>true</support_batch_delete>
<endpoint>https://clickhouse-public-datasets.s3.amazonaws.com/my-bucket/root-path/</endpoint>
<access_key_id>your_access_key_id</access_key_id>
<secret_access_key>your_secret_access_key</secret_access_key>
@ -927,6 +935,7 @@ Required parameters:
Optional parameters:
- `region` — S3 region name.
- `support_batch_delete` — Controls whether to check if batch deletes are supported. Set this to `false` when using Google Cloud Storage (GCS), since GCS does not support batch deletes; disabling the check avoids error messages in the logs (see the sketch after this parameter list).
- `use_environment_credentials` — Reads AWS credentials from the environment variables `AWS_ACCESS_KEY_ID`, `AWS_SECRET_ACCESS_KEY` and `AWS_SESSION_TOKEN` if they exist. Default value is `false`.
- `use_insecure_imds_request` — If set to `true`, S3 client will use insecure IMDS request while obtaining credentials from Amazon EC2 metadata. Default value is `false`.
- `proxy` — Proxy configuration for S3 endpoint. Each `uri` element inside `proxy` block should contain a proxy URL.
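As a hedged sketch of how a table ends up on this disk (the `s3_main` storage policy name is hypothetical and its `<policies>` definition is not shown in this excerpt; for GCS the same disk type is used with `support_batch_delete` set to `false` as noted above):

```
CREATE TABLE s3_backed_table
(
    key UInt64,
    value String
)
ENGINE = MergeTree
ORDER BY key
SETTINGS storage_policy = 's3_main';
```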


@ -271,6 +271,9 @@ You'll need to create data and metadata folders manually and `chown` them for
On Gentoo, you can just use `emerge clickhouse` to install ClickHouse from sources.
### From CI checks pre-built binaries
ClickHouse binaries are built for each [commit](/docs/en/development/build.md#you-dont-have-to-build-clickhouse).
## Launch {#launch}
To start the server as a daemon, run:


@ -145,6 +145,11 @@ public:
*/
void resolveAsFunction(FunctionBasePtr function_value);
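/// Resolve function node as an ordinary function, building it from the resolver and the current argument columns.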
void resolveAsFunction(const FunctionOverloadResolverPtr & resolver)
{
resolveAsFunction(resolver->build(getArgumentColumns()));
}
/** Resolve function node as aggregate function.
* It is important that function name is updated with resolved function name.
* Main motivation for this is query tree optimizations.


@ -1,8 +1,13 @@
#pragma once
#include <optional>
#include <utility>
#include <Common/SettingsChanges.h>
#include <Common/Exception.h>
#include <Core/Settings.h>
#include <Analyzer/IQueryTreeNode.h>
#include <Analyzer/QueryNode.h>
namespace DB


@ -16,6 +16,8 @@ using ListNodePtr = std::shared_ptr<ListNode>;
class ListNode final : public IQueryTreeNode
{
public:
using iterator = QueryTreeNodes::iterator;
/// Initialize list node with empty nodes
ListNode();
@ -41,6 +43,9 @@ public:
void dumpTreeImpl(WriteBuffer & buffer, FormatState & format_state, size_t indent) const override;
iterator begin() { return children.begin(); }
iterator end() { return children.end(); }
protected:
bool isEqualImpl(const IQueryTreeNode & rhs) const override;


@ -0,0 +1,134 @@
#include <memory>
#include <unordered_map>
#include <vector>
#include <Analyzer/Passes/ConvertOrLikeChainPass.h>
#include <Analyzer/ConstantNode.h>
#include <Analyzer/UnionNode.h>
#include <Analyzer/FunctionNode.h>
#include <Analyzer/HashUtils.h>
#include <Analyzer/InDepthQueryTreeVisitor.h>
#include <Core/Field.h>
#include <DataTypes/DataTypesNumber.h>
#include <Functions/FunctionFactory.h>
#include <Functions/likePatternToRegexp.h>
#include <Interpreters/Context.h>
namespace DB
{
namespace
{
class ConvertOrLikeChainVisitor : public InDepthQueryTreeVisitor<ConvertOrLikeChainVisitor>
{
using FunctionNodes = std::vector<std::shared_ptr<FunctionNode>>;
const FunctionOverloadResolverPtr match_function_ref;
const FunctionOverloadResolverPtr or_function_resolver;
public:
explicit ConvertOrLikeChainVisitor(ContextPtr context)
: InDepthQueryTreeVisitor<ConvertOrLikeChainVisitor>()
, match_function_ref(FunctionFactory::instance().get("multiMatchAny", context))
, or_function_resolver(FunctionFactory::instance().get("or", context))
{}
static bool needChildVisit(VisitQueryTreeNodeType & parent, VisitQueryTreeNodeType &)
{
ContextPtr context;
if (auto * query = parent->as<QueryNode>())
context = query->getContext();
else if (auto * union_node = parent->as<UnionNode>())
context = union_node->getContext();
if (context)
{
const auto & settings = context->getSettingsRef();
return settings.optimize_or_like_chain
&& settings.allow_hyperscan
&& settings.max_hyperscan_regexp_length == 0
&& settings.max_hyperscan_regexp_total_length == 0;
}
return true;
}
void visitImpl(QueryTreeNodePtr & node)
{
auto * function_node = node->as<FunctionNode>();
if (!function_node || function_node->getFunctionName() != "or")
return;
QueryTreeNodes unique_elems;
QueryTreeNodePtrWithHashMap<Array> node_to_patterns;
FunctionNodes match_functions;
for (auto & arg : function_node->getArguments())
{
unique_elems.push_back(arg);
auto * arg_func = arg->as<FunctionNode>();
if (!arg_func)
continue;
const bool is_like = arg_func->getFunctionName() == "like";
const bool is_ilike = arg_func->getFunctionName() == "ilike";
/// Not {i}like -> bail out.
if (!is_like && !is_ilike)
continue;
const auto & like_arguments = arg_func->getArguments().getNodes();
if (like_arguments.size() != 2)
continue;
auto identifier = like_arguments[0];
auto * pattern = like_arguments[1]->as<ConstantNode>();
if (!pattern || !isString(pattern->getResultType()))
continue;
auto regexp = likePatternToRegexp(pattern->getValue().get<String>());
/// Case insensitive. Works with UTF-8 as well.
if (is_ilike)
regexp = "(?i)" + regexp;
unique_elems.pop_back();
auto it = node_to_patterns.find(identifier);
if (it == node_to_patterns.end())
{
it = node_to_patterns.insert({identifier, Array{}}).first;
/// The second argument will be added when all patterns are known.
auto match_function = std::make_shared<FunctionNode>("multiMatchAny");
match_function->getArguments().getNodes().push_back(identifier);
match_functions.push_back(match_function);
unique_elems.push_back(std::move(match_function));
}
it->second.push_back(regexp);
}
/// Add all the patterns into the function arguments lists.
for (auto & match_function : match_functions)
{
auto & arguments = match_function->getArguments().getNodes();
auto & patterns = node_to_patterns.at(arguments[0]);
arguments.push_back(std::make_shared<ConstantNode>(Field{std::move(patterns)}));
match_function->resolveAsFunction(match_function_ref);
}
/// OR must have at least two arguments.
if (unique_elems.size() == 1)
unique_elems.push_back(std::make_shared<ConstantNode>(false));
function_node->getArguments().getNodes() = std::move(unique_elems);
function_node->resolveAsFunction(or_function_resolver);
}
};
}
void ConvertOrLikeChainPass::run(QueryTreeNodePtr query_tree_node, ContextPtr context)
{
ConvertOrLikeChainVisitor visitor(context);
visitor.visit(query_tree_node);
}
}
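To illustrate the rewrite this pass performs, here is a sketch in SQL taken from the test expectations later in this diff: a chain of OR'ed LIKE/ILIKE conditions over the same column is folded into a single `multiMatchAny` call, with `OR false` appended so the `or` function keeps at least two arguments.

```
-- Before (optimize_or_like_chain = 0):
SELECT materialize('Привет, World') AS s
WHERE (s LIKE 'hell%') OR (s ILIKE '%привет%') OR (s ILIKE 'world%');

-- After (optimize_or_like_chain = 1, allow_experimental_analyzer = 1):
SELECT materialize('Привет, World') AS s
WHERE multiMatchAny(s, ['^hell', '(?i)привет', '(?i)^world']) OR false;
```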


@ -0,0 +1,20 @@
#pragma once
#include <Analyzer/IQueryTreePass.h>
namespace DB
{
/** Replaces chains of OR'ed like/ilike conditions with multiMatchAny calls (one per matched column).
*/
class ConvertOrLikeChainPass final : public IQueryTreePass
{
public:
String getName() override { return "ConvertOrLikeChain"; }
String getDescription() override { return "Replaces chains of OR'ed like/ilike conditions with multiMatchAny"; }
void run(QueryTreeNodePtr query_tree_node, ContextPtr context) override;
};
}


@ -17,6 +17,7 @@
#include <Parsers/ASTSetQuery.h>
#include <Analyzer/Utils.h>
#include <fmt/core.h>
namespace DB
{
@ -179,6 +180,16 @@ void QueryNode::dumpTreeImpl(WriteBuffer & buffer, FormatState & format_state, s
buffer << '\n' << std::string(indent + 2, ' ') << "OFFSET\n";
getOffset()->dumpTreeImpl(buffer, format_state, indent + 4);
}
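/// Dump the query-level SETTINGS changes; e.g. the analyzer reference output later in this diff ends its dump with
/// "SETTINGS optimize_or_like_chain=0 allow_experimental_analyzer=1".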
if (hasSettingsChanges())
{
buffer << '\n' << std::string(indent + 2, ' ') << "SETTINGS";
for (const auto & change : settings_changes)
{
buffer << fmt::format(" {}={}", change.name, toString(change.value));
}
buffer << '\n';
}
}
bool QueryNode::isEqualImpl(const IQueryTreeNode & rhs) const


@ -1,3 +1,4 @@
#include <memory>
#include <Analyzer/QueryTreePassManager.h>
#include <Common/Exception.h>
@ -29,6 +30,7 @@
#include <Analyzer/Passes/FuseFunctionsPass.h>
#include <Analyzer/Passes/OptimizeGroupByFunctionKeysPass.h>
#include <Analyzer/Passes/IfTransformStringsToEnumPass.h>
#include <Analyzer/Passes/ConvertOrLikeChainPass.h>
#include <Analyzer/Passes/OptimizeRedundantFunctionsInOrderByPass.h>
namespace DB
@ -254,6 +256,8 @@ void addQueryTreePasses(QueryTreePassManager & manager)
if (settings.optimize_if_transform_strings_to_enum)
manager.addPass(std::make_unique<IfTransformStringsToEnumPass>());
manager.addPass(std::make_unique<ConvertOrLikeChainPass>());
}
}


@ -100,6 +100,7 @@
M(CacheDetachedFileSegments, "Number of existing detached cache file segments") \
M(FilesystemCacheSize, "Filesystem cache size in bytes") \
M(FilesystemCacheElements, "Filesystem cache elements (file segments)") \
M(AsyncInsertCacheSize, "Number of async insert hash id in cache") \
M(S3Requests, "S3 requests") \
M(KeeperAliveConnections, "Number of alive connections") \
M(KeeperOutstandingRequets, "Number of outstanding requests") \


@ -10,6 +10,7 @@
M(InsertQuery, "Same as Query, but only for INSERT queries.") \
M(AsyncInsertQuery, "Same as InsertQuery, but only for asynchronous INSERT queries.") \
M(AsyncInsertBytes, "Data size in bytes of asynchronous INSERT queries.") \
M(AsyncInsertCacheHits, "Number of times a duplicate hash id has been found in asynchronous INSERT hash id cache.") \
M(FailedQuery, "Number of failed queries.") \
M(FailedSelectQuery, "Same as FailedQuery, but only for SELECT queries.") \
M(FailedInsertQuery, "Same as FailedQuery, but only for INSERT queries.") \


@ -243,13 +243,23 @@ void RegExpTreeDictionary::loadData()
initRegexNodes(block);
}
initGraph();
if (regexps.empty())
throw Exception(ErrorCodes::INCORRECT_DICTIONARY_DEFINITION, "There are no available regular expressions. Please check your config");
#if USE_VECTORSCAN
std::vector<std::string_view> regexps_views(regexps.begin(), regexps.end());
hyperscan_regex = MultiRegexps::getOrSet<true, false>(regexps_views, std::nullopt);
/// TODO: fallback when exceptions occur.
hyperscan_regex->get();
try
{
std::vector<std::string_view> regexps_views(regexps.begin(), regexps.end());
hyperscan_regex = MultiRegexps::getOrSet<true, false>(regexps_views, std::nullopt);
hyperscan_regex->get();
}
catch (Exception & e)
{
/// Some compile errors are thrown as LOGICAL_ERROR and would cause a crash, e.g. when an expression is empty or too large.
/// We catch the error here and rethrow it as a dictionary definition error.
/// TODO: fall back to another engine, like re2, when exceptions occur.
throw Exception(ErrorCodes::INCORRECT_DICTIONARY_DEFINITION, "Error occurred when compiling regular expressions, reason: {}", e.message());
}
#endif
}
else
{


@ -289,6 +289,12 @@ std::unique_ptr<ReadBufferFromFileBase> DiskEncrypted::readFile(
std::optional<size_t> read_hint,
std::optional<size_t> file_size) const
{
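/// Files on the wrapped disk are prefixed with an encryption header, so widen the size hints by its size.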
if (read_hint && *read_hint > 0)
read_hint = *read_hint + FileEncryption::Header::kSize;
if (file_size && *file_size > 0)
file_size = *file_size + FileEncryption::Header::kSize;
auto wrapped_path = wrappedPath(path);
auto buffer = delegate->readFile(wrapped_path, settings, read_hint, file_size);
if (buffer->eof())


@ -55,9 +55,9 @@ protected:
return temp_dir->path() + "/";
}
String getFileContents(const String & file_name)
String getFileContents(const String & file_name, std::optional<size_t> file_size = {})
{
auto buf = encrypted_disk->readFile(file_name, /* settings= */ {}, /* read_hint= */ {}, /* file_size= */ {});
auto buf = encrypted_disk->readFile(file_name, /* settings= */ {}, /* read_hint= */ {}, file_size);
String str;
readStringUntilEOF(str, *buf);
return str;
@ -108,6 +108,10 @@ TEST_F(DiskEncryptedTest, WriteAndRead)
EXPECT_EQ(getFileContents("a.txt"), "Some text");
checkBinaryRepresentation(getDirectory() + "a.txt", kHeaderSize + 9);
/// Read the file with specified file size.
EXPECT_EQ(getFileContents("a.txt", 9), "Some text");
checkBinaryRepresentation(getDirectory() + "a.txt", kHeaderSize + 9);
/// Remove the file.
encrypted_disk->removeFile("a.txt");


@ -53,10 +53,10 @@ BSONEachRowRowInputFormat::BSONEachRowRowInputFormat(
ReadBuffer & in_, const Block & header_, Params params_, const FormatSettings & format_settings_)
: IRowInputFormat(header_, in_, std::move(params_))
, format_settings(format_settings_)
, name_map(header_.getNamesToIndexesMap())
, prev_positions(header_.columns())
, types(header_.getDataTypes())
{
name_map = getPort().getHeader().getNamesToIndexesMap();
}
inline size_t BSONEachRowRowInputFormat::columnIndex(const StringRef & name, size_t key_index)


@ -43,12 +43,13 @@ JSONEachRowRowInputFormat::JSONEachRowRowInputFormat(
, prev_positions(header_.columns())
, yield_strings(yield_strings_)
{
name_map = getPort().getHeader().getNamesToIndexesMap();
const auto & header = getPort().getHeader();
name_map = header.getNamesToIndexesMap();
if (format_settings_.import_nested_json)
{
for (size_t i = 0; i != header_.columns(); ++i)
for (size_t i = 0; i != header.columns(); ++i)
{
const StringRef column_name = header_.getByPosition(i).name;
const StringRef column_name = header.getByPosition(i).name;
const auto split = Nested::splitName(column_name.toView());
if (!split.second.empty())
{


@ -0,0 +1,130 @@
#include <Common/CurrentMetrics.h>
#include <Common/ProfileEvents.h>
#include <Storages/MergeTree/AsyncBlockIDsCache.h>
#include <Storages/StorageReplicatedMergeTree.h>
#include <unordered_set>
namespace ProfileEvents
{
extern const Event AsyncInsertCacheHits;
}
namespace CurrentMetrics
{
extern const Metric AsyncInsertCacheSize;
}
namespace DB
{
struct AsyncBlockIDsCache::Cache : public std::unordered_set<String>
{
CurrentMetrics::Increment cache_size_increment;
explicit Cache(std::unordered_set<String> && set_)
: std::unordered_set<String>(std::move(set_))
, cache_size_increment(CurrentMetrics::AsyncInsertCacheSize, size())
{}
};
std::vector<String> AsyncBlockIDsCache::getChildren()
{
auto zookeeper = storage.getZooKeeper();
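/// The ZooKeeper watch triggers a cache refresh, throttled to at most one refresh per update_min_interval.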
auto watch_callback = [&](const Coordination::WatchResponse &)
{
auto now = std::chrono::steady_clock::now();
auto last_time = last_updatetime.load();
if (now - last_time < update_min_interval)
{
std::chrono::milliseconds sleep_time = std::chrono::duration_cast<std::chrono::milliseconds>(update_min_interval - (now - last_time));
task->scheduleAfter(sleep_time.count());
}
else
task->schedule();
};
std::vector<String> children;
Coordination::Stat stat;
zookeeper->tryGetChildrenWatch(path, children, &stat, watch_callback);
return children;
}
void AsyncBlockIDsCache::update()
try
{
std::vector<String> paths = getChildren();
std::unordered_set<String> set;
for (String & p : paths)
{
set.insert(std::move(p));
}
{
std::lock_guard lock(mu);
cache_ptr = std::make_shared<Cache>(std::move(set));
++version;
}
cv.notify_all();
last_updatetime = std::chrono::steady_clock::now();
}
catch (...)
{
LOG_INFO(log, "Updating async block ids cache failed. Reason: {}", getCurrentExceptionMessage(false));
task->scheduleAfter(update_min_interval.count());
}
AsyncBlockIDsCache::AsyncBlockIDsCache(StorageReplicatedMergeTree & storage_)
: storage(storage_),
update_min_interval(storage.getSettings()->async_block_ids_cache_min_update_interval_ms),
path(storage.zookeeper_path + "/async_blocks"),
log_name(storage.getStorageID().getFullTableName() + " (AsyncBlockIDsCache)"),
log(&Poco::Logger::get(log_name))
{
task = storage.getContext()->getSchedulePool().createTask(log_name, [this]{ update(); });
}
void AsyncBlockIDsCache::start()
{
if (storage.getSettings()->use_async_block_ids_cache)
task->activateAndSchedule();
}
/// The caller keeps the version of the last call. When the caller calls again, it will wait until it gets a newer version.
Strings AsyncBlockIDsCache::detectConflicts(const Strings & paths, UInt64 & last_version)
{
if (!storage.getSettings()->use_async_block_ids_cache)
return {};
std::unique_lock lk(mu);
/// For the first access of this cache, `last_version` is zero, so it will not block here.
/// For a retried request, we compare the request version and the cache version: because ZooKeeper only returns
/// incomplete information about duplicates, we need to update the cache to find more of them.
/// The timeout here is to prevent deadlock, just in case.
cv.wait_for(lk, update_min_interval * 2, [&]{return version != last_version;});
if (version == last_version)
LOG_INFO(log, "Read cache with an old version {}", last_version);
CachePtr cur_cache;
cur_cache = cache_ptr;
last_version = version;
lk.unlock();
if (cur_cache == nullptr)
return {};
Strings conflicts;
for (const String & p : paths)
{
if (cur_cache->contains(p))
{
conflicts.push_back(p);
}
}
ProfileEvents::increment(ProfileEvents::AsyncInsertCacheHits, !conflicts.empty());
return conflicts;
}
}


@ -0,0 +1,54 @@
#pragma once
#include <Common/ZooKeeper/ZooKeeper.h>
#include <Common/logger_useful.h>
#include <Core/BackgroundSchedulePool.h>
#include <chrono>
namespace DB
{
class StorageReplicatedMergeTree;
class AsyncBlockIDsCache
{
struct Cache;
using CachePtr = std::shared_ptr<Cache>;
std::vector<String> getChildren();
void update();
public:
explicit AsyncBlockIDsCache(StorageReplicatedMergeTree & storage_);
void start();
void stop() { task->deactivate(); }
Strings detectConflicts(const Strings & paths, UInt64 & last_version);
private:
StorageReplicatedMergeTree & storage;
std::atomic<std::chrono::steady_clock::time_point> last_updatetime;
const std::chrono::milliseconds update_min_interval;
std::mutex mu;
CachePtr cache_ptr;
std::condition_variable cv;
UInt64 version = 0;
const String path;
BackgroundSchedulePool::TaskHolder task;
const String log_name;
Poco::Logger * log;
};
using AsyncBlockIDsCachePtr = std::shared_ptr<AsyncBlockIDsCache>;
}


@ -85,6 +85,8 @@ struct Settings;
M(UInt64, replicated_deduplication_window_seconds, 7 * 24 * 60 * 60 /* one week */, "Similar to \"replicated_deduplication_window\", but determines old blocks by their lifetime. Hash of an inserted block will be deleted (and the block will not be deduplicated after) if it outside of one \"window\". You can set very big replicated_deduplication_window to avoid duplicating INSERTs during that period of time.", 0) \
M(UInt64, replicated_deduplication_window_for_async_inserts, 10000, "How many last hash values of async_insert blocks should be kept in ZooKeeper (old blocks will be deleted).", 0) \
M(UInt64, replicated_deduplication_window_seconds_for_async_inserts, 7 * 24 * 60 * 60 /* one week */, "Similar to \"replicated_deduplication_window_for_async_inserts\", but determines old blocks by their lifetime. Hash of an inserted block will be deleted (and the block will not be deduplicated after) if it outside of one \"window\". You can set very big replicated_deduplication_window to avoid duplicating INSERTs during that period of time.", 0) \
M(Milliseconds, async_block_ids_cache_min_update_interval_ms, 100, "Minimum interval between updates of async_block_ids_cache.", 0) \
M(Bool, use_async_block_ids_cache, false, "Use in-memory cache to filter duplicated async inserts based on block ids.", 0) \
M(UInt64, max_replicated_logs_to_keep, 1000, "How many records may be in log, if there is inactive replica. Inactive replica becomes lost when this number is exceeded.", 0) \
M(UInt64, min_replicated_logs_to_keep, 10, "Keep about this number of last records in ZooKeeper log, even if they are obsolete. It doesn't affect work of tables: used only to diagnose ZooKeeper log before cleaning.", 0) \
M(Seconds, prefer_fetch_merged_part_time_threshold, 3600, "If time passed after replication log entry creation exceeds this threshold and sum size of parts is greater than \"prefer_fetch_merged_part_size_threshold\", prefer fetching merged part from replica instead of doing merge locally. To speed up very long merges.", 0) \
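A hedged sketch of enabling the new cache from SQL, mirroring the stateless test later in this diff (the table layout and ZooKeeper path are illustrative):

```
CREATE TABLE t_async_insert_dedup
(
    EventDate DateTime,
    KeyID UInt32
)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/{database}/t_async_insert_dedup', '{replica}')
PARTITION BY toYYYYMM(EventDate)
ORDER BY (KeyID, EventDate)
SETTINGS use_async_block_ids_cache = 1, async_block_ids_cache_min_update_interval_ms = 100;
```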


@ -144,6 +144,7 @@ bool ReplicatedMergeTreeRestartingThread::runImpl()
storage.mutations_finalizing_task->activateAndSchedule();
storage.merge_selecting_task->activateAndSchedule();
storage.cleanup_thread.start();
storage.async_block_ids_cache.start();
storage.part_check_thread.start();
LOG_DEBUG(log, "Table started successfully");


@ -5,6 +5,7 @@
#include <Common/SipHash.h>
#include <Common/ZooKeeper/KeeperException.h>
#include <Common/ThreadFuzzer.h>
#include <Storages/MergeTree/AsyncBlockIDsCache.h>
#include <DataTypes/ObjectUtils.h>
#include <Core/Block.h>
#include <IO/Operators.h>
@ -105,7 +106,7 @@ struct ReplicatedMergeTreeSinkImpl<async_insert>::DelayedChunk
String conflict_block_id = p.filename();
auto it = block_id_to_offset_idx.find(conflict_block_id);
if (it == block_id_to_offset_idx.end())
throw Exception("Unknown conflict path " + conflict_block_id, ErrorCodes::LOGICAL_ERROR);
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unknown conflict path {}", conflict_block_id);
/// If this filter is for self_dedup, the block paths were selected by `filterSelfDuplicate`, which is a self purge.
/// In this case, we don't know whether ZooKeeper has this insert, so we keep one insert to avoid losing it.
offset_idx.insert(std::end(offset_idx), std::begin(it->second) + self_dedup, std::end(it->second));
@ -544,6 +545,8 @@ void ReplicatedMergeTreeSinkImpl<true>::finishDelayedChunk(const ZooKeeperWithFa
partition.temp_part = storage.writer.writeTempPart(partition.block_with_partition, metadata_snapshot, context);
}
/// reset the cache version to zero for every partition write.
cache_version = 0;
while (true)
{
partition.temp_part.finalize();
@ -676,6 +679,13 @@ std::vector<String> ReplicatedMergeTreeSinkImpl<async_insert>::commitPart(
BlockIDsType block_id_path ;
if constexpr (async_insert)
{
/// Pre-filter the block ids against the in-memory async-insert cache before checking ZooKeeper.
conflict_block_ids = storage.async_block_ids_cache.detectConflicts(block_id, cache_version);
if (!conflict_block_ids.empty())
{
cache_version = 0;
return;
}
for (const auto & single_block_id : block_id)
block_id_path.push_back(storage.zookeeper_path + "/async_blocks/" + single_block_id);
}


@ -5,6 +5,7 @@
#include <base/types.h>
#include <Storages/MergeTree/ZooKeeperRetries.h>
#include <Common/ZooKeeper/ZooKeeperWithFaultInjection.h>
#include <Storages/MergeTree/AsyncBlockIDsCache.h>
namespace Poco { class Logger; }
@ -115,6 +116,8 @@ private:
size_t quorum_timeout_ms;
size_t max_parts_per_block;
UInt64 cache_version = 0;
bool is_attach = false;
bool quorum_parallel = false;
const bool deduplicate = true;


@ -91,6 +91,7 @@
#include <base/scope_guard.h>
#include <Common/scope_guard_safe.h>
#include <Storages/MergeTree/AsyncBlockIDsCache.h>
#include <boost/algorithm/string/join.hpp>
#include <boost/algorithm/string/replace.hpp>
@ -283,6 +284,7 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
, queue(*this, merge_strategy_picker)
, fetcher(*this)
, cleanup_thread(*this)
, async_block_ids_cache(*this)
, part_check_thread(*this)
, restarting_thread(*this)
, part_moves_between_shards_orchestrator(*this)
@ -4404,6 +4406,7 @@ void StorageReplicatedMergeTree::partialShutdown()
mutations_finalizing_task->deactivate();
cleanup_thread.stop();
async_block_ids_cache.stop();
part_check_thread.stop();
/// Stop queue processing


@ -4,6 +4,7 @@
#include <atomic>
#include <pcg_random.hpp>
#include <Storages/IStorage.h>
#include <Storages/MergeTree/AsyncBlockIDsCache.h>
#include <Storages/IStorageCluster.h>
#include <Storages/MergeTree/DataPartsExchange.h>
#include <Storages/MergeTree/EphemeralLockInZooKeeper.h>
@ -337,6 +338,7 @@ private:
friend class ReplicatedMergeTreeSinkImpl;
friend class ReplicatedMergeTreePartCheckThread;
friend class ReplicatedMergeTreeCleanupThread;
friend class AsyncBlockIDsCache;
friend class ReplicatedMergeTreeAlterThread;
friend class ReplicatedMergeTreeRestartingThread;
friend class ReplicatedMergeTreeAttachThread;
@ -445,6 +447,8 @@ private:
/// A thread that removes old parts, log entries, and blocks.
ReplicatedMergeTreeCleanupThread cleanup_thread;
AsyncBlockIDsCache async_block_ids_cache;
/// A thread that checks the data of the parts, as well as the queue of the parts to be checked.
ReplicatedMergeTreePartCheckThread part_check_thread;


@ -46,13 +46,15 @@ StorageS3Cluster::StorageS3Cluster(
const StorageID & table_id_,
const ColumnsDescription & columns_,
const ConstraintsDescription & constraints_,
ContextPtr context_)
ContextPtr context_,
bool structure_argument_was_provided_)
: IStorageCluster(table_id_)
, s3_configuration{configuration_.url, configuration_.auth_settings, configuration_.request_settings, configuration_.headers}
, filename(configuration_.url)
, cluster_name(configuration_.cluster_name)
, format_name(configuration_.format)
, compression_method(configuration_.compression_method)
, structure_argument_was_provided(structure_argument_was_provided_)
{
context_->getGlobalContext()->getRemoteHostFilter().checkURL(Poco::URI{filename});
StorageInMemoryMetadata storage_metadata;
@ -68,7 +70,6 @@ StorageS3Cluster::StorageS3Cluster(
auto columns = StorageS3::getTableStructureFromDataImpl(format_name, s3_configuration, compression_method,
/*distributed_processing_*/false, is_key_with_globs, /*format_settings=*/std::nullopt, context_);
storage_metadata.setColumns(columns);
add_columns_structure_to_query = true;
}
else
storage_metadata.setColumns(columns_);
@ -111,7 +112,7 @@ Pipe StorageS3Cluster::read(
const bool add_agg_info = processed_stage == QueryProcessingStage::WithMergeableState;
ASTPtr query_to_send = interpreter.getQueryInfo().query->clone();
if (add_columns_structure_to_query)
if (!structure_argument_was_provided)
addColumnsStructureToQueryWithClusterEngine(
query_to_send, StorageDictionary::generateNamesAndTypesDescription(storage_snapshot->metadata->getColumns().getAll()), 5, getName());


@ -26,7 +26,8 @@ public:
const StorageID & table_id_,
const ColumnsDescription & columns_,
const ConstraintsDescription & constraints_,
ContextPtr context_);
ContextPtr context_,
bool structure_argument_was_provided_);
std::string getName() const override { return "S3Cluster"; }
@ -49,7 +50,7 @@ private:
String compression_method;
NamesAndTypesList virtual_columns;
Block virtual_block;
bool add_columns_structure_to_query = false;
bool structure_argument_was_provided;
};


@ -96,8 +96,9 @@ StoragePtr TableFunctionS3Cluster::executeImpl(
{
StoragePtr storage;
ColumnsDescription columns;
bool structure_argument_was_provided = configuration.structure != "auto";
if (configuration.structure != "auto")
if (structure_argument_was_provided)
{
columns = parseColumnsListFromString(configuration.structure, context);
}
@ -126,7 +127,8 @@ StoragePtr TableFunctionS3Cluster::executeImpl(
StorageID(getDatabaseName(), table_name),
columns,
ConstraintsDescription{},
context);
context,
structure_argument_was_provided);
}
storage->startup();


@ -3,5 +3,6 @@
<database>system</database>
<table>zookeeper_log</table>
<flush_interval_milliseconds>7500</flush_interval_milliseconds>
<ttl>event_date + INTERVAL 1 WEEK DELETE</ttl>
</zookeeper_log>
</clickhouse>
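For reference, a hedged sketch of the table-level TTL this configuration is meant to produce on `system.zookeeper_log` (the ALTER below is only an illustrative equivalent of the `<ttl>` option, not part of this change):

```
ALTER TABLE system.zookeeper_log MODIFY TTL event_date + INTERVAL 1 WEEK DELETE;
```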


@ -177,10 +177,7 @@ def get_table_uuid(node, db_atomic, table):
return uuid
@pytest.fixture(autouse=True)
def drop_table(cluster):
yield
node_names = ["node1z", "node2z", "node1n", "node2n", "node_another_bucket"]
for node_name in node_names:
@ -257,3 +254,5 @@ def test_restore_another_bucket_path(cluster, db_atomic, zero_copy):
assert node_another_bucket.query(
"SELECT count(*) FROM s3.test FORMAT Values"
) == "({})".format(size * (keys - dropped_keys))
drop_table(cluster)


@ -3,6 +3,7 @@ import time
import pytest
from helpers.cluster import ClickHouseCluster
from helpers.test_tools import assert_eq_with_retry
cluster = ClickHouseCluster(__file__)
@ -20,6 +21,12 @@ node2 = cluster.add_instance(
macros={"shard": 0, "replica": 2},
)
settings = {
"mutations_sync": 2,
"replication_alter_partitions_sync": 2,
"optimize_throw_if_noop": 1,
}
@pytest.fixture(scope="module")
def started_cluster():
@ -45,38 +52,34 @@ def split_tsv(data):
@pytest.mark.parametrize("replicated", ["", "replicated"])
def test_merge_simple(started_cluster, replicated):
try:
clickhouse_path = "/var/lib/clickhouse"
db_name = "test"
table_name = "merge_simple"
name = db_name + "." + table_name
table_path = "data/" + db_name + "/" + table_name
nodes = [node1, node2] if replicated else [node1]
engine = (
"ReplicatedMergeTree('/clickhouse/test_merge_simple', '{replica}')"
if replicated
else "MergeTree()"
)
node_check = nodes[-1]
starting_block = 0 if replicated else 1
clickhouse_path = "/var/lib/clickhouse"
db_name = "test"
table_name = "merge_simple"
name = db_name + "." + table_name
table_path = "data/" + db_name + "/" + table_name
nodes = [node1, node2] if replicated else [node1]
engine = (
"ReplicatedMergeTree('/clickhouse/test_merge_simple', '{replica}')"
if replicated
else "MergeTree()"
)
node_check = nodes[-1]
starting_block = 0 if replicated else 1
try:
for node in nodes:
node.query(
"""
CREATE TABLE {name}
(
`a` Int64
)
ENGINE = {engine}
ORDER BY sleep(2)
""".format(
engine=engine, name=name
)
f"create table {name} (a Int64) engine={engine} order by tuple()"
)
node1.query("INSERT INTO {name} VALUES (1)".format(name=name))
node1.query("INSERT INTO {name} VALUES (2)".format(name=name))
node1.query("INSERT INTO {name} VALUES (3)".format(name=name))
node1.query(f"INSERT INTO {name} VALUES (1)")
node1.query(f"INSERT INTO {name} VALUES (2)")
node1.query(f"INSERT INTO {name} VALUES (3)")
node1.query(
f"alter table {name} add column b int materialized sleepEachRow(3)",
settings=settings,
)
parts = [
"all_{}_{}_0".format(x, x)
@ -84,15 +87,22 @@ def test_merge_simple(started_cluster, replicated):
]
result_part = "all_{}_{}_1".format(starting_block, starting_block + 2)
# OPTIMIZE will sleep for 3s * 3 (parts) = 9s
def optimize():
node1.query("OPTIMIZE TABLE {name}".format(name=name))
node1.query("OPTIMIZE TABLE {name}".format(name=name), settings=settings)
wait = threading.Thread(target=time.sleep, args=(5,))
wait.start()
t = threading.Thread(target=optimize)
t.start()
time.sleep(1)
# Wait for OPTIMIZE to actually start
assert_eq_with_retry(
node1,
f"select count() from system.merges where table='{table_name}'",
"1\n",
retry_count=30,
sleep_time=0.1,
)
assert (
split_tsv(
node_check.query(
@ -124,17 +134,21 @@ def test_merge_simple(started_cluster, replicated):
]
)
t.join()
wait.join()
# It still can show a row with progress=1, because OPTIMIZE returns before the entry is removed from MergeList
assert (
node_check.query(
"SELECT * FROM system.merges WHERE table = '{name}' and progress < 1".format(
name=table_name
)
f"SELECT * FROM system.merges WHERE table = '{table_name}' and progress < 1"
)
== ""
)
# It will eventually disappear
assert_eq_with_retry(
node_check,
f"SELECT * FROM system.merges WHERE table = '{table_name}' and progress < 1",
"\n",
)
finally:
for node in nodes:
node.query("DROP TABLE {name}".format(name=name))
@ -142,55 +156,53 @@ def test_merge_simple(started_cluster, replicated):
@pytest.mark.parametrize("replicated", ["", "replicated"])
def test_mutation_simple(started_cluster, replicated):
clickhouse_path = "/var/lib/clickhouse"
db_name = "test"
table_name = "mutation_simple"
name = db_name + "." + table_name
table_path = "data/" + db_name + "/" + table_name
nodes = [node1, node2] if replicated else [node1]
engine = (
"ReplicatedMergeTree('/clickhouse/test_mutation_simple', '{replica}')"
if replicated
else "MergeTree()"
)
node_check = nodes[-1]
starting_block = 0 if replicated else 1
try:
clickhouse_path = "/var/lib/clickhouse"
db_name = "test"
table_name = "mutation_simple"
name = db_name + "." + table_name
table_path = "data/" + db_name + "/" + table_name
nodes = [node1, node2] if replicated else [node1]
engine = (
"ReplicatedMergeTree('/clickhouse/test_mutation_simple', '{replica}')"
if replicated
else "MergeTree()"
)
node_check = nodes[-1]
starting_block = 0 if replicated else 1
for node in nodes:
node.query(
"""
CREATE TABLE {name}
(
`a` Int64
)
ENGINE = {engine}
ORDER BY tuple()
""".format(
engine=engine, name=name
)
f"create table {name} (a Int64) engine={engine} order by tuple()"
)
node1.query("INSERT INTO {name} VALUES (1)".format(name=name))
node1.query(f"INSERT INTO {name} VALUES (1), (2), (3)")
part = "all_{}_{}_0".format(starting_block, starting_block)
result_part = "all_{}_{}_0_{}".format(
starting_block, starting_block, starting_block + 1
)
# ALTER will sleep for 3s * 3 (rows) = 9s
def alter():
node1.query(
"ALTER TABLE {name} UPDATE a = 42 WHERE sleep(2) OR 1".format(
name=name
),
settings={
"mutations_sync": 1,
},
f"ALTER TABLE {name} UPDATE a = 42 WHERE sleep(3) OR 1",
settings=settings,
)
t = threading.Thread(target=alter)
t.start()
time.sleep(1)
# Wait for the mutation to actually start
assert_eq_with_retry(
node1,
f"select count() from system.merges where table='{table_name}'",
"1\n",
retry_count=30,
sleep_time=0.1,
)
assert (
split_tsv(
node_check.query(
@ -225,13 +237,18 @@ def test_mutation_simple(started_cluster, replicated):
assert (
node_check.query(
"SELECT * FROM system.merges WHERE table = '{name}' and progress < 1".format(
name=table_name
)
f"SELECT * FROM system.merges WHERE table = '{table_name}' and progress < 1"
)
== ""
)
# It will eventually disappear
assert_eq_with_retry(
node_check,
f"SELECT * FROM system.merges WHERE table = '{table_name}' and progress < 1",
"\n",
)
finally:
for node in nodes:
node.query("DROP TABLE {name}".format(name=name))


@ -0,0 +1,110 @@
SELECT materialize(\'Привет, World\') AS s
WHERE (s LIKE \'hell%\') OR (s ILIKE \'%привет%\') OR (s ILIKE \'world%\')
SETTINGS optimize_or_like_chain = 0
QUERY id: 0
PROJECTION COLUMNS
s String
PROJECTION
LIST id: 1, nodes: 1
FUNCTION id: 2, function_name: materialize, function_type: ordinary, result_type: String
ARGUMENTS
LIST id: 3, nodes: 1
CONSTANT id: 4, constant_value: \'Привет, World\', constant_value_type: String
JOIN TREE
TABLE id: 5, table_name: system.one
WHERE
FUNCTION id: 6, function_name: or, function_type: ordinary, result_type: UInt8
ARGUMENTS
LIST id: 7, nodes: 3
FUNCTION id: 8, function_name: like, function_type: ordinary, result_type: UInt8
ARGUMENTS
LIST id: 9, nodes: 2
FUNCTION id: 2, function_name: materialize, function_type: ordinary, result_type: String
ARGUMENTS
LIST id: 3, nodes: 1
CONSTANT id: 4, constant_value: \'Привет, World\', constant_value_type: String
CONSTANT id: 10, constant_value: \'hell%\', constant_value_type: String
FUNCTION id: 11, function_name: ilike, function_type: ordinary, result_type: UInt8
ARGUMENTS
LIST id: 12, nodes: 2
FUNCTION id: 2, function_name: materialize, function_type: ordinary, result_type: String
ARGUMENTS
LIST id: 3, nodes: 1
CONSTANT id: 4, constant_value: \'Привет, World\', constant_value_type: String
CONSTANT id: 13, constant_value: \'%привет%\', constant_value_type: String
FUNCTION id: 14, function_name: ilike, function_type: ordinary, result_type: UInt8
ARGUMENTS
LIST id: 15, nodes: 2
FUNCTION id: 2, function_name: materialize, function_type: ordinary, result_type: String
ARGUMENTS
LIST id: 3, nodes: 1
CONSTANT id: 4, constant_value: \'Привет, World\', constant_value_type: String
CONSTANT id: 16, constant_value: \'world%\', constant_value_type: String
SETTINGS optimize_or_like_chain=0 allow_experimental_analyzer=1
SELECT materialize(\'Привет, World\') AS s
WHERE multiMatchAny(s, [\'^hell\', \'(?i)привет\', \'(?i)^world\']) OR false
SETTINGS optimize_or_like_chain = 1
QUERY id: 0
PROJECTION COLUMNS
s String
PROJECTION
LIST id: 1, nodes: 1
FUNCTION id: 2, function_name: materialize, function_type: ordinary, result_type: String
ARGUMENTS
LIST id: 3, nodes: 1
CONSTANT id: 4, constant_value: \'Привет, World\', constant_value_type: String
JOIN TREE
TABLE id: 5, table_name: system.one
WHERE
FUNCTION id: 6, function_name: or, function_type: ordinary, result_type: Bool
ARGUMENTS
LIST id: 7, nodes: 2
FUNCTION id: 8, function_name: multiMatchAny, function_type: ordinary, result_type: UInt8
ARGUMENTS
LIST id: 9, nodes: 2
FUNCTION id: 2, function_name: materialize, function_type: ordinary, result_type: String
ARGUMENTS
LIST id: 3, nodes: 1
CONSTANT id: 4, constant_value: \'Привет, World\', constant_value_type: String
CONSTANT id: 10, constant_value: Array_[\'^hell\', \'(?i)привет\', \'(?i)^world\'], constant_value_type: Array(String)
CONSTANT id: 11, constant_value: UInt64_0, constant_value_type: Bool
SETTINGS optimize_or_like_chain=1 allow_experimental_analyzer=1
SELECT
materialize(\'Привет, World\') AS s1,
materialize(\'Привет, World\') AS s2
WHERE multiMatchAny(s1, [\'^hell\', \'(?i)^world\']) OR multiMatchAny(s2, [\'(?i)привет\'])
SETTINGS optimize_or_like_chain = 1
SELECT
materialize(\'Привет, World\') AS s1,
materialize(\'Привет, World\') AS s2
WHERE (s1 LIKE \'hell%\') OR (s2 ILIKE \'%привет%\') OR (s1 ILIKE \'world%\')
SETTINGS optimize_or_like_chain = 1
SELECT
materialize(\'Привет, World\') AS s1,
materialize(\'Привет, World\') AS s2
WHERE (s1 LIKE \'hell%\') OR (s2 ILIKE \'%привет%\') OR (s1 ILIKE \'world%\')
SETTINGS optimize_or_like_chain = 1
SELECT
materialize(\'Привет, World\') AS s1,
materialize(\'Привет, World\') AS s2
WHERE (s1 LIKE \'hell%\') OR (s2 ILIKE \'%привет%\') OR (s1 ILIKE \'world%\')
SETTINGS optimize_or_like_chain = 1
SELECT
materialize(\'Привет, World\') AS s1,
materialize(\'Привет, World\') AS s2
WHERE multiMatchAny(s1, [\'^hell\', \'(?i)^world\']) OR multiMatchAny(s2, [\'(?i)привет\']) OR (s1 = \'Привет\')
SETTINGS optimize_or_like_chain = 1
Привет, optimized World
Привет, optimized World
Привет, World
Привет, World
Привет, optimized World
Привет, optimized World
Привет, World
Привет, World
Привет, World
SELECT
(materialize(\'Привет, World\') AS s) LIKE \'hell%\' AS test,
s
WHERE multiMatchAny(s, [\'^hell\', \'(?i)привет\', \'(?i)^world\']) OR false
SETTINGS optimize_or_like_chain = 1


@ -1,6 +1,7 @@
EXPLAIN SYNTAX SELECT materialize('Привет, World') AS s WHERE (s LIKE 'hell%') OR (s ILIKE '%привет%') OR (s ILIKE 'world%') SETTINGS optimize_or_like_chain = 0;
EXPLAIN QUERY TREE run_passes=1 SELECT materialize('Привет, World') AS s WHERE (s LIKE 'hell%') OR (s ILIKE '%привет%') OR (s ILIKE 'world%') SETTINGS optimize_or_like_chain = 0, allow_experimental_analyzer = 1;
EXPLAIN SYNTAX SELECT materialize('Привет, World') AS s WHERE (s LIKE 'hell%') OR (s ILIKE '%привет%') OR (s ILIKE 'world%') SETTINGS optimize_or_like_chain = 1;
EXPLAIN QUERY TREE run_passes=1 SELECT materialize('Привет, World') AS s WHERE (s LIKE 'hell%') OR (s ILIKE '%привет%') OR (s ILIKE 'world%') SETTINGS optimize_or_like_chain = 1, allow_experimental_analyzer = 1;
EXPLAIN SYNTAX SELECT materialize('Привет, World') AS s1, materialize('Привет, World') AS s2 WHERE (s1 LIKE 'hell%') OR (s2 ILIKE '%привет%') OR (s1 ILIKE 'world%') SETTINGS optimize_or_like_chain = 1;
EXPLAIN SYNTAX SELECT materialize('Привет, World') AS s1, materialize('Привет, World') AS s2 WHERE (s1 LIKE 'hell%') OR (s2 ILIKE '%привет%') OR (s1 ILIKE 'world%') SETTINGS optimize_or_like_chain = 1 SETTINGS allow_hyperscan = 0;
@ -10,9 +11,18 @@ EXPLAIN SYNTAX SELECT materialize('Привет, World') AS s1, materialize('П
SELECT materialize('Привет, optimized World') AS s WHERE (s LIKE 'hell%') OR (s LIKE '%привет%') OR (s ILIKE '%world') SETTINGS optimize_or_like_chain = 1;
SELECT materialize('Привет, optimized World') AS s WHERE (s LIKE 'hell%') OR (s LIKE '%привет%') OR (s ILIKE '%world') SETTINGS optimize_or_like_chain = 1, allow_experimental_analyzer = 1;
SELECT materialize('Привет, World') AS s WHERE (s LIKE 'hell%') OR (s LIKE '%привет%') OR (s ILIKE '%world') SETTINGS optimize_or_like_chain = 0;
SELECT materialize('Привет, World') AS s WHERE (s LIKE 'hell%') OR (s LIKE '%привет%') OR (s ILIKE '%world') SETTINGS optimize_or_like_chain = 0, allow_experimental_analyzer = 1;
SELECT materialize('Привет, optimized World') AS s WHERE (s LIKE 'hell%') OR (s ILIKE '%привет%') OR (s LIKE 'world%') SETTINGS optimize_or_like_chain = 1;
SELECT materialize('Привет, optimized World') AS s WHERE (s LIKE 'hell%') OR (s ILIKE '%привет%') OR (s LIKE 'world%') SETTINGS optimize_or_like_chain = 1, allow_experimental_analyzer = 1;
SELECT materialize('Привет, World') AS s WHERE (s LIKE 'hell%') OR (s ILIKE '%привет%') OR (s LIKE 'world%') SETTINGS optimize_or_like_chain = 0;
SELECT materialize('Привет, World') AS s WHERE (s LIKE 'hell%') OR (s ILIKE '%привет%') OR (s LIKE 'world%') SETTINGS optimize_or_like_chain = 0, allow_experimental_analyzer = 1;
SELECT materialize('Привет, World') AS s WHERE (s LIKE 'hell%') OR (s ILIKE '%привет%') OR (s ILIKE 'world%') SETTINGS optimize_or_like_chain = 1, allow_experimental_analyzer = 1;
-- Aliases


@ -1,40 +0,0 @@
SELECT materialize(\'Привет, World\') AS s
WHERE (s LIKE \'hell%\') OR (s ILIKE \'%привет%\') OR (s ILIKE \'world%\')
SETTINGS optimize_or_like_chain = 0
SELECT materialize(\'Привет, World\') AS s
WHERE multiMatchAny(s, [\'^hell\', \'(?i)привет\', \'(?i)^world\']) OR false
SETTINGS optimize_or_like_chain = 1
SELECT
materialize(\'Привет, World\') AS s1,
materialize(\'Привет, World\') AS s2
WHERE multiMatchAny(s1, [\'^hell\', \'(?i)^world\']) OR multiMatchAny(s2, [\'(?i)привет\'])
SETTINGS optimize_or_like_chain = 1
SELECT
materialize(\'Привет, World\') AS s1,
materialize(\'Привет, World\') AS s2
WHERE (s1 LIKE \'hell%\') OR (s2 ILIKE \'%привет%\') OR (s1 ILIKE \'world%\')
SETTINGS optimize_or_like_chain = 1
SELECT
materialize(\'Привет, World\') AS s1,
materialize(\'Привет, World\') AS s2
WHERE (s1 LIKE \'hell%\') OR (s2 ILIKE \'%привет%\') OR (s1 ILIKE \'world%\')
SETTINGS optimize_or_like_chain = 1
SELECT
materialize(\'Привет, World\') AS s1,
materialize(\'Привет, World\') AS s2
WHERE (s1 LIKE \'hell%\') OR (s2 ILIKE \'%привет%\') OR (s1 ILIKE \'world%\')
SETTINGS optimize_or_like_chain = 1
SELECT
materialize(\'Привет, World\') AS s1,
materialize(\'Привет, World\') AS s2
WHERE multiMatchAny(s1, [\'^hell\', \'(?i)^world\']) OR multiMatchAny(s2, [\'(?i)привет\']) OR (s1 = \'Привет\')
SETTINGS optimize_or_like_chain = 1
Привет, optimized World
Привет, World
Привет, optimized World
Привет, World
SELECT
(materialize(\'Привет, World\') AS s) LIKE \'hell%\' AS test,
s
WHERE multiMatchAny(s, [\'^hell\', \'(?i)привет\', \'(?i)^world\']) OR false
SETTINGS optimize_or_like_chain = 1


@ -81,7 +81,7 @@ EventDate DateTime,
KeyID UInt32
) Engine = ReplicatedMergeTree('/clickhouse/tables/{shard}/{database}/t_async_insert_dedup', '{replica}')
PARTITION BY toYYYYMM(EventDate)
ORDER BY (KeyID, EventDate)
ORDER BY (KeyID, EventDate) SETTINGS use_async_block_ids_cache = 1
''')
q = queue.Queue(100)
@ -98,7 +98,7 @@ gen.join()
retry = 0
while (True):
while True:
time.sleep(5)
result = client.query("select KeyID from t_async_insert_dedup order by KeyID")
result = result.split()
@ -124,6 +124,16 @@ while (True):
else:
print(len(result), flush=True)
break
result = client.query("SELECT value FROM system.metrics where metric = 'AsyncInsertCacheSize'")
result = int(result.split()[0])
if result <= 0:
raise Exception(f"AsyncInsertCacheSize should > 0, but got {result}")
result = client.query("SELECT value FROM system.events where event = 'AsyncInsertCacheHits'")
result = int(result.split()[0])
if result <= 0:
raise Exception(f"AsyncInsertCacheHits should > 0, but got {result}")
client.query("DROP TABLE IF EXISTS t_async_insert_dedup NO DELAY")
os._exit(os.EX_OK)


@ -76,6 +76,9 @@ INSERT INTO regexp_dictionary_source_table VALUES (2, 0, '33/tclwebkit', ['versi
SYSTEM RELOAD dictionary regexp_dict1;
select dictGet(regexp_dict1, ('name', 'version', 'comment'), '33/tclwebkit');
truncate table regexp_dictionary_source_table;
SYSTEM RELOAD dictionary regexp_dict1; -- { serverError 489 }
DROP TABLE IF EXISTS regexp_dictionary_source_table;
DROP TABLE IF EXISTS needle_table;


@ -0,0 +1,4 @@
1 2 3
4 5 6
7 8 9
0 0 0


@ -0,0 +1,9 @@
-- Tags: no-fasttest
-- Tag no-fasttest: Depends on AWS
drop table if exists test;
create table test (x UInt32, y UInt32, z UInt32) engine=Memory();
insert into test select * from s3Cluster('test_cluster_one_shard_three_replicas_localhost', 'http://localhost:11111/test/a.tsv');
select * from test;
drop table test;


@ -0,0 +1,19 @@
Row 1:
──────
x.a: 1
x.b: 2
Row 2:
──────
x.a: 3
x.b: 4
Row 3:
──────
x.a: 5
x.b: 6
Row 4:
──────
x.a: 7
x.b: 8


@ -0,0 +1,20 @@
#!/usr/bin/env bash
CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CUR_DIR"/../shell_config.sh
$CLICKHOUSE_CLIENT -q $'create table test (`x.a` UInt32, `x.b` UInt32) engine=Memory'
echo '{"x" : {"a" : 1, "b" : 2}}' | ${CLICKHOUSE_CURL} --data-binary @- "${CLICKHOUSE_URL}&query=INSERT+INTO+test+FORMAT+JSONEachRow&input_format_import_nested_json=1&max_threads=10&input_format_parallel_parsing=0"
echo '{"x" : {"a" : 3, "b" : 4}}' | ${CLICKHOUSE_CURL} --data-binary @- "${CLICKHOUSE_URL}&query=INSERT+INTO+test+FORMAT+JSONEachRow&input_format_import_nested_json=1&max_threads=10&input_format_parallel_parsing=1"
$CLICKHOUSE_CLIENT -q $'select 5 as `x.a`, 6 as `x.b` format BSONEachRow' | ${CLICKHOUSE_CURL} --data-binary @- "${CLICKHOUSE_URL}&query=INSERT+INTO+test+FORMAT+BSONEachRow&max_threads=10&input_format_parallel_parsing=0"
$CLICKHOUSE_CLIENT -q $'select 7 as `x.a`, 8 as `x.b` format BSONEachRow' | ${CLICKHOUSE_CURL} --data-binary @- "${CLICKHOUSE_URL}&query=INSERT+INTO+test+FORMAT+BSONEachRow&max_threads=10&input_format_parallel_parsing=1"
$CLICKHOUSE_CLIENT -q "select * from test order by 1 format Vertical";
$CLICKHOUSE_CLIENT -q "drop table test";