Merge branch 'master' into 43891_Disallow_concurrent_backups_and_restores

Authored by SmitaRKulkarni on 2023-01-19 11:21:37 +01:00; committed by GitHub
commit 67e2bf31f5
21 changed files with 292 additions and 17 deletions

View File

@@ -100,6 +100,7 @@
M(CacheDetachedFileSegments, "Number of existing detached cache file segments") \
M(FilesystemCacheSize, "Filesystem cache size in bytes") \
M(FilesystemCacheElements, "Filesystem cache elements (file segments)") \
+M(AsyncInsertCacheSize, "Number of async insert hash ids in the cache") \
M(S3Requests, "S3 requests") \
M(KeeperAliveConnections, "Number of alive connections") \
M(KeeperOutstandingRequets, "Number of outstanding requests") \

View File

@@ -10,6 +10,7 @@
M(InsertQuery, "Same as Query, but only for INSERT queries.") \
M(AsyncInsertQuery, "Same as InsertQuery, but only for asynchronous INSERT queries.") \
M(AsyncInsertBytes, "Data size in bytes of asynchronous INSERT queries.") \
+M(AsyncInsertCacheHits, "Number of times a duplicate hash id has been found in the asynchronous INSERT hash id cache.") \
M(FailedQuery, "Number of failed queries.") \
M(FailedSelectQuery, "Same as FailedQuery, but only for SELECT queries.") \
M(FailedInsertQuery, "Same as FailedQuery, but only for INSERT queries.") \
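
The two counters added above follow ClickHouse's usual split: AsyncInsertCacheSize is a gauge (CurrentMetrics) describing current state, while AsyncInsertCacheHits is a monotonic event counter (ProfileEvents). Below is a minimal, self-contained sketch of that pattern using stand-in atomics rather than the real ClickHouse types; the real RAII helper is CurrentMetrics::Increment, used by AsyncBlockIDsCache::Cache later in this commit.

#include <atomic>
#include <cstdint>
#include <iostream>

std::atomic<int64_t> async_insert_cache_size{0};   // gauge: current number of cached ids
std::atomic<uint64_t> async_insert_cache_hits{0};  // counter: only ever increases

/// Stand-in for CurrentMetrics::Increment: bumps a gauge on construction and
/// undoes the bump on destruction, so the gauge tracks the object's lifetime.
struct Increment
{
    std::atomic<int64_t> & metric;
    int64_t amount;
    Increment(std::atomic<int64_t> & metric_, int64_t amount_) : metric(metric_), amount(amount_) { metric += amount; }
    ~Increment() { metric -= amount; }
};

int main()
{
    {
        Increment cache_size_increment(async_insert_cache_size, 3); // a cache snapshot holding 3 ids
        ++async_insert_cache_hits;                                  // one duplicate detected
        std::cout << async_insert_cache_size.load() << ' ' << async_insert_cache_hits.load() << '\n'; // 3 1
    }
    std::cout << async_insert_cache_size.load() << ' ' << async_insert_cache_hits.load() << '\n';     // 0 1
}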

View File

@@ -53,10 +53,10 @@ BSONEachRowRowInputFormat::BSONEachRowRowInputFormat(
ReadBuffer & in_, const Block & header_, Params params_, const FormatSettings & format_settings_)
: IRowInputFormat(header_, in_, std::move(params_))
, format_settings(format_settings_)
-, name_map(header_.getNamesToIndexesMap())
, prev_positions(header_.columns())
, types(header_.getDataTypes())
{
+name_map = getPort().getHeader().getNamesToIndexesMap();
}
inline size_t BSONEachRowRowInputFormat::columnIndex(const StringRef & name, size_t key_index)
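
The fix above (and the JSONEachRow one below) stops building name_map from the raw constructor argument header_ and instead uses getPort().getHeader(), the header the base class actually exposes through its output port; when the two differ, an index map built from the argument maps column names to the wrong positions. A hypothetical stand-alone illustration of the pitfall, with simplified types in place of the real IRowInputFormat API:

#include <cstddef>
#include <map>
#include <string>
#include <utility>
#include <vector>

struct Header { std::vector<std::string> names; };

struct BaseFormat
{
    /// The base class may normalize the header it is given (reorder, rename, ...).
    explicit BaseFormat(Header h) : header(normalize(std::move(h))) {}
    static Header normalize(Header h) { return h; } // identity here; not guaranteed in general
    const Header & getHeader() const { return header; }
private:
    Header header;
};

struct DerivedFormat : BaseFormat
{
    std::map<std::string, size_t> name_map;
    explicit DerivedFormat(Header h) : BaseFormat(std::move(h))
    {
        /// Correct: index the header the base class stored — the equivalent of
        /// getPort().getHeader() — not the (moved-from) constructor argument `h`.
        const Header & header = getHeader();
        for (size_t i = 0; i != header.names.size(); ++i)
            name_map[header.names[i]] = i;
    }
};

int main()
{
    DerivedFormat f(Header{{"x.a", "x.b"}});
    return f.name_map.at("x.b") == 1 ? 0 : 1;
}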

View File

@@ -43,12 +43,13 @@ JSONEachRowRowInputFormat::JSONEachRowRowInputFormat(
, prev_positions(header_.columns())
, yield_strings(yield_strings_)
{
-name_map = getPort().getHeader().getNamesToIndexesMap();
+const auto & header = getPort().getHeader();
+name_map = header.getNamesToIndexesMap();
if (format_settings_.import_nested_json)
{
-for (size_t i = 0; i != header_.columns(); ++i)
+for (size_t i = 0; i != header.columns(); ++i)
{
-const StringRef column_name = header_.getByPosition(i).name;
+const StringRef column_name = header.getByPosition(i).name;
const auto split = Nested::splitName(column_name.toView());
if (!split.second.empty())
{
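
The import_nested_json branch above registers each nested path under its first component. A rough self-contained sketch of the splitting it relies on — assumed, simplified semantics of Nested::splitName, with std::string in place of StringRef:

#include <string>
#include <utility>

/// Assumed behaviour of DB::Nested::splitName, simplified: "x.a" splits into
/// {"x", "a"}; a name without a dot yields an empty second component.
std::pair<std::string, std::string> splitName(const std::string & name)
{
    auto pos = name.find('.');
    if (pos == std::string::npos)
        return {name, {}};
    return {name.substr(0, pos), name.substr(pos + 1)};
}

int main()
{
    auto split = splitName("x.a");
    /// A non-empty second part means "x.a" is a path inside nested column "x",
    /// which is exactly what the branch above checks.
    return split.first == "x" && split.second == "a" ? 0 : 1;
}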

View File

@@ -0,0 +1,130 @@
#include <Common/CurrentMetrics.h>
#include <Common/ProfileEvents.h>
#include <Storages/MergeTree/AsyncBlockIDsCache.h>
#include <Storages/StorageReplicatedMergeTree.h>

#include <unordered_set>
namespace ProfileEvents
{
    extern const Event AsyncInsertCacheHits;
}

namespace CurrentMetrics
{
    extern const Metric AsyncInsertCacheSize;
}

namespace DB
{
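
/// Wrapping the set in its own struct lets an RAII CurrentMetrics::Increment
/// report the number of cached block ids for as long as this snapshot lives.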
struct AsyncBlockIDsCache::Cache : public std::unordered_set<String>
{
    CurrentMetrics::Increment cache_size_increment;
    explicit Cache(std::unordered_set<String> && set_)
        : std::unordered_set<String>(std::move(set_))
        , cache_size_increment(CurrentMetrics::AsyncInsertCacheSize, size())
    {}
};

std::vector<String> AsyncBlockIDsCache::getChildren()
{
    auto zookeeper = storage.getZooKeeper();
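    /// The watch fires when the set of async_blocks children changes in ZooKeeper;
    /// schedule a cache refresh, but at most one per update_min_interval.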
    auto watch_callback = [&](const Coordination::WatchResponse &)
    {
        auto now = std::chrono::steady_clock::now();
        auto last_time = last_updatetime.load();
        if (now - last_time < update_min_interval)
        {
            std::chrono::milliseconds sleep_time = std::chrono::duration_cast<std::chrono::milliseconds>(update_min_interval - (now - last_time));
            task->scheduleAfter(sleep_time.count());
        }
        else
            task->schedule();
    };

    std::vector<String> children;
    Coordination::Stat stat;
    zookeeper->tryGetChildrenWatch(path, children, &stat, watch_callback);

    return children;
}

void AsyncBlockIDsCache::update()
try
{
    std::vector<String> paths = getChildren();
    std::unordered_set<String> set;
    for (String & p : paths)
    {
        set.insert(std::move(p));
    }
    {
        std::lock_guard lock(mu);
        cache_ptr = std::make_shared<Cache>(std::move(set));
        ++version;
    }
    cv.notify_all();
    last_updatetime = std::chrono::steady_clock::now();
}
catch (...)
{
    LOG_INFO(log, "Updating async block ids cache failed. Reason: {}", getCurrentExceptionMessage(false));
    task->scheduleAfter(update_min_interval.count());
}

AsyncBlockIDsCache::AsyncBlockIDsCache(StorageReplicatedMergeTree & storage_)
    : storage(storage_),
    update_min_interval(storage.getSettings()->async_block_ids_cache_min_update_interval_ms),
    path(storage.zookeeper_path + "/async_blocks"),
    log_name(storage.getStorageID().getFullTableName() + " (AsyncBlockIDsCache)"),
    log(&Poco::Logger::get(log_name))
{
    task = storage.getContext()->getSchedulePool().createTask(log_name, [this]{ update(); });
}

void AsyncBlockIDsCache::start()
{
    if (storage.getSettings()->use_async_block_ids_cache)
        task->activateAndSchedule();
}

/// Callers keep the version returned by their last call. When a caller comes back,
/// it waits until it sees a newer version of the cache.
Strings AsyncBlockIDsCache::detectConflicts(const Strings & paths, UInt64 & last_version)
{
    if (!storage.getSettings()->use_async_block_ids_cache)
        return {};

    std::unique_lock lk(mu);
    /// On the first access to this cache, `last_version` is zero, so the caller does not block here.
    /// For a retried request we compare the request version with the cache version: ZooKeeper returns
    /// only incomplete information about duplicates, so the cache must be updated before we can find
    /// the remaining ones.
    /// The timeout prevents a deadlock, just in case.
    cv.wait_for(lk, update_min_interval * 2, [&]{ return version != last_version; });
    if (version == last_version)
        LOG_INFO(log, "Read cache with an old version {}", last_version);
    CachePtr cur_cache;
    cur_cache = cache_ptr;
    last_version = version;

    lk.unlock();

    if (cur_cache == nullptr)
        return {};

    Strings conflicts;
    for (const String & p : paths)
    {
        if (cur_cache->contains(p))
        {
            conflicts.push_back(p);
        }
    }

    ProfileEvents::increment(ProfileEvents::AsyncInsertCacheHits, !conflicts.empty());

    return conflicts;
}
}
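
The update()/detectConflicts() pair implements a versioned snapshot: the writer bumps `version` under the mutex and notifies, and each reader passes in the last version it saw and waits (with a bounded timeout) for a newer one. A minimal self-contained model of just that protocol, with simplified names and none of the ZooKeeper or scheduling machinery:

#include <chrono>
#include <condition_variable>
#include <cstdint>
#include <mutex>
#include <set>
#include <string>
#include <vector>

struct VersionedCache
{
    std::mutex mu;
    std::condition_variable cv;
    uint64_t version = 0;
    std::set<std::string> ids;

    void update(std::set<std::string> fresh)   // called from the background task
    {
        {
            std::lock_guard lock(mu);
            ids = std::move(fresh);
            ++version;
        }
        cv.notify_all();
    }

    std::vector<std::string> detectConflicts(const std::vector<std::string> & paths, uint64_t & last_version)
    {
        std::unique_lock lk(mu);
        // The timeout only bounds the wait; a stale read is still answered from the current snapshot.
        cv.wait_for(lk, std::chrono::milliseconds(200), [&] { return version != last_version; });
        last_version = version;
        std::vector<std::string> conflicts;
        for (const auto & p : paths)
            if (ids.count(p))
                conflicts.push_back(p);
        return conflicts;
    }
};

int main()
{
    VersionedCache cache;
    uint64_t last_version = 0;
    cache.update({"partition_42_deadbeef"});
    auto conflicts = cache.detectConflicts({"partition_42_deadbeef", "partition_42_cafebabe"}, last_version);
    return conflicts.size() == 1 ? 0 : 1;   // only the cached id is reported as a conflict
}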

View File

@@ -0,0 +1,54 @@
#pragma once

#include <Common/ZooKeeper/ZooKeeper.h>
#include <Common/logger_useful.h>
#include <Core/BackgroundSchedulePool.h>

#include <chrono>

namespace DB
{

class StorageReplicatedMergeTree;

class AsyncBlockIDsCache
{
    struct Cache;
    using CachePtr = std::shared_ptr<Cache>;

    std::vector<String> getChildren();

    void update();

public:
    explicit AsyncBlockIDsCache(StorageReplicatedMergeTree & storage_);

    void start();

    void stop() { task->deactivate(); }

    Strings detectConflicts(const Strings & paths, UInt64 & last_version);

private:
    StorageReplicatedMergeTree & storage;

    std::atomic<std::chrono::steady_clock::time_point> last_updatetime;
    const std::chrono::milliseconds update_min_interval;

    std::mutex mu;
    CachePtr cache_ptr;

    std::condition_variable cv;
    UInt64 version = 0;

    const String path;

    BackgroundSchedulePool::TaskHolder task;

    const String log_name;
    Poco::Logger * log;
};

using AsyncBlockIDsCachePtr = std::shared_ptr<AsyncBlockIDsCache>;

}

View File

@@ -85,6 +85,8 @@ struct Settings;
M(UInt64, replicated_deduplication_window_seconds, 7 * 24 * 60 * 60 /* one week */, "Similar to \"replicated_deduplication_window\", but determines old blocks by their lifetime. Hash of an inserted block will be deleted (and the block will not be deduplicated after) if it is outside of one \"window\". You can set a very big replicated_deduplication_window to avoid duplicating INSERTs during that period of time.", 0) \
M(UInt64, replicated_deduplication_window_for_async_inserts, 10000, "How many last hash values of async_insert blocks should be kept in ZooKeeper (old blocks will be deleted).", 0) \
M(UInt64, replicated_deduplication_window_seconds_for_async_inserts, 7 * 24 * 60 * 60 /* one week */, "Similar to \"replicated_deduplication_window_for_async_inserts\", but determines old blocks by their lifetime. Hash of an inserted block will be deleted (and the block will not be deduplicated after) if it is outside of one \"window\". You can set a very big replicated_deduplication_window to avoid duplicating INSERTs during that period of time.", 0) \
+M(Milliseconds, async_block_ids_cache_min_update_interval_ms, 100, "Minimum interval between updates of async_block_ids_cache.", 0) \
+M(Bool, use_async_block_ids_cache, false, "Use an in-memory cache to filter duplicated async inserts based on block ids.", 0) \
M(UInt64, max_replicated_logs_to_keep, 1000, "How many records may be in the log if there is an inactive replica. An inactive replica becomes lost when this number is exceeded.", 0) \
M(UInt64, min_replicated_logs_to_keep, 10, "Keep about this number of last records in the ZooKeeper log, even if they are obsolete. It doesn't affect the work of tables: used only to diagnose the ZooKeeper log before cleaning.", 0) \
M(Seconds, prefer_fetch_merged_part_time_threshold, 3600, "If the time passed after replication log entry creation exceeds this threshold and the sum of part sizes is greater than \"prefer_fetch_merged_part_size_threshold\", prefer fetching the merged part from a replica instead of doing the merge locally. To speed up very long merges.", 0) \

View File

@@ -144,6 +144,7 @@ bool ReplicatedMergeTreeRestartingThread::runImpl()
storage.mutations_finalizing_task->activateAndSchedule();
storage.merge_selecting_task->activateAndSchedule();
storage.cleanup_thread.start();
+storage.async_block_ids_cache.start();
storage.part_check_thread.start();
LOG_DEBUG(log, "Table started successfully");

View File

@@ -5,6 +5,7 @@
#include <Common/SipHash.h>
#include <Common/ZooKeeper/KeeperException.h>
#include <Common/ThreadFuzzer.h>
+#include <Storages/MergeTree/AsyncBlockIDsCache.h>
#include <DataTypes/ObjectUtils.h>
#include <Core/Block.h>
#include <IO/Operators.h>
@@ -105,7 +106,7 @@ struct ReplicatedMergeTreeSinkImpl<async_insert>::DelayedChunk
String conflict_block_id = p.filename();
auto it = block_id_to_offset_idx.find(conflict_block_id);
if (it == block_id_to_offset_idx.end())
-throw Exception("Unknown conflict path " + conflict_block_id, ErrorCodes::LOGICAL_ERROR);
+throw Exception(ErrorCodes::LOGICAL_ERROR, "Unknown conflict path {}", conflict_block_id);
/// If this filter is for self-dedup, the block paths were selected by `filterSelfDuplicate`, which is a self purge.
/// In that case we don't know whether ZooKeeper has this insert, so we keep one insert to avoid losing the data.
offset_idx.insert(std::end(offset_idx), std::begin(it->second) + self_dedup, std::end(it->second));
@@ -544,6 +545,8 @@ void ReplicatedMergeTreeSinkImpl<true>::finishDelayedChunk(const ZooKeeperWithFa
partition.temp_part = storage.writer.writeTempPart(partition.block_with_partition, metadata_snapshot, context);
}
+/// Reset the cache version to zero for every partition write, so the first
+/// conflict check of a new write does not wait for a cache refresh.
+cache_version = 0;
while (true)
{
partition.temp_part.finalize();
@@ -676,6 +679,13 @@ std::vector<String> ReplicatedMergeTreeSinkImpl<async_insert>::commitPart(
BlockIDsType block_id_path;
if constexpr (async_insert)
{
+/// Prefilter the block ids through the in-memory cache before going to ZooKeeper.
+conflict_block_ids = storage.async_block_ids_cache.detectConflicts(block_id, cache_version);
+if (!conflict_block_ids.empty())
+{
+cache_version = 0;
+return;
+}
for (const auto & single_block_id : block_id)
block_id_path.push_back(storage.zookeeper_path + "/async_blocks/" + single_block_id);
}

View File

@@ -5,6 +5,7 @@
#include <base/types.h>
#include <Storages/MergeTree/ZooKeeperRetries.h>
#include <Common/ZooKeeper/ZooKeeperWithFaultInjection.h>
+#include <Storages/MergeTree/AsyncBlockIDsCache.h>
namespace Poco { class Logger; }
@@ -115,6 +116,8 @@ private:
size_t quorum_timeout_ms;
size_t max_parts_per_block;
+UInt64 cache_version = 0;
bool is_attach = false;
bool quorum_parallel = false;
const bool deduplicate = true;

View File

@@ -91,6 +91,7 @@
#include <base/scope_guard.h>
#include <Common/scope_guard_safe.h>
+#include <Storages/MergeTree/AsyncBlockIDsCache.h>
#include <boost/algorithm/string/join.hpp>
#include <boost/algorithm/string/replace.hpp>
@@ -283,6 +284,7 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
, queue(*this, merge_strategy_picker)
, fetcher(*this)
, cleanup_thread(*this)
+, async_block_ids_cache(*this)
, part_check_thread(*this)
, restarting_thread(*this)
, part_moves_between_shards_orchestrator(*this)
@@ -4404,6 +4406,7 @@ void StorageReplicatedMergeTree::partialShutdown()
mutations_finalizing_task->deactivate();
cleanup_thread.stop();
+async_block_ids_cache.stop();
part_check_thread.stop();
/// Stop queue processing

View File

@@ -4,6 +4,7 @@
#include <atomic>
#include <pcg_random.hpp>
#include <Storages/IStorage.h>
+#include <Storages/MergeTree/AsyncBlockIDsCache.h>
#include <Storages/IStorageCluster.h>
#include <Storages/MergeTree/DataPartsExchange.h>
#include <Storages/MergeTree/EphemeralLockInZooKeeper.h>
@@ -335,6 +336,7 @@ private:
friend class ReplicatedMergeTreeSinkImpl;
friend class ReplicatedMergeTreePartCheckThread;
friend class ReplicatedMergeTreeCleanupThread;
+friend class AsyncBlockIDsCache;
friend class ReplicatedMergeTreeAlterThread;
friend class ReplicatedMergeTreeRestartingThread;
friend class ReplicatedMergeTreeAttachThread;
@@ -443,6 +445,8 @@ private:
/// A thread that removes old parts, log entries, and blocks.
ReplicatedMergeTreeCleanupThread cleanup_thread;
+AsyncBlockIDsCache async_block_ids_cache;
/// A thread that checks the data of the parts, as well as the queue of the parts to be checked.
ReplicatedMergeTreePartCheckThread part_check_thread;

View File

@@ -46,13 +46,15 @@ StorageS3Cluster::StorageS3Cluster(
const StorageID & table_id_,
const ColumnsDescription & columns_,
const ConstraintsDescription & constraints_,
-ContextPtr context_)
+ContextPtr context_,
+bool structure_argument_was_provided_)
: IStorageCluster(table_id_)
, s3_configuration{configuration_.url, configuration_.auth_settings, configuration_.request_settings, configuration_.headers}
, filename(configuration_.url)
, cluster_name(configuration_.cluster_name)
, format_name(configuration_.format)
, compression_method(configuration_.compression_method)
+, structure_argument_was_provided(structure_argument_was_provided_)
{
context_->getGlobalContext()->getRemoteHostFilter().checkURL(Poco::URI{filename});
StorageInMemoryMetadata storage_metadata;
@@ -68,7 +70,6 @@ StorageS3Cluster::StorageS3Cluster(
auto columns = StorageS3::getTableStructureFromDataImpl(format_name, s3_configuration, compression_method,
/*distributed_processing_*/false, is_key_with_globs, /*format_settings=*/std::nullopt, context_);
storage_metadata.setColumns(columns);
-add_columns_structure_to_query = true;
}
else
storage_metadata.setColumns(columns_);
@@ -111,7 +112,7 @@ Pipe StorageS3Cluster::read(
const bool add_agg_info = processed_stage == QueryProcessingStage::WithMergeableState;
ASTPtr query_to_send = interpreter.getQueryInfo().query->clone();
-if (add_columns_structure_to_query)
+if (!structure_argument_was_provided)
addColumnsStructureToQueryWithClusterEngine(
query_to_send, StorageDictionary::generateNamesAndTypesDescription(storage_snapshot->metadata->getColumns().getAll()), 5, getName());

View File

@@ -26,7 +26,8 @@ public:
const StorageID & table_id_,
const ColumnsDescription & columns_,
const ConstraintsDescription & constraints_,
-ContextPtr context_);
+ContextPtr context_,
+bool structure_argument_was_provided_);
std::string getName() const override { return "S3Cluster"; }
@@ -49,7 +50,7 @@ private:
String compression_method;
NamesAndTypesList virtual_columns;
Block virtual_block;
-bool add_columns_structure_to_query = false;
+bool structure_argument_was_provided;
};

View File

@@ -96,8 +96,9 @@ StoragePtr TableFunctionS3Cluster::executeImpl(
{
StoragePtr storage;
ColumnsDescription columns;
+bool structure_argument_was_provided = configuration.structure != "auto";
-if (configuration.structure != "auto")
+if (structure_argument_was_provided)
{
columns = parseColumnsListFromString(configuration.structure, context);
}
@@ -126,7 +127,8 @@ StoragePtr TableFunctionS3Cluster::executeImpl(
StorageID(getDatabaseName(), table_name),
columns,
ConstraintsDescription{},
-context);
+context,
+structure_argument_was_provided);
}
storage->startup();

View File

@@ -177,10 +177,7 @@ def get_table_uuid(node, db_atomic, table):
return uuid
-@pytest.fixture(autouse=True)
def drop_table(cluster):
-yield
node_names = ["node1z", "node2z", "node1n", "node2n", "node_another_bucket"]
for node_name in node_names:
@@ -257,3 +254,5 @@ def test_restore_another_bucket_path(cluster, db_atomic, zero_copy):
assert node_another_bucket.query(
"SELECT count(*) FROM s3.test FORMAT Values"
) == "({})".format(size * (keys - dropped_keys))
+drop_table(cluster)

View File

@@ -81,7 +81,7 @@ EventDate DateTime,
KeyID UInt32
) Engine = ReplicatedMergeTree('/clickhouse/tables/{shard}/{database}/t_async_insert_dedup', '{replica}')
PARTITION BY toYYYYMM(EventDate)
-ORDER BY (KeyID, EventDate)
+ORDER BY (KeyID, EventDate) SETTINGS use_async_block_ids_cache = 1
''')
q = queue.Queue(100)
@@ -98,7 +98,7 @@ gen.join()
retry = 0
-while (True):
+while True:
time.sleep(5)
result = client.query("select KeyID from t_async_insert_dedup order by KeyID")
result = result.split()
@@ -124,6 +124,16 @@ while (True):
else:
print(len(result), flush=True)
break
+
+result = client.query("SELECT value FROM system.metrics where metric = 'AsyncInsertCacheSize'")
+result = int(result.split()[0])
+if result <= 0:
+    raise Exception(f"AsyncInsertCacheSize should be > 0, but got {result}")
+
+result = client.query("SELECT value FROM system.events where event = 'AsyncInsertCacheHits'")
+result = int(result.split()[0])
+if result <= 0:
+    raise Exception(f"AsyncInsertCacheHits should be > 0, but got {result}")
client.query("DROP TABLE IF EXISTS t_async_insert_dedup NO DELAY")
os._exit(os.EX_OK)

View File

@@ -0,0 +1,4 @@
1 2 3
4 5 6
7 8 9
0 0 0

View File

@@ -0,0 +1,9 @@
-- Tags: no-fasttest
-- Tag no-fasttest: Depends on AWS
drop table if exists test;
create table test (x UInt32, y UInt32, z UInt32) engine=Memory();
insert into test select * from s3Cluster('test_cluster_one_shard_three_replicas_localhost', 'http://localhost:11111/test/a.tsv');
select * from test;
drop table test;

View File

@@ -0,0 +1,19 @@
Row 1:
──────
x.a: 1
x.b: 2

Row 2:
──────
x.a: 3
x.b: 4

Row 3:
──────
x.a: 5
x.b: 6

Row 4:
──────
x.a: 7
x.b: 8

View File

@@ -0,0 +1,20 @@
#!/usr/bin/env bash
CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CUR_DIR"/../shell_config.sh
$CLICKHOUSE_CLIENT -q $'create table test (`x.a` UInt32, `x.b` UInt32) engine=Memory'
echo '{"x" : {"a" : 1, "b" : 2}}' | ${CLICKHOUSE_CURL} --data-binary @- "${CLICKHOUSE_URL}&query=INSERT+INTO+test+FORMAT+JSONEachRow&input_format_import_nested_json=1&max_threads=10&input_format_parallel_parsing=0"
echo '{"x" : {"a" : 3, "b" : 4}}' | ${CLICKHOUSE_CURL} --data-binary @- "${CLICKHOUSE_URL}&query=INSERT+INTO+test+FORMAT+JSONEachRow&input_format_import_nested_json=1&max_threads=10&input_format_parallel_parsing=1"
$CLICKHOUSE_CLIENT -q $'select 5 as `x.a`, 6 as `x.b` format BSONEachRow' | ${CLICKHOUSE_CURL} --data-binary @- "${CLICKHOUSE_URL}&query=INSERT+INTO+test+FORMAT+BSONEachRow&max_threads=10&input_format_parallel_parsing=0"
$CLICKHOUSE_CLIENT -q $'select 7 as `x.a`, 8 as `x.b` format BSONEachRow' | ${CLICKHOUSE_CURL} --data-binary @- "${CLICKHOUSE_URL}&query=INSERT+INTO+test+FORMAT+BSONEachRow&max_threads=10&input_format_parallel_parsing=1"
$CLICKHOUSE_CLIENT -q "select * from test order by 1 format Vertical";
$CLICKHOUSE_CLIENT -q "drop table test";