From bef6168c05c5491125898e05440e797bc6e13501 Mon Sep 17 00:00:00 2001 From: Han Fei Date: Tue, 10 Jan 2023 13:19:12 +0100 Subject: [PATCH 01/13] support cache for async inserts block ids --- src/Storages/MergeTree/AsyncBlockIDsCache.cpp | 101 ++++++++++++++++++ src/Storages/MergeTree/AsyncBlockIDsCache.h | 55 ++++++++++ src/Storages/MergeTree/MergeTreeSettings.h | 2 + .../ReplicatedMergeTreeRestartingThread.cpp | 1 + .../MergeTree/ReplicatedMergeTreeSink.cpp | 7 +- .../MergeTree/ReplicatedMergeTreeSink.h | 3 + src/Storages/StorageReplicatedMergeTree.cpp | 3 + src/Storages/StorageReplicatedMergeTree.h | 4 + .../02481_async_insert_dedup.python | 2 +- 9 files changed, 176 insertions(+), 2 deletions(-) create mode 100644 src/Storages/MergeTree/AsyncBlockIDsCache.cpp create mode 100644 src/Storages/MergeTree/AsyncBlockIDsCache.h diff --git a/src/Storages/MergeTree/AsyncBlockIDsCache.cpp b/src/Storages/MergeTree/AsyncBlockIDsCache.cpp new file mode 100644 index 00000000000..1e2da348083 --- /dev/null +++ b/src/Storages/MergeTree/AsyncBlockIDsCache.cpp @@ -0,0 +1,101 @@ +#include +#include + +#include +#include + +namespace DB +{ + +std::vector AsyncBlockIDsCache::getChildren() +{ + auto zookeeper = storage.getZooKeeper(); + + auto watch_callback = [&](const Coordination::WatchResponse &) + { + auto now = std::chrono::steady_clock::now(); + if (now - last_updatetime < update_min_interval) + { + std::chrono::milliseconds sleep_time = std::chrono::duration_cast(update_min_interval - (now - last_updatetime)); + task->scheduleAfter(sleep_time.count()); + } + else + task->schedule(); + }; + std::vector children; + Coordination::Stat stat; + zookeeper->tryGetChildrenWatch(path, children, &stat, watch_callback); + return children; +} + +void AsyncBlockIDsCache::update() +try +{ + std::vector paths = getChildren(); + Cache cache; + for (String & p : paths) + { + cache.insert(std::move(p)); + } + LOG_TRACE(log, "Updating async block succeed. {} block ids has been updated.", cache.size()); + { + std::lock_guard lock(mu); + cache_ptr = std::make_shared(std::move(cache)); + ++version; + } + cv.notify_all(); + last_updatetime = std::chrono::steady_clock::now(); +} +catch (...) +{ + LOG_INFO(log, "Updating async block ids cache failed. Reason: {}", getCurrentExceptionMessage(false)); + task->scheduleAfter(update_min_interval.count()); +} + +AsyncBlockIDsCache::AsyncBlockIDsCache(StorageReplicatedMergeTree & storage_) + : storage(storage_), + update_min_interval(storage.getSettings()->async_block_ids_cache_min_update_interval_ms), + path(storage.zookeeper_path + "/async_blocks"), + log_name(storage.getStorageID().getFullTableName() + " (AsyncBlockIDsCache)"), + log(&Poco::Logger::get(log_name)) +{ + task = storage.getContext()->getSchedulePool().createTask(log_name, [this]{ update(); }); +} + +void AsyncBlockIDsCache::start() +{ + if (storage.getSettings()->use_async_block_ids_cache) + task->activateAndSchedule(); +} + +/// The caller will keep the version of the last call. When the caller calls again, it will wait until it gets a newer version.
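+/// A rough sketch of the intended call pattern (illustrative only; the variable names are hypothetical, not part of this patch): +/// UInt64 cached_version = 0; /// kept by the caller across retries +/// Strings conflicts = cache.detectConflicts(block_ids, cached_version); /// waits for a version newer than cached_version, then stores it +/// On a retry, the caller passes the stored cached_version back in, so it observes a strictly newer snapshot of the cache.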
+Strings AsyncBlockIDsCache::detectConflicts(const Strings & paths, UInt64 & last_version) +{ + if (!storage.getSettings()->use_async_block_ids_cache) + return {}; + + std::unique_lock lk(mu); + cv.wait_for(lk, update_min_interval, [&]{return last_version == 0 || version != last_version;}); + + CachePtr cur_cache; + cur_cache = cache_ptr; + last_version = version; + + lk.unlock(); + + if (cur_cache == nullptr) + return {}; + + Strings conflicts; + for (const String & p : paths) + { + if (cur_cache->contains(p)) + { + conflicts.push_back(p); + } + } + + return conflicts; +} + +} diff --git a/src/Storages/MergeTree/AsyncBlockIDsCache.h b/src/Storages/MergeTree/AsyncBlockIDsCache.h new file mode 100644 index 00000000000..8281722a0f0 --- /dev/null +++ b/src/Storages/MergeTree/AsyncBlockIDsCache.h @@ -0,0 +1,55 @@ +#pragma once + +#include +#include +#include + +#include +#include + +namespace DB +{ + +class StorageReplicatedMergeTree; + +class AsyncBlockIDsCache +{ + using Cache = std::unordered_set; + using CachePtr = std::shared_ptr; + + std::vector getChildren(); + + void update(); + +public: + explicit AsyncBlockIDsCache(StorageReplicatedMergeTree & storage_); + + void start(); + + void stop() { task->deactivate(); } + + Strings detectConflicts(const Strings & paths, UInt64 & last_version); + +private: + + StorageReplicatedMergeTree & storage; + + std::chrono::steady_clock::time_point last_updatetime; + const std::chrono::milliseconds update_min_interval; + + std::mutex mu; + CachePtr cache_ptr; + std::condition_variable cv; + UInt64 version = 0; + + const String path; + + BackgroundSchedulePool::TaskHolder task; + + const String log_name; + Poco::Logger * log; +}; + +using AsyncBlockIDsCachePtr = std::shared_ptr; + +} diff --git a/src/Storages/MergeTree/MergeTreeSettings.h b/src/Storages/MergeTree/MergeTreeSettings.h index 37e9bf5779c..da3b61c2144 100644 --- a/src/Storages/MergeTree/MergeTreeSettings.h +++ b/src/Storages/MergeTree/MergeTreeSettings.h @@ -84,6 +84,8 @@ struct Settings; M(UInt64, replicated_deduplication_window_seconds, 7 * 24 * 60 * 60 /* one week */, "Similar to \"replicated_deduplication_window\", but determines old blocks by their lifetime. Hash of an inserted block will be deleted (and the block will not be deduplicated after) if it outside of one \"window\". You can set very big replicated_deduplication_window to avoid duplicating INSERTs during that period of time.", 0) \ M(UInt64, replicated_deduplication_window_for_async_inserts, 10000, "How many last hash values of async_insert blocks should be kept in ZooKeeper (old blocks will be deleted).", 0) \ M(UInt64, replicated_deduplication_window_seconds_for_async_inserts, 7 * 24 * 60 * 60 /* one week */, "Similar to \"replicated_deduplication_window_for_async_inserts\", but determines old blocks by their lifetime. Hash of an inserted block will be deleted (and the block will not be deduplicated after) if it outside of one \"window\". You can set very big replicated_deduplication_window to avoid duplicating INSERTs during that period of time.", 0) \ + M(Milliseconds, async_block_ids_cache_min_update_interval_ms, 100, "minimum interval of updating async_block_ids_cache", 0) \ + M(Bool, use_async_block_ids_cache, false, "if use async_block_ids_cache to filter duplicated async inserts in mem", 0) \ M(UInt64, max_replicated_logs_to_keep, 1000, "How many records may be in log, if there is inactive replica. 
Inactive replica becomes lost when when this number exceed.", 0) \ M(UInt64, min_replicated_logs_to_keep, 10, "Keep about this number of last records in ZooKeeper log, even if they are obsolete. It doesn't affect work of tables: used only to diagnose ZooKeeper log before cleaning.", 0) \ M(Seconds, prefer_fetch_merged_part_time_threshold, 3600, "If time passed after replication log entry creation exceeds this threshold and sum size of parts is greater than \"prefer_fetch_merged_part_size_threshold\", prefer fetching merged part from replica instead of doing merge locally. To speed up very long merges.", 0) \ diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.cpp b/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.cpp index 29528e9ff80..10e70ea2f3c 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.cpp +++ b/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.cpp @@ -144,6 +144,7 @@ bool ReplicatedMergeTreeRestartingThread::runImpl() storage.mutations_finalizing_task->activateAndSchedule(); storage.merge_selecting_task->activateAndSchedule(); storage.cleanup_thread.start(); + storage.async_block_ids_cache.start(); storage.part_check_thread.start(); LOG_DEBUG(log, "Table started successfully"); diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeSink.cpp b/src/Storages/MergeTree/ReplicatedMergeTreeSink.cpp index 7bd5df2b1dc..2009ff7f5a6 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreeSink.cpp +++ b/src/Storages/MergeTree/ReplicatedMergeTreeSink.cpp @@ -5,6 +5,7 @@ #include #include #include +#include "Storages/MergeTree/AsyncBlockIDsCache.h" #include #include #include @@ -100,7 +101,7 @@ namespace String conflict_block_id = p.filename(); auto it = partition.block_id_to_offset_idx.find(conflict_block_id); if (it == partition.block_id_to_offset_idx.end()) - throw Exception("Unknown conflict path " + conflict_block_id, ErrorCodes::LOGICAL_ERROR); + throw Exception(ErrorCodes::LOGICAL_ERROR, "Unknown conflict path {}", conflict_block_id); offset_idx.push_back(it->second); } std::sort(offset_idx.begin(), offset_idx.end()); @@ -611,6 +612,10 @@ std::vector ReplicatedMergeTreeSinkImpl::commitPart( BlockIDsType block_id_path ; if constexpr (async_insert) { + /// prefilter by cache + conflict_block_ids = storage.async_block_ids_cache.detectConflicts(block_id, cache_version); + if (!conflict_block_ids.empty()) + return; for (const auto & single_block_id : block_id) block_id_path.push_back(storage.zookeeper_path + "/async_blocks/" + single_block_id); } diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeSink.h b/src/Storages/MergeTree/ReplicatedMergeTreeSink.h index 1199df95b67..3777a9f7285 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreeSink.h +++ b/src/Storages/MergeTree/ReplicatedMergeTreeSink.h @@ -5,6 +5,7 @@ #include #include #include +#include namespace Poco { class Logger; } @@ -115,6 +116,8 @@ private: size_t quorum_timeout_ms; size_t max_parts_per_block; + UInt64 cache_version = 0; + bool is_attach = false; bool quorum_parallel = false; const bool deduplicate = true; diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp index 7056e6a6952..1ea25df032c 100644 --- a/src/Storages/StorageReplicatedMergeTree.cpp +++ b/src/Storages/StorageReplicatedMergeTree.cpp @@ -87,6 +87,7 @@ #include #include +#include "Storages/MergeTree/AsyncBlockIDsCache.h" #include #include @@ -278,6 +279,7 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree( , queue(*this, 
merge_strategy_picker) , fetcher(*this) , cleanup_thread(*this) + , async_block_ids_cache(*this) , part_check_thread(*this) , restarting_thread(*this) , part_moves_between_shards_orchestrator(*this) @@ -4337,6 +4339,7 @@ void StorageReplicatedMergeTree::partialShutdown() mutations_finalizing_task->deactivate(); cleanup_thread.stop(); + async_block_ids_cache.stop(); part_check_thread.stop(); /// Stop queue processing diff --git a/src/Storages/StorageReplicatedMergeTree.h b/src/Storages/StorageReplicatedMergeTree.h index c5e95ab7b39..99e317b9668 100644 --- a/src/Storages/StorageReplicatedMergeTree.h +++ b/src/Storages/StorageReplicatedMergeTree.h @@ -5,6 +5,7 @@ #include #include #include +#include #include #include #include @@ -332,6 +333,7 @@ private: friend class ReplicatedMergeTreeSinkImpl; friend class ReplicatedMergeTreePartCheckThread; friend class ReplicatedMergeTreeCleanupThread; + friend class AsyncBlockIDsCache; friend class ReplicatedMergeTreeAlterThread; friend class ReplicatedMergeTreeRestartingThread; friend class ReplicatedMergeTreeAttachThread; @@ -440,6 +442,8 @@ private: /// A thread that removes old parts, log entries, and blocks. ReplicatedMergeTreeCleanupThread cleanup_thread; + AsyncBlockIDsCache async_block_ids_cache; + /// A thread that checks the data of the parts, as well as the queue of the parts to be checked. ReplicatedMergeTreePartCheckThread part_check_thread; diff --git a/tests/queries/0_stateless/02481_async_insert_dedup.python b/tests/queries/0_stateless/02481_async_insert_dedup.python index 16808aeb7a2..70f4462dcf0 100644 --- a/tests/queries/0_stateless/02481_async_insert_dedup.python +++ b/tests/queries/0_stateless/02481_async_insert_dedup.python @@ -81,7 +81,7 @@ EventDate DateTime, KeyID UInt32 ) Engine = ReplicatedMergeTree('/clickhouse/tables/{shard}/{database}/t_async_insert_dedup', '{replica}') PARTITION BY toYYYYMM(EventDate) -ORDER BY (KeyID, EventDate) +ORDER BY (KeyID, EventDate) SETTINGS use_async_block_ids_cache = 1 ''') q = queue.Queue(100) From 9e99c7e11678f445dd40176dbf515684db05447e Mon Sep 17 00:00:00 2001 From: Han Fei Date: Thu, 12 Jan 2023 17:44:20 +0100 Subject: [PATCH 02/13] Update src/Storages/MergeTree/ReplicatedMergeTreeSink.cpp Co-authored-by: Sergei Trifonov --- src/Storages/MergeTree/ReplicatedMergeTreeSink.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeSink.cpp b/src/Storages/MergeTree/ReplicatedMergeTreeSink.cpp index b37eec77aa5..4a516f9f018 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreeSink.cpp +++ b/src/Storages/MergeTree/ReplicatedMergeTreeSink.cpp @@ -5,7 +5,7 @@ #include #include #include -#include "Storages/MergeTree/AsyncBlockIDsCache.h" +#include #include #include #include From bcf813fedc360ea5558d8ec3cfd58cbfe8b025c1 Mon Sep 17 00:00:00 2001 From: Han Fei Date: Thu, 12 Jan 2023 17:44:28 +0100 Subject: [PATCH 03/13] Update src/Storages/StorageReplicatedMergeTree.cpp Co-authored-by: Sergei Trifonov --- src/Storages/StorageReplicatedMergeTree.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp index eb704834c44..e927b0f3818 100644 --- a/src/Storages/StorageReplicatedMergeTree.cpp +++ b/src/Storages/StorageReplicatedMergeTree.cpp @@ -91,7 +91,7 @@ #include #include -#include "Storages/MergeTree/AsyncBlockIDsCache.h" +#include #include #include From 2fb2f503e3925c72f25c0c48bac4d24cdf123f98 Mon Sep 17 00:00:00 2001 From: Han Fei Date: Thu, 12 
Jan 2023 17:45:02 +0100 Subject: [PATCH 04/13] Update src/Storages/MergeTree/MergeTreeSettings.h Co-authored-by: Sergei Trifonov --- src/Storages/MergeTree/MergeTreeSettings.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Storages/MergeTree/MergeTreeSettings.h b/src/Storages/MergeTree/MergeTreeSettings.h index 38940ab9408..21e4a5c9454 100644 --- a/src/Storages/MergeTree/MergeTreeSettings.h +++ b/src/Storages/MergeTree/MergeTreeSettings.h @@ -86,7 +86,7 @@ struct Settings; M(UInt64, replicated_deduplication_window_for_async_inserts, 10000, "How many last hash values of async_insert blocks should be kept in ZooKeeper (old blocks will be deleted).", 0) \ M(UInt64, replicated_deduplication_window_seconds_for_async_inserts, 7 * 24 * 60 * 60 /* one week */, "Similar to \"replicated_deduplication_window_for_async_inserts\", but determines old blocks by their lifetime. Hash of an inserted block will be deleted (and the block will not be deduplicated after) if it outside of one \"window\". You can set very big replicated_deduplication_window to avoid duplicating INSERTs during that period of time.", 0) \ M(Milliseconds, async_block_ids_cache_min_update_interval_ms, 100, "minimum interval of updating async_block_ids_cache", 0) \ - M(Bool, use_async_block_ids_cache, false, "if use async_block_ids_cache to filter duplicated async inserts in mem", 0) \ + M(Bool, use_async_block_ids_cache, false, "use in-memory cache to filter duplicated async inserts based on block ids", 0) \ M(UInt64, max_replicated_logs_to_keep, 1000, "How many records may be in log, if there is inactive replica. Inactive replica becomes lost when when this number exceed.", 0) \ M(UInt64, min_replicated_logs_to_keep, 10, "Keep about this number of last records in ZooKeeper log, even if they are obsolete. It doesn't affect work of tables: used only to diagnose ZooKeeper log before cleaning.", 0) \ M(Seconds, prefer_fetch_merged_part_time_threshold, 3600, "If time passed after replication log entry creation exceeds this threshold and sum size of parts is greater than \"prefer_fetch_merged_part_size_threshold\", prefer fetching merged part from replica instead of doing merge locally. To speed up very long merges.", 0) \ From ed49ebf01a1d0e1861909ac87407140190c761c0 Mon Sep 17 00:00:00 2001 From: Han Fei Date: Fri, 13 Jan 2023 20:26:08 +0100 Subject: [PATCH 05/13] update setting explain --- src/Storages/MergeTree/MergeTreeSettings.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Storages/MergeTree/MergeTreeSettings.h b/src/Storages/MergeTree/MergeTreeSettings.h index 21e4a5c9454..204f7b941d0 100644 --- a/src/Storages/MergeTree/MergeTreeSettings.h +++ b/src/Storages/MergeTree/MergeTreeSettings.h @@ -85,7 +85,7 @@ struct Settings; M(UInt64, replicated_deduplication_window_seconds, 7 * 24 * 60 * 60 /* one week */, "Similar to \"replicated_deduplication_window\", but determines old blocks by their lifetime. Hash of an inserted block will be deleted (and the block will not be deduplicated after) if it outside of one \"window\". 
You can set very big replicated_deduplication_window to avoid duplicating INSERTs during that period of time.", 0) \ M(UInt64, replicated_deduplication_window_for_async_inserts, 10000, "How many last hash values of async_insert blocks should be kept in ZooKeeper (old blocks will be deleted).", 0) \ M(UInt64, replicated_deduplication_window_seconds_for_async_inserts, 7 * 24 * 60 * 60 /* one week */, "Similar to \"replicated_deduplication_window_for_async_inserts\", but determines old blocks by their lifetime. Hash of an inserted block will be deleted (and the block will not be deduplicated after) if it outside of one \"window\". You can set very big replicated_deduplication_window to avoid duplicating INSERTs during that period of time.", 0) \ - M(Milliseconds, async_block_ids_cache_min_update_interval_ms, 100, "minimum interval of updating async_block_ids_cache", 0) \ + M(Milliseconds, async_block_ids_cache_min_update_interval_ms, 100, "minimum interval between updates of async_block_ids_cache", 0) \ M(Bool, use_async_block_ids_cache, false, "use in-memory cache to filter duplicated async inserts based on block ids", 0) \ M(UInt64, max_replicated_logs_to_keep, 1000, "How many records may be in log, if there is inactive replica. Inactive replica becomes lost when when this number exceed.", 0) \ M(UInt64, min_replicated_logs_to_keep, 10, "Keep about this number of last records in ZooKeeper log, even if they are obsolete. It doesn't affect work of tables: used only to diagnose ZooKeeper log before cleaning.", 0) \ From 3481b4d50a5d713aa2755314837a8a7efad186ac Mon Sep 17 00:00:00 2001 From: Han Fei Date: Mon, 16 Jan 2023 10:41:35 +0100 Subject: [PATCH 06/13] fix style --- src/Storages/StorageReplicatedMergeTree.h | 16 ---------------- 1 file changed, 16 deletions(-) diff --git a/src/Storages/StorageReplicatedMergeTree.h b/src/Storages/StorageReplicatedMergeTree.h index b62eabfd69b..71f143aa8a4 100644 --- a/src/Storages/StorageReplicatedMergeTree.h +++ b/src/Storages/StorageReplicatedMergeTree.h @@ -4,23 +4,7 @@ #include #include #include -#include #include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include #include #include #include From 8a74238fe0fabfd5a5d61912e1b7b80d47550f6e Mon Sep 17 00:00:00 2001 From: Han Fei Date: Tue, 17 Jan 2023 15:47:52 +0100 Subject: [PATCH 07/13] improve --- src/Common/CurrentMetrics.cpp | 1 + src/Common/ProfileEvents.cpp | 1 + src/Storages/MergeTree/AsyncBlockIDsCache.cpp | 33 +++++++++++++++---- src/Storages/MergeTree/AsyncBlockIDsCache.h | 3 +- .../MergeTree/ReplicatedMergeTreeSink.cpp | 3 ++ .../02481_async_insert_dedup.python | 12 ++++++- 6 files changed, 44 insertions(+), 9 deletions(-) diff --git a/src/Common/CurrentMetrics.cpp b/src/Common/CurrentMetrics.cpp index 5e692101354..0f85e548215 100644 --- a/src/Common/CurrentMetrics.cpp +++ b/src/Common/CurrentMetrics.cpp @@ -100,6 +100,7 @@ M(CacheDetachedFileSegments, "Number of existing detached cache file segments") \ M(FilesystemCacheSize, "Filesystem cache size in bytes") \ M(FilesystemCacheElements, "Filesystem cache elements (file segments)") \ + M(AsyncInsertCacheSize, "Number of async insert hash id in cache") \ M(S3Requests, "S3 requests") \ M(KeeperAliveConnections, "Number of alive connections") \ M(KeeperOutstandingRequets, "Number of outstanding requests") \ diff --git a/src/Common/ProfileEvents.cpp b/src/Common/ProfileEvents.cpp index 1b387c5a080..a1a43b9da9c 100644 --- 
a/src/Common/ProfileEvents.cpp +++ b/src/Common/ProfileEvents.cpp @@ -10,6 +10,7 @@ M(InsertQuery, "Same as Query, but only for INSERT queries.") \ M(AsyncInsertQuery, "Same as InsertQuery, but only for asynchronous INSERT queries.") \ M(AsyncInsertBytes, "Data size in bytes of asynchronous INSERT queries.") \ + M(AsyncInsertCacheHits, "Number of times a duplicate hash id has been found in asynchronous INSERT hash id cache.") \ M(FailedQuery, "Number of failed queries.") \ M(FailedSelectQuery, "Same as FailedQuery, but only for SELECT queries.") \ M(FailedInsertQuery, "Same as FailedQuery, but only for INSERT queries.") \ diff --git a/src/Storages/MergeTree/AsyncBlockIDsCache.cpp b/src/Storages/MergeTree/AsyncBlockIDsCache.cpp index 51622c2e0d4..3e991adfc7f 100644 --- a/src/Storages/MergeTree/AsyncBlockIDsCache.cpp +++ b/src/Storages/MergeTree/AsyncBlockIDsCache.cpp @@ -1,12 +1,32 @@ +#include +#include #include #include -#include -#include +#include + +namespace ProfileEvents +{ + extern const Event AsyncInsertCacheHits; +} + +namespace CurrentMetrics +{ + extern const Metric AsyncInsertCacheSize; +} namespace DB { +struct AsyncBlockIDsCache::Cache : public std::unordered_set +{ + CurrentMetrics::Increment cache_size_increment; + explicit Cache(std::unordered_set && set_) + : std::unordered_set(std::move(set_)) + , cache_size_increment(CurrentMetrics::AsyncInsertCacheSize, size()) + {} +}; + std::vector AsyncBlockIDsCache::getChildren() { auto zookeeper = storage.getZooKeeper(); @@ -32,15 +52,14 @@ void AsyncBlockIDsCache::update() try { std::vector paths = getChildren(); - Cache cache; + std::unordered_set set; for (String & p : paths) { - cache.insert(std::move(p)); + set.insert(std::move(p)); } - LOG_TRACE(log, "Updating async block succeed. {} block ids has been updated.", cache.size()); { std::lock_guard lock(mu); - cache_ptr = std::make_shared(std::move(cache)); + cache_ptr = std::make_shared(std::move(set)); ++version; } cv.notify_all(); @@ -95,6 +114,8 @@ Strings AsyncBlockIDsCache::detectConflicts(const Strings & paths, UInt64 & last } } + ProfileEvents::increment(ProfileEvents::AsyncInsertCacheHits); + return conflicts; } diff --git a/src/Storages/MergeTree/AsyncBlockIDsCache.h b/src/Storages/MergeTree/AsyncBlockIDsCache.h index 8281722a0f0..e92926813e8 100644 --- a/src/Storages/MergeTree/AsyncBlockIDsCache.h +++ b/src/Storages/MergeTree/AsyncBlockIDsCache.h @@ -5,7 +5,6 @@ #include #include -#include namespace DB { @@ -14,7 +13,7 @@ class StorageReplicatedMergeTree; class AsyncBlockIDsCache { - using Cache = std::unordered_set; + struct Cache; using CachePtr = std::shared_ptr; std::vector getChildren(); diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeSink.cpp b/src/Storages/MergeTree/ReplicatedMergeTreeSink.cpp index 9f7224bbeea..14ae620176a 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreeSink.cpp +++ b/src/Storages/MergeTree/ReplicatedMergeTreeSink.cpp @@ -682,7 +682,10 @@ std::vector ReplicatedMergeTreeSinkImpl::commitPart( /// prefilter by cache conflict_block_ids = storage.async_block_ids_cache.detectConflicts(block_id, cache_version); if (!conflict_block_ids.empty()) + { + cache_version = 0; return; + } for (const auto & single_block_id : block_id) block_id_path.push_back(storage.zookeeper_path + "/async_blocks/" + single_block_id); } diff --git a/tests/queries/0_stateless/02481_async_insert_dedup.python b/tests/queries/0_stateless/02481_async_insert_dedup.python index 70f4462dcf0..0cea7301ce5 100644 --- 
a/tests/queries/0_stateless/02481_async_insert_dedup.python +++ b/tests/queries/0_stateless/02481_async_insert_dedup.python @@ -98,7 +98,7 @@ gen.join() retry = 0 -while (True): +while True: time.sleep(5) result = client.query("select KeyID from t_async_insert_dedup order by KeyID") result = result.split() @@ -124,6 +124,16 @@ while (True): else: print(len(result), flush=True) break + +result = client.query("SELECT value FROM system.metrics where metric = 'AsyncInsertCacheSize'") +result = int(result.split()[0]) +if result <= 0: + raise Exception(f"AsyncInsertCacheSize should be > 0, but got {result}") +result = client.query("SELECT value FROM system.events where event = 'AsyncInsertCacheHits'") +result = int(result.split()[0]) +if result <= 0: + raise Exception(f"AsyncInsertCacheHits should be > 0, but got {result}") + client.query("DROP TABLE IF EXISTS t_async_insert_dedup NO DELAY") os._exit(os.EX_OK) From e51123c9b0cd2a8c38cc2dc6ef1ba28be06c7e9b Mon Sep 17 00:00:00 2001 From: Han Fei Date: Wed, 18 Jan 2023 13:11:07 +0100 Subject: [PATCH 08/13] fix data race --- src/Storages/MergeTree/AsyncBlockIDsCache.cpp | 16 ++++++++++++---- src/Storages/MergeTree/AsyncBlockIDsCache.h | 2 +- 2 files changed, 13 insertions(+), 5 deletions(-) diff --git a/src/Storages/MergeTree/AsyncBlockIDsCache.cpp b/src/Storages/MergeTree/AsyncBlockIDsCache.cpp index 3e991adfc7f..b65b670d87f 100644 --- a/src/Storages/MergeTree/AsyncBlockIDsCache.cpp +++ b/src/Storages/MergeTree/AsyncBlockIDsCache.cpp @@ -34,9 +34,10 @@ std::vector AsyncBlockIDsCache::getChildren() auto watch_callback = [&](const Coordination::WatchResponse &) { auto now = std::chrono::steady_clock::now(); - if (now - last_updatetime < update_min_interval) + auto last_time = last_updatetime.load(); + if (now - last_time < update_min_interval) { - std::chrono::milliseconds sleep_time = std::chrono::duration_cast(update_min_interval - (now - last_updatetime)); + std::chrono::milliseconds sleep_time = std::chrono::duration_cast(update_min_interval - (now - last_time)); task->scheduleAfter(sleep_time.count()); } else task->schedule(); }; @@ -94,7 +95,14 @@ Strings AsyncBlockIDsCache::detectConflicts(const Strings & paths, UInt64 & last return {}; std::unique_lock lk(mu); - cv.wait_for(lk, update_min_interval, [&]{return version != last_version;}); + /// For the first access of this cache, `last_version` is zero, so it will not block here. + /// For a retrying request, we compare the request version and the cache version, because ZooKeeper only returns + /// incomplete information about duplication; we need to update the cache to find out more duplicates. + /// The timeout here is to prevent deadlock, just in case.
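+    /// If the wait times out without a newer version being published, we log this below and fall back to the current, possibly stale, snapshot instead of blocking the insert.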
+ cv.wait_for(lk, update_min_interval * 2, [&]{return version != last_version;}); + + if (version == last_version) + LOG_INFO(log, "Read cache with an old version {}", last_version); CachePtr cur_cache; cur_cache = cache_ptr; @@ -114,7 +122,7 @@ Strings AsyncBlockIDsCache::detectConflicts(const Strings & paths, UInt64 & last } } - ProfileEvents::increment(ProfileEvents::AsyncInsertCacheHits); + ProfileEvents::increment(ProfileEvents::AsyncInsertCacheHits, !conflicts.empty()); return conflicts; } diff --git a/src/Storages/MergeTree/AsyncBlockIDsCache.h b/src/Storages/MergeTree/AsyncBlockIDsCache.h index e92926813e8..a661d00f8a6 100644 --- a/src/Storages/MergeTree/AsyncBlockIDsCache.h +++ b/src/Storages/MergeTree/AsyncBlockIDsCache.h @@ -33,7 +33,7 @@ private: StorageReplicatedMergeTree & storage; - std::chrono::steady_clock::time_point last_updatetime; + std::atomic last_updatetime; const std::chrono::milliseconds update_min_interval; std::mutex mu; From 1603e734f97ec930ddf9acd64fa411e682dd21d7 Mon Sep 17 00:00:00 2001 From: Yatsishin Ilya <2159081+qoega@users.noreply.github.com> Date: Wed, 18 Jan 2023 15:36:24 +0000 Subject: [PATCH 09/13] Make test simpler to see errors not in teardown --- .../test_replicated_merge_tree_s3_restore/test.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/tests/integration/test_replicated_merge_tree_s3_restore/test.py b/tests/integration/test_replicated_merge_tree_s3_restore/test.py index 822a81d2655..f26b3e7bd35 100644 --- a/tests/integration/test_replicated_merge_tree_s3_restore/test.py +++ b/tests/integration/test_replicated_merge_tree_s3_restore/test.py @@ -177,10 +177,7 @@ def get_table_uuid(node, db_atomic, table): return uuid -@pytest.fixture(autouse=True) def drop_table(cluster): - yield - node_names = ["node1z", "node2z", "node1n", "node2n", "node_another_bucket"] for node_name in node_names: @@ -257,3 +254,5 @@ def test_restore_another_bucket_path(cluster, db_atomic, zero_copy): assert node_another_bucket.query( "SELECT count(*) FROM s3.test FORMAT Values" ) == "({})".format(size * (keys - dropped_keys)) + + drop_table(cluster) From 117ec13c9e52fa35a9cb97e32ea6129240b8c0aa Mon Sep 17 00:00:00 2001 From: avogar Date: Wed, 18 Jan 2023 20:33:50 +0000 Subject: [PATCH 10/13] Fix s3Cluster schema inference when structure from insertion table is used --- src/Storages/StorageS3Cluster.cpp | 7 ++++--- src/Storages/StorageS3Cluster.h | 5 +++-- src/TableFunctions/TableFunctionS3Cluster.cpp | 6 ++++-- ...4_s3_cluster_insert_select_schema_inference.reference | 4 ++++ .../02534_s3_cluster_insert_select_schema_inference.sql | 9 +++++++++ 5 files changed, 24 insertions(+), 7 deletions(-) create mode 100644 tests/queries/0_stateless/02534_s3_cluster_insert_select_schema_inference.reference create mode 100644 tests/queries/0_stateless/02534_s3_cluster_insert_select_schema_inference.sql diff --git a/src/Storages/StorageS3Cluster.cpp b/src/Storages/StorageS3Cluster.cpp index bf7d3004782..3ee10113b32 100644 --- a/src/Storages/StorageS3Cluster.cpp +++ b/src/Storages/StorageS3Cluster.cpp @@ -46,13 +46,15 @@ StorageS3Cluster::StorageS3Cluster( const StorageID & table_id_, const ColumnsDescription & columns_, const ConstraintsDescription & constraints_, - ContextPtr context_) + ContextPtr context_, + bool structure_argument_was_provided_) : IStorageCluster(table_id_) , s3_configuration{configuration_.url, configuration_.auth_settings, configuration_.request_settings, configuration_.headers} , filename(configuration_.url) ,
cluster_name(configuration_.cluster_name) , format_name(configuration_.format) , compression_method(configuration_.compression_method) + , structure_argument_was_provided(structure_argument_was_provided_) { context_->getGlobalContext()->getRemoteHostFilter().checkURL(Poco::URI{filename}); StorageInMemoryMetadata storage_metadata; @@ -68,7 +70,6 @@ StorageS3Cluster::StorageS3Cluster( auto columns = StorageS3::getTableStructureFromDataImpl(format_name, s3_configuration, compression_method, /*distributed_processing_*/false, is_key_with_globs, /*format_settings=*/std::nullopt, context_); storage_metadata.setColumns(columns); - add_columns_structure_to_query = true; } else storage_metadata.setColumns(columns_); @@ -111,7 +112,7 @@ Pipe StorageS3Cluster::read( const bool add_agg_info = processed_stage == QueryProcessingStage::WithMergeableState; ASTPtr query_to_send = interpreter.getQueryInfo().query->clone(); - if (add_columns_structure_to_query) + if (!structure_argument_was_provided) addColumnsStructureToQueryWithClusterEngine( query_to_send, StorageDictionary::generateNamesAndTypesDescription(storage_snapshot->metadata->getColumns().getAll()), 5, getName()); diff --git a/src/Storages/StorageS3Cluster.h b/src/Storages/StorageS3Cluster.h index cd5a13ce2ea..d9b69b4913b 100644 --- a/src/Storages/StorageS3Cluster.h +++ b/src/Storages/StorageS3Cluster.h @@ -26,7 +26,8 @@ public: const StorageID & table_id_, const ColumnsDescription & columns_, const ConstraintsDescription & constraints_, - ContextPtr context_); + ContextPtr context_, + bool structure_argument_was_provided_); std::string getName() const override { return "S3Cluster"; } @@ -49,7 +50,7 @@ private: String compression_method; NamesAndTypesList virtual_columns; Block virtual_block; - bool add_columns_structure_to_query = false; + bool structure_argument_was_provided; }; diff --git a/src/TableFunctions/TableFunctionS3Cluster.cpp b/src/TableFunctions/TableFunctionS3Cluster.cpp index 5823aaad876..82790e1a328 100644 --- a/src/TableFunctions/TableFunctionS3Cluster.cpp +++ b/src/TableFunctions/TableFunctionS3Cluster.cpp @@ -96,8 +96,9 @@ StoragePtr TableFunctionS3Cluster::executeImpl( { StoragePtr storage; ColumnsDescription columns; + bool structure_argument_was_provided = configuration.structure != "auto"; - if (configuration.structure != "auto") + if (structure_argument_was_provided) { columns = parseColumnsListFromString(configuration.structure, context); } @@ -126,7 +127,8 @@ StoragePtr TableFunctionS3Cluster::executeImpl( StorageID(getDatabaseName(), table_name), columns, ConstraintsDescription{}, - context); + context, + structure_argument_was_provided); } storage->startup(); diff --git a/tests/queries/0_stateless/02534_s3_cluster_insert_select_schema_inference.reference b/tests/queries/0_stateless/02534_s3_cluster_insert_select_schema_inference.reference new file mode 100644 index 00000000000..acd7c60768b --- /dev/null +++ b/tests/queries/0_stateless/02534_s3_cluster_insert_select_schema_inference.reference @@ -0,0 +1,4 @@ +1 2 3 +4 5 6 +7 8 9 +0 0 0 diff --git a/tests/queries/0_stateless/02534_s3_cluster_insert_select_schema_inference.sql b/tests/queries/0_stateless/02534_s3_cluster_insert_select_schema_inference.sql new file mode 100644 index 00000000000..41278b0c16a --- /dev/null +++ b/tests/queries/0_stateless/02534_s3_cluster_insert_select_schema_inference.sql @@ -0,0 +1,9 @@ +-- Tags: no-fasttest +-- Tag no-fasttest: Depends on AWS + +drop table if exists test; +create table test (x UInt32, y UInt32, z UInt32) engine=Memory(); 
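+-- The insert below intentionally omits the structure argument of s3Cluster: the (x, y, z) structure must be taken from the insertion table and forwarded to the worker replicas, which is the behaviour fixed by this patch.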
+insert into test select * from s3Cluster('test_cluster_one_shard_three_replicas_localhost', 'http://localhost:11111/test/a.tsv'); +select * from test; +drop table test; + From a8f20363f413b2653352e43b970613b352fc2879 Mon Sep 17 00:00:00 2001 From: avogar Date: Wed, 18 Jan 2023 22:49:03 +0000 Subject: [PATCH 11/13] Fix JSON/BSONEachRow parsing with HTTP --- .../Impl/BSONEachRowRowInputFormat.cpp | 2 +- .../Impl/JSONEachRowRowInputFormat.cpp | 7 ++++--- .../02535_json_bson_each_row_curl.reference | 19 +++++++++++++++++ .../02535_json_bson_each_row_curl.sh | 21 +++++++++++++++++++ ...json_each_row_import_nested_curl.reference | 9 ++++++++ .../02535_json_each_row_import_nested_curl.sh | 16 ++++++++++++++ 6 files changed, 70 insertions(+), 4 deletions(-) create mode 100644 tests/queries/0_stateless/02535_json_bson_each_row_curl.reference create mode 100755 tests/queries/0_stateless/02535_json_bson_each_row_curl.sh create mode 100644 tests/queries/0_stateless/02535_json_each_row_import_nested_curl.reference create mode 100755 tests/queries/0_stateless/02535_json_each_row_import_nested_curl.sh diff --git a/src/Processors/Formats/Impl/BSONEachRowRowInputFormat.cpp b/src/Processors/Formats/Impl/BSONEachRowRowInputFormat.cpp index fd0c553538f..84d84756bd0 100644 --- a/src/Processors/Formats/Impl/BSONEachRowRowInputFormat.cpp +++ b/src/Processors/Formats/Impl/BSONEachRowRowInputFormat.cpp @@ -53,10 +53,10 @@ BSONEachRowRowInputFormat::BSONEachRowRowInputFormat( ReadBuffer & in_, const Block & header_, Params params_, const FormatSettings & format_settings_) : IRowInputFormat(header_, in_, std::move(params_)) , format_settings(format_settings_) - , name_map(header_.getNamesToIndexesMap()) , prev_positions(header_.columns()) , types(header_.getDataTypes()) { + name_map = getPort().getHeader().getNamesToIndexesMap(); } inline size_t BSONEachRowRowInputFormat::columnIndex(const StringRef & name, size_t key_index) diff --git a/src/Processors/Formats/Impl/JSONEachRowRowInputFormat.cpp b/src/Processors/Formats/Impl/JSONEachRowRowInputFormat.cpp index 44cbf8ca215..a8881c5f398 100644 --- a/src/Processors/Formats/Impl/JSONEachRowRowInputFormat.cpp +++ b/src/Processors/Formats/Impl/JSONEachRowRowInputFormat.cpp @@ -43,12 +43,13 @@ JSONEachRowRowInputFormat::JSONEachRowRowInputFormat( , prev_positions(header_.columns()) , yield_strings(yield_strings_) { - name_map = getPort().getHeader().getNamesToIndexesMap(); + const auto & header = getPort().getHeader(); + name_map = header.getNamesToIndexesMap(); if (format_settings_.import_nested_json) { - for (size_t i = 0; i != header_.columns(); ++i) + for (size_t i = 0; i != header.columns(); ++i) { - const StringRef column_name = header_.getByPosition(i).name; + const StringRef column_name = header.getByPosition(i).name; const auto split = Nested::splitName(column_name.toView()); if (!split.second.empty()) { diff --git a/tests/queries/0_stateless/02535_json_bson_each_row_curl.reference b/tests/queries/0_stateless/02535_json_bson_each_row_curl.reference new file mode 100644 index 00000000000..fa24965092a --- /dev/null +++ b/tests/queries/0_stateless/02535_json_bson_each_row_curl.reference @@ -0,0 +1,19 @@ +Row 1: +────── +x.a: 1 +x.b: 2 + +Row 2: +────── +x.a: 3 +x.b: 4 + +Row 3: +────── +x.a: 5 +x.b: 6 + +Row 4: +────── +x.a: 7 +x.b: 8 diff --git a/tests/queries/0_stateless/02535_json_bson_each_row_curl.sh b/tests/queries/0_stateless/02535_json_bson_each_row_curl.sh new file mode 100755 index 00000000000..bff80cc7224 --- /dev/null +++ 
b/tests/queries/0_stateless/02535_json_bson_each_row_curl.sh @@ -0,0 +1,21 @@ +#!/usr/bin/env bash +# Tags: long + +CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +# shellcheck source=../shell_config.sh +. "$CUR_DIR"/../shell_config.sh + +$CLICKHOUSE_CLIENT -q $'create table test (`x.a` UInt32, `x.b` UInt32) engine=Memory' + +echo '{"x" : {"a" : 1, "b" : 2}}' | ${CLICKHOUSE_CURL} --data-binary @- "${CLICKHOUSE_URL}&query=INSERT+INTO+test+FORMAT+JSONEachRow&input_format_import_nested_json=1&max_threads=10&input_format_parallel_parsing=0" + +echo '{"x" : {"a" : 3, "b" : 4}}' | ${CLICKHOUSE_CURL} --data-binary @- "${CLICKHOUSE_URL}&query=INSERT+INTO+test+FORMAT+JSONEachRow&input_format_import_nested_json=1&max_threads=10&input_format_parallel_parsing=1" + +$CLICKHOUSE_CLIENT -q $'select 5 as `x.a`, 6 as `x.b` format BSONEachRow' | ${CLICKHOUSE_CURL} --data-binary @- "${CLICKHOUSE_URL}&query=INSERT+INTO+test+FORMAT+BSONEachRow&max_threads=10&input_format_parallel_parsing=0" + + +$CLICKHOUSE_CLIENT -q $'select 7 as `x.a`, 8 as `x.b` format BSONEachRow' | ${CLICKHOUSE_CURL} --data-binary @- "${CLICKHOUSE_URL}&query=INSERT+INTO+test+FORMAT+BSONEachRow&max_threads=10&input_format_parallel_parsing=1" + +$CLICKHOUSE_CLIENT -q "select * from test order by 1 format Vertical"; +$CLICKHOUSE_CLIENT -q "drop table test"; + diff --git a/tests/queries/0_stateless/02535_json_each_row_import_nested_curl.reference b/tests/queries/0_stateless/02535_json_each_row_import_nested_curl.reference new file mode 100644 index 00000000000..533a3eb56b6 --- /dev/null +++ b/tests/queries/0_stateless/02535_json_each_row_import_nested_curl.reference @@ -0,0 +1,9 @@ +Row 1: +────── +x.a: 1 +x.b: 2 + +Row 2: +────── +x.a: 3 +x.b: 4 diff --git a/tests/queries/0_stateless/02535_json_each_row_import_nested_curl.sh b/tests/queries/0_stateless/02535_json_each_row_import_nested_curl.sh new file mode 100755 index 00000000000..016511b3f46 --- /dev/null +++ b/tests/queries/0_stateless/02535_json_each_row_import_nested_curl.sh @@ -0,0 +1,16 @@ +#!/usr/bin/env bash +# Tags: long + +CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +# shellcheck source=../shell_config.sh +. 
"$CUR_DIR"/../shell_config.sh + +$CLICKHOUSE_CLIENT -q $'create table test (`x.a` UInt32, `x.b` UInt32) engine=Memory' + +echo '{"x" : {"a" : 1, "b" : 2}}' | ${CLICKHOUSE_CURL} --data-binary @- "${CLICKHOUSE_URL}&query=INSERT+INTO+test+FORMAT+JSONEachRow&input_format_import_nested_json=1&max_threads=10&input_format_parallel_parsing=0" + +echo '{"x" : {"a" : 3, "b" : 4}}' | ${CLICKHOUSE_CURL} --data-binary @- "${CLICKHOUSE_URL}&query=INSERT+INTO+test+FORMAT+JSONEachRow&input_format_import_nested_json=1&max_threads=10&input_format_parallel_parsing=1" + +$CLICKHOUSE_CLIENT -q "select * from test format Vertical"; +$CLICKHOUSE_CLIENT -q "drop table test"; + From bc9c595003ae41835542fb698ca4cfe8b78758cf Mon Sep 17 00:00:00 2001 From: avogar Date: Wed, 18 Jan 2023 22:50:59 +0000 Subject: [PATCH 12/13] Update tests --- ...35_json_each_row_import_nested_curl.reference | 9 --------- .../02535_json_each_row_import_nested_curl.sh | 16 ---------------- 2 files changed, 25 deletions(-) delete mode 100644 tests/queries/0_stateless/02535_json_each_row_import_nested_curl.reference delete mode 100755 tests/queries/0_stateless/02535_json_each_row_import_nested_curl.sh diff --git a/tests/queries/0_stateless/02535_json_each_row_import_nested_curl.reference b/tests/queries/0_stateless/02535_json_each_row_import_nested_curl.reference deleted file mode 100644 index 533a3eb56b6..00000000000 --- a/tests/queries/0_stateless/02535_json_each_row_import_nested_curl.reference +++ /dev/null @@ -1,9 +0,0 @@ -Row 1: -────── -x.a: 1 -x.b: 2 - -Row 2: -────── -x.a: 3 -x.b: 4 diff --git a/tests/queries/0_stateless/02535_json_each_row_import_nested_curl.sh b/tests/queries/0_stateless/02535_json_each_row_import_nested_curl.sh deleted file mode 100755 index 016511b3f46..00000000000 --- a/tests/queries/0_stateless/02535_json_each_row_import_nested_curl.sh +++ /dev/null @@ -1,16 +0,0 @@ -#!/usr/bin/env bash -# Tags: long - -CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) -# shellcheck source=../shell_config.sh -. "$CUR_DIR"/../shell_config.sh - -$CLICKHOUSE_CLIENT -q $'create table test (`x.a` UInt32, `x.b` UInt32) engine=Memory' - -echo '{"x" : {"a" : 1, "b" : 2}}' | ${CLICKHOUSE_CURL} --data-binary @- "${CLICKHOUSE_URL}&query=INSERT+INTO+test+FORMAT+JSONEachRow&input_format_import_nested_json=1&max_threads=10&input_format_parallel_parsing=0" - -echo '{"x" : {"a" : 3, "b" : 4}}' | ${CLICKHOUSE_CURL} --data-binary @- "${CLICKHOUSE_URL}&query=INSERT+INTO+test+FORMAT+JSONEachRow&input_format_import_nested_json=1&max_threads=10&input_format_parallel_parsing=1" - -$CLICKHOUSE_CLIENT -q "select * from test format Vertical"; -$CLICKHOUSE_CLIENT -q "drop table test"; - From 77d2236f9dba59b68681f5e6f68e55ddc522aa8c Mon Sep 17 00:00:00 2001 From: avogar Date: Wed, 18 Jan 2023 22:51:57 +0000 Subject: [PATCH 13/13] Remove unneded tag --- tests/queries/0_stateless/02535_json_bson_each_row_curl.sh | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/queries/0_stateless/02535_json_bson_each_row_curl.sh b/tests/queries/0_stateless/02535_json_bson_each_row_curl.sh index bff80cc7224..be00aa10a6b 100755 --- a/tests/queries/0_stateless/02535_json_bson_each_row_curl.sh +++ b/tests/queries/0_stateless/02535_json_bson_each_row_curl.sh @@ -1,5 +1,4 @@ #!/usr/bin/env bash -# Tags: long CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) # shellcheck source=../shell_config.sh