mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-10 01:25:21 +00:00
S3Queue rework ordered mode
This commit is contained in:
parent
4ee498a8b5
commit
cfbf1cc1e2
@ -13,6 +13,7 @@
|
||||
#include <Common/ZooKeeper/ZooKeeper.h>
|
||||
#include <Common/getRandomASCIIString.h>
|
||||
#include <Common/randomSeed.h>
|
||||
#include <Common/DNSResolver.h>
|
||||
|
||||
#include <Poco/JSON/JSON.h>
|
||||
#include <Poco/JSON/Object.h>
|
||||
@ -54,6 +55,15 @@ namespace
|
||||
pcg64 rng(randomSeed());
|
||||
return min + rng() % (max - min + 1);
|
||||
}
|
||||
|
||||
size_t getBucketsNum(const S3QueueSettings & settings)
|
||||
{
|
||||
if (settings.s3queue_buckets)
|
||||
return settings.s3queue_buckets;
|
||||
if (settings.s3queue_processing_threads_num)
|
||||
return settings.s3queue_processing_threads_num;
|
||||
return 1;
|
||||
}
|
||||
}
|
||||
|
||||
std::unique_lock<std::mutex> S3QueueFilesMetadata::LocalFileStatuses::lock() const
|
||||
@ -133,12 +143,11 @@ S3QueueFilesMetadata::S3QueueFilesMetadata(const fs::path & zookeeper_path_, con
|
||||
, max_loading_retries(settings_.s3queue_loading_retries.value)
|
||||
, min_cleanup_interval_ms(settings_.s3queue_cleanup_interval_min_ms.value)
|
||||
, max_cleanup_interval_ms(settings_.s3queue_cleanup_interval_max_ms.value)
|
||||
, shards_num(settings_.s3queue_total_shards_num)
|
||||
, threads_per_shard(settings_.s3queue_processing_threads_num)
|
||||
, buckets_num(getBucketsNum(settings_))
|
||||
, zookeeper_path(zookeeper_path_)
|
||||
, zookeeper_processing_path(zookeeper_path_ / "processing")
|
||||
, zookeeper_processed_path(zookeeper_path_ / "processed")
|
||||
, zookeeper_failed_path(zookeeper_path_ / "failed")
|
||||
, zookeeper_shards_path(zookeeper_path_ / "shards")
|
||||
, zookeeper_buckets_path(zookeeper_path_ / "buckets")
|
||||
, zookeeper_cleanup_lock_path(zookeeper_path_ / "cleanup_lock")
|
||||
, log(getLogger("StorageS3Queue(" + zookeeper_path_.string() + ")"))
|
||||
{
|
||||
@ -148,6 +157,8 @@ S3QueueFilesMetadata::S3QueueFilesMetadata(const fs::path & zookeeper_path_, con
|
||||
task->activate();
|
||||
task->scheduleAfter(generateRescheduleInterval(min_cleanup_interval_ms, max_cleanup_interval_ms));
|
||||
}
|
||||
|
||||
LOG_TEST(log, "Using {} buckets", buckets_num);
|
||||
}
|
||||
|
||||
S3QueueFilesMetadata::~S3QueueFilesMetadata()
|
||||
@ -173,6 +184,57 @@ S3QueueFilesMetadata::FileStatusPtr S3QueueFilesMetadata::getFileStatus(const st
|
||||
return local_file_statuses.get(path, /* create */false);
|
||||
}
|
||||
|
||||
bool S3QueueFilesMetadata::useBucketsForProcessing() const
|
||||
{
|
||||
return mode == S3QueueMode::ORDERED && (buckets_num > 1);
|
||||
}
|
||||
|
||||
S3QueueFilesMetadata::Bucket S3QueueFilesMetadata::getBucketForPath(const std::string & path) const
|
||||
{
|
||||
return sipHash64(path) % buckets_num;
|
||||
}
|
||||
|
||||
std::string S3QueueFilesMetadata::getProcessorInfo(const std::string & processor_id)
|
||||
{
|
||||
Poco::JSON::Object json;
|
||||
json.set("hostname", DNSResolver::instance().getHostName());
|
||||
json.set("processor_id", processor_id);
|
||||
|
||||
std::ostringstream oss; // STYLE_CHECK_ALLOW_STD_STRING_STREAM
|
||||
oss.exceptions(std::ios::failbit);
|
||||
Poco::JSON::Stringifier::stringify(json, oss);
|
||||
return oss.str();
|
||||
}
|
||||
|
||||
bool S3QueueFilesMetadata::tryAcquireBucket(const Bucket & bucket, const Processor & processor)
|
||||
{
|
||||
const auto zk_client = getZooKeeper();
|
||||
const auto bucket_lock_path = getBucketLockPath(bucket);
|
||||
const auto processor_info = getProcessorInfo(processor);
|
||||
|
||||
zk_client->createAncestors(bucket_lock_path);
|
||||
|
||||
auto code = zk_client->tryCreate(bucket_lock_path, processor_info, zkutil::CreateMode::Ephemeral);
|
||||
if (code == Coordination::Error::ZOK)
|
||||
return true;
|
||||
|
||||
if (code == Coordination::Error::ZNODEEXISTS)
|
||||
return false;
|
||||
|
||||
if (Coordination::isHardwareError(code))
|
||||
return false;
|
||||
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected error: {}", magic_enum::enum_name(code));
|
||||
}
|
||||
|
||||
void S3QueueFilesMetadata::releaseBucket(const Bucket & bucket)
|
||||
{
|
||||
const auto zk_client = getZooKeeper();
|
||||
const auto bucket_lock_path = getBucketLockPath(bucket);
|
||||
zk_client->remove(bucket_lock_path); /// TODO: Add version
|
||||
LOG_TEST(log, "Released the bucket: {}", bucket);
|
||||
}
|
||||
|
||||
std::string S3QueueFilesMetadata::getNodeName(const std::string & path)
|
||||
{
|
||||
/// Since with are dealing with paths in s3 which can have "/",
|
||||
@ -184,6 +246,38 @@ std::string S3QueueFilesMetadata::getNodeName(const std::string & path)
|
||||
return toString(path_hash.get64());
|
||||
}
|
||||
|
||||
std::string S3QueueFilesMetadata::getProcessingPath(const std::string & path_hash) const
|
||||
{
|
||||
return zookeeper_processing_path / path_hash;
|
||||
}
|
||||
|
||||
std::string S3QueueFilesMetadata::getFailedPath(const std::string & path_hash) const
|
||||
{
|
||||
return zookeeper_failed_path / path_hash;
|
||||
}
|
||||
|
||||
|
||||
std::string S3QueueFilesMetadata::getProcessedPath(const std::string & path, const std::string & path_hash) const
|
||||
{
|
||||
if (mode == S3QueueMode::UNORDERED)
|
||||
{
|
||||
return zookeeper_path / "processed" / path_hash;
|
||||
}
|
||||
else if (useBucketsForProcessing())
|
||||
{
|
||||
return zookeeper_path / "buckets" / toString(getBucketForPath(path)) / "processed";
|
||||
}
|
||||
else
|
||||
{
|
||||
return zookeeper_path / "processed";
|
||||
}
|
||||
}
|
||||
|
||||
fs::path S3QueueFilesMetadata::getBucketLockPath(const Bucket & bucket) const
|
||||
{
|
||||
return zookeeper_path / "buckets" / toString(bucket) / "lock";
|
||||
}
|
||||
|
||||
S3QueueFilesMetadata::NodeMetadata S3QueueFilesMetadata::createNodeMetadata(
|
||||
const std::string & path,
|
||||
const std::string & exception,
|
||||
@ -204,126 +298,6 @@ S3QueueFilesMetadata::NodeMetadata S3QueueFilesMetadata::createNodeMetadata(
|
||||
return metadata;
|
||||
}
|
||||
|
||||
bool S3QueueFilesMetadata::isShardedProcessing() const
|
||||
{
|
||||
return getProcessingIdsNum() > 1 && mode == S3QueueMode::ORDERED;
|
||||
}
|
||||
|
||||
size_t S3QueueFilesMetadata::registerNewShard()
|
||||
{
|
||||
if (!isShardedProcessing())
|
||||
{
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR,
|
||||
"Cannot register a new shard, because processing is not sharded");
|
||||
}
|
||||
|
||||
const auto zk_client = getZooKeeper();
|
||||
zk_client->createIfNotExists(zookeeper_shards_path, "");
|
||||
|
||||
std::string shard_node_path;
|
||||
size_t shard_id = 0;
|
||||
for (size_t i = 0; i < shards_num; ++i)
|
||||
{
|
||||
const auto node_path = getZooKeeperPathForShard(i);
|
||||
auto err = zk_client->tryCreate(node_path, "", zkutil::CreateMode::Persistent);
|
||||
if (err == Coordination::Error::ZOK)
|
||||
{
|
||||
shard_node_path = node_path;
|
||||
shard_id = i;
|
||||
break;
|
||||
}
|
||||
else if (err == Coordination::Error::ZNODEEXISTS)
|
||||
continue;
|
||||
else
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR,
|
||||
"Unexpected error: {}", magic_enum::enum_name(err));
|
||||
}
|
||||
|
||||
if (shard_node_path.empty())
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Failed to register a new shard");
|
||||
|
||||
LOG_TRACE(log, "Using shard {} (zk node: {})", shard_id, shard_node_path);
|
||||
return shard_id;
|
||||
}
|
||||
|
||||
std::string S3QueueFilesMetadata::getZooKeeperPathForShard(size_t shard_id) const
|
||||
{
|
||||
return zookeeper_shards_path / ("shard" + toString(shard_id));
|
||||
}
|
||||
|
||||
void S3QueueFilesMetadata::registerNewShard(size_t shard_id)
|
||||
{
|
||||
if (!isShardedProcessing())
|
||||
{
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR,
|
||||
"Cannot register a new shard, because processing is not sharded");
|
||||
}
|
||||
|
||||
const auto zk_client = getZooKeeper();
|
||||
const auto node_path = getZooKeeperPathForShard(shard_id);
|
||||
zk_client->createAncestors(node_path);
|
||||
|
||||
auto err = zk_client->tryCreate(node_path, "", zkutil::CreateMode::Persistent);
|
||||
if (err != Coordination::Error::ZOK)
|
||||
{
|
||||
if (err == Coordination::Error::ZNODEEXISTS)
|
||||
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Cannot register shard {}: already exists", shard_id);
|
||||
else
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR,
|
||||
"Unexpected error: {}", magic_enum::enum_name(err));
|
||||
}
|
||||
}
|
||||
|
||||
bool S3QueueFilesMetadata::isShardRegistered(size_t shard_id)
|
||||
{
|
||||
const auto zk_client = getZooKeeper();
|
||||
const auto node_path = getZooKeeperPathForShard(shard_id);
|
||||
return zk_client->exists(node_path);
|
||||
}
|
||||
|
||||
void S3QueueFilesMetadata::unregisterShard(size_t shard_id)
|
||||
{
|
||||
if (!isShardedProcessing())
|
||||
{
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR,
|
||||
"Cannot unregister a shard, because processing is not sharded");
|
||||
}
|
||||
|
||||
const auto zk_client = getZooKeeper();
|
||||
const auto node_path = getZooKeeperPathForShard(shard_id);
|
||||
auto error_code = zk_client->tryRemove(node_path);
|
||||
if (error_code != Coordination::Error::ZOK
|
||||
&& error_code != Coordination::Error::ZNONODE)
|
||||
throw zkutil::KeeperException::fromPath(error_code, node_path);
|
||||
}
|
||||
|
||||
size_t S3QueueFilesMetadata::getProcessingIdsNum() const
|
||||
{
|
||||
return shards_num * threads_per_shard;
|
||||
}
|
||||
|
||||
std::vector<size_t> S3QueueFilesMetadata::getProcessingIdsForShard(size_t shard_id) const
|
||||
{
|
||||
std::vector<size_t> res(threads_per_shard);
|
||||
std::iota(res.begin(), res.end(), shard_id * threads_per_shard);
|
||||
return res;
|
||||
}
|
||||
|
||||
bool S3QueueFilesMetadata::isProcessingIdBelongsToShard(size_t id, size_t shard_id) const
|
||||
{
|
||||
return shard_id * threads_per_shard <= id && id < (shard_id + 1) * threads_per_shard;
|
||||
}
|
||||
|
||||
size_t S3QueueFilesMetadata::getIdForProcessingThread(size_t thread_id, size_t shard_id) const
|
||||
{
|
||||
return shard_id * threads_per_shard + thread_id;
|
||||
}
|
||||
|
||||
size_t S3QueueFilesMetadata::getProcessingIdForPath(const std::string & path) const
|
||||
{
|
||||
return sipHash64(path) % getProcessingIdsNum();
|
||||
}
|
||||
|
||||
S3QueueFilesMetadata::ProcessingNodeHolderPtr S3QueueFilesMetadata::trySetFileAsProcessing(const std::string & path)
|
||||
{
|
||||
auto timer = DB::CurrentThread::getProfileEvents().timer(ProfileEvents::S3QueueSetFileProcessingMicroseconds);
|
||||
@ -413,6 +387,8 @@ S3QueueFilesMetadata::ProcessingNodeHolderPtr S3QueueFilesMetadata::trySetFileAs
|
||||
{
|
||||
case SetFileProcessingResult::Success:
|
||||
{
|
||||
LOG_TEST(log, "Path {} successfully acquired for processing", path);
|
||||
|
||||
std::lock_guard lock(file_status->metadata_lock);
|
||||
file_status->state = FileStatus::State::Processing;
|
||||
|
||||
@ -426,18 +402,23 @@ S3QueueFilesMetadata::ProcessingNodeHolderPtr S3QueueFilesMetadata::trySetFileAs
|
||||
}
|
||||
case SetFileProcessingResult::AlreadyProcessed:
|
||||
{
|
||||
LOG_TEST(log, "Path {} is already processed", path);
|
||||
|
||||
std::lock_guard lock(file_status->metadata_lock);
|
||||
file_status->state = FileStatus::State::Processed;
|
||||
return {};
|
||||
}
|
||||
case SetFileProcessingResult::AlreadyFailed:
|
||||
{
|
||||
LOG_TEST(log, "Path {} is already failed and not retriable", path);
|
||||
|
||||
std::lock_guard lock(file_status->metadata_lock);
|
||||
file_status->state = FileStatus::State::Failed;
|
||||
return {};
|
||||
}
|
||||
case SetFileProcessingResult::ProcessingByOtherNode:
|
||||
{
|
||||
LOG_TEST(log, "Path {} is being processing already", path);
|
||||
/// We cannot save any local state here, see comment above.
|
||||
return {};
|
||||
}
|
||||
@ -448,138 +429,135 @@ std::pair<S3QueueFilesMetadata::SetFileProcessingResult,
|
||||
S3QueueFilesMetadata::ProcessingNodeHolderPtr>
|
||||
S3QueueFilesMetadata::trySetFileAsProcessingForUnorderedMode(const std::string & path, const FileStatusPtr & file_status)
|
||||
{
|
||||
/// In one zookeeper transaction do the following:
|
||||
/// 1. check that corresponding persistent nodes do not exist in processed/ and failed/;
|
||||
/// 2. create an ephemenral node in /processing if it does not exist;
|
||||
/// Return corresponding status if any of the step failed.
|
||||
|
||||
const auto node_name = getNodeName(path);
|
||||
const auto zk_client = getZooKeeper();
|
||||
const auto processed_node_path = getProcessedPath(path, node_name);
|
||||
const auto processing_node_path = getProcessingPath(node_name);
|
||||
const auto failed_node_path = getFailedPath(node_name);
|
||||
|
||||
/// In one zookeeper transaction do the following:
|
||||
enum RequestType
|
||||
{
|
||||
/// node_name is not within processed persistent nodes
|
||||
PROCESSED_PATH_DOESNT_EXIST = 0,
|
||||
/// node_name is not within failed persistent nodes
|
||||
FAILED_PATH_DOESNT_EXIST = 2,
|
||||
/// node_name ephemeral processing node was successfully created
|
||||
CREATED_PROCESSING_PATH = 4,
|
||||
};
|
||||
|
||||
auto node_metadata = createNodeMetadata(path);
|
||||
node_metadata.processing_id = getRandomASCIIString(10);
|
||||
|
||||
Coordination::Requests requests;
|
||||
|
||||
requests.push_back(zkutil::makeCreateRequest(zookeeper_processed_path / node_name, "", zkutil::CreateMode::Persistent));
|
||||
requests.push_back(zkutil::makeRemoveRequest(zookeeper_processed_path / node_name, -1));
|
||||
|
||||
requests.push_back(zkutil::makeCreateRequest(zookeeper_failed_path / node_name, "", zkutil::CreateMode::Persistent));
|
||||
requests.push_back(zkutil::makeRemoveRequest(zookeeper_failed_path / node_name, -1));
|
||||
|
||||
requests.push_back(zkutil::makeCreateRequest(zookeeper_processing_path / node_name, node_metadata.toString(), zkutil::CreateMode::Ephemeral));
|
||||
|
||||
Coordination::Responses responses;
|
||||
auto code = zk_client->tryMulti(requests, responses);
|
||||
auto is_request_failed = [&](RequestType type) { return responses[type]->error != Coordination::Error::ZOK; };
|
||||
|
||||
requests.push_back(zkutil::makeCreateRequest(processed_node_path, "", zkutil::CreateMode::Persistent));
|
||||
requests.push_back(zkutil::makeRemoveRequest(processed_node_path, -1));
|
||||
|
||||
requests.push_back(zkutil::makeCreateRequest(failed_node_path, "", zkutil::CreateMode::Persistent));
|
||||
requests.push_back(zkutil::makeRemoveRequest(failed_node_path, -1));
|
||||
|
||||
requests.push_back(zkutil::makeCreateRequest(processing_node_path, node_metadata.toString(), zkutil::CreateMode::Ephemeral));
|
||||
|
||||
const auto zk_client = getZooKeeper();
|
||||
const auto code = zk_client->tryMulti(requests, responses);
|
||||
|
||||
if (code == Coordination::Error::ZOK)
|
||||
{
|
||||
auto holder = std::make_unique<ProcessingNodeHolder>(
|
||||
node_metadata.processing_id, path, zookeeper_processing_path / node_name, file_status, zk_client, log);
|
||||
node_metadata.processing_id, path, processing_node_path, file_status, zk_client, log);
|
||||
return std::pair{SetFileProcessingResult::Success, std::move(holder)};
|
||||
}
|
||||
|
||||
if (responses[0]->error != Coordination::Error::ZOK)
|
||||
{
|
||||
if (is_request_failed(PROCESSED_PATH_DOESNT_EXIST))
|
||||
return std::pair{SetFileProcessingResult::AlreadyProcessed, nullptr};
|
||||
}
|
||||
else if (responses[2]->error != Coordination::Error::ZOK)
|
||||
{
|
||||
|
||||
if (is_request_failed(FAILED_PATH_DOESNT_EXIST))
|
||||
return std::pair{SetFileProcessingResult::AlreadyFailed, nullptr};
|
||||
}
|
||||
else if (responses[4]->error != Coordination::Error::ZOK)
|
||||
{
|
||||
|
||||
if (is_request_failed(CREATED_PROCESSING_PATH))
|
||||
return std::pair{SetFileProcessingResult::ProcessingByOtherNode, nullptr};
|
||||
}
|
||||
else
|
||||
{
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected state of zookeeper transaction: {}", magic_enum::enum_name(code));
|
||||
}
|
||||
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected state of zookeeper transaction: {}", magic_enum::enum_name(code));
|
||||
}
|
||||
|
||||
std::pair<S3QueueFilesMetadata::SetFileProcessingResult,
|
||||
S3QueueFilesMetadata::ProcessingNodeHolderPtr>
|
||||
S3QueueFilesMetadata::trySetFileAsProcessingForOrderedMode(const std::string & path, const FileStatusPtr & file_status)
|
||||
{
|
||||
/// Same as for Unordered mode.
|
||||
/// The only difference is the check if the file is already processed.
|
||||
/// For Ordered mode we do not keep a separate /processed/hash_node for each file
|
||||
/// but instead we only keep a maximum processed file
|
||||
/// (since all files are ordered and new files have a lexically bigger name, it makes sense).
|
||||
|
||||
const auto node_name = getNodeName(path);
|
||||
const auto zk_client = getZooKeeper();
|
||||
const auto processed_node_path = getProcessedPath(path, node_name);
|
||||
const auto processing_node_path = getProcessingPath(node_name);
|
||||
const auto failed_node_path = getFailedPath(node_name);
|
||||
|
||||
/// In one zookeeper transaction do the following:
|
||||
enum RequestType
|
||||
{
|
||||
/// node_name is not within failed persistent nodes
|
||||
FAILED_PATH_DOESNT_EXIST = 0,
|
||||
/// node_name ephemeral processing node was successfully created
|
||||
CREATED_PROCESSING_PATH = 2,
|
||||
/// max_processed_node version did not change
|
||||
CHECKED_MAX_PROCESSED_PATH = 3,
|
||||
};
|
||||
|
||||
auto node_metadata = createNodeMetadata(path);
|
||||
node_metadata.processing_id = getRandomASCIIString(10);
|
||||
const auto zk_client = getZooKeeper();
|
||||
|
||||
while (true)
|
||||
{
|
||||
/// Get a /processed node content - max_processed path.
|
||||
/// Compare our path to it.
|
||||
/// If file is not yet processed, check corresponding /failed node and try create /processing node
|
||||
/// and in the same zookeeper transaction also check that /processed node did not change
|
||||
/// in between, e.g. that stat.version remained the same.
|
||||
/// If the version did change - retry (since we cannot do Get and Create requests
|
||||
/// in the same zookeeper transaction, so we use a while loop with tries).
|
||||
|
||||
auto processed_node = isShardedProcessing()
|
||||
? zookeeper_processed_path / toString(getProcessingIdForPath(path))
|
||||
: zookeeper_processed_path;
|
||||
|
||||
NodeMetadata processed_node_metadata;
|
||||
Coordination::Stat processed_node_stat;
|
||||
std::optional<int> max_processed_node_version;
|
||||
std::string data;
|
||||
auto processed_node_exists = zk_client->tryGet(processed_node, data, &processed_node_stat);
|
||||
if (processed_node_exists && !data.empty())
|
||||
processed_node_metadata = NodeMetadata::fromString(data);
|
||||
|
||||
auto max_processed_file_path = processed_node_metadata.file_path;
|
||||
if (!max_processed_file_path.empty() && path <= max_processed_file_path)
|
||||
Coordination::Stat processed_node_stat;
|
||||
if (zk_client->tryGet(processed_node_path, data, &processed_node_stat) && !data.empty())
|
||||
{
|
||||
LOG_TEST(log, "File {} is already processed, max processed file: {}", path, max_processed_file_path);
|
||||
return std::pair{SetFileProcessingResult::AlreadyProcessed, nullptr};
|
||||
auto processed_node_metadata = NodeMetadata::fromString(data);
|
||||
LOG_TEST(log, "Current max processed file {} from path: {}", processed_node_metadata.file_path, processed_node_path);
|
||||
|
||||
if (!processed_node_metadata.file_path.empty() && path <= processed_node_metadata.file_path)
|
||||
{
|
||||
LOG_TEST(log, "File {} is already processed, max processed file: {}",
|
||||
path, processed_node_metadata.file_path);
|
||||
return std::pair{SetFileProcessingResult::AlreadyProcessed, nullptr};
|
||||
}
|
||||
max_processed_node_version = processed_node_stat.version;
|
||||
}
|
||||
|
||||
Coordination::Requests requests;
|
||||
requests.push_back(zkutil::makeCreateRequest(zookeeper_failed_path / node_name, "", zkutil::CreateMode::Persistent));
|
||||
requests.push_back(zkutil::makeRemoveRequest(zookeeper_failed_path / node_name, -1));
|
||||
Coordination::Responses responses;
|
||||
auto is_request_failed = [&](RequestType type) { return responses[type]->error != Coordination::Error::ZOK; };
|
||||
|
||||
requests.push_back(zkutil::makeCreateRequest(zookeeper_processing_path / node_name, node_metadata.toString(), zkutil::CreateMode::Ephemeral));
|
||||
requests.push_back(zkutil::makeCreateRequest(failed_node_path, "", zkutil::CreateMode::Persistent));
|
||||
requests.push_back(zkutil::makeRemoveRequest(failed_node_path, -1));
|
||||
|
||||
if (processed_node_exists)
|
||||
requests.push_back(zkutil::makeCreateRequest(processing_node_path, node_metadata.toString(), zkutil::CreateMode::Ephemeral));
|
||||
|
||||
if (max_processed_node_version.has_value())
|
||||
{
|
||||
requests.push_back(zkutil::makeCheckRequest(processed_node, processed_node_stat.version));
|
||||
requests.push_back(zkutil::makeCheckRequest(processed_node_path, max_processed_node_version.value()));
|
||||
}
|
||||
else
|
||||
{
|
||||
requests.push_back(zkutil::makeCreateRequest(processed_node, "", zkutil::CreateMode::Persistent));
|
||||
requests.push_back(zkutil::makeRemoveRequest(processed_node, -1));
|
||||
requests.push_back(zkutil::makeCreateRequest(processed_node_path, "", zkutil::CreateMode::Persistent));
|
||||
requests.push_back(zkutil::makeRemoveRequest(processed_node_path, -1));
|
||||
}
|
||||
|
||||
Coordination::Responses responses;
|
||||
auto code = zk_client->tryMulti(requests, responses);
|
||||
const auto code = zk_client->tryMulti(requests, responses);
|
||||
if (code == Coordination::Error::ZOK)
|
||||
{
|
||||
auto holder = std::make_unique<ProcessingNodeHolder>(
|
||||
node_metadata.processing_id, path, zookeeper_processing_path / node_name, file_status, zk_client, log);
|
||||
|
||||
LOG_TEST(log, "File {} is ready to be processed", path);
|
||||
auto holder = std::make_unique<ProcessingNodeHolder>(node_metadata.processing_id, path, processing_node_path, file_status, zk_client, log);
|
||||
return std::pair{SetFileProcessingResult::Success, std::move(holder)};
|
||||
}
|
||||
|
||||
if (responses[0]->error != Coordination::Error::ZOK)
|
||||
{
|
||||
LOG_TEST(log, "Skipping file `{}`: failed", path);
|
||||
if (is_request_failed(FAILED_PATH_DOESNT_EXIST))
|
||||
return std::pair{SetFileProcessingResult::AlreadyFailed, nullptr};
|
||||
}
|
||||
else if (responses[2]->error != Coordination::Error::ZOK)
|
||||
{
|
||||
LOG_TEST(log, "Skipping file `{}`: already processing", path);
|
||||
|
||||
if (is_request_failed(CREATED_PROCESSING_PATH))
|
||||
return std::pair{SetFileProcessingResult::ProcessingByOtherNode, nullptr};
|
||||
}
|
||||
else
|
||||
{
|
||||
LOG_TEST(log, "Version of max processed file changed. Retrying the check for file `{}`", path);
|
||||
}
|
||||
|
||||
LOG_TEST(log, "Version of max processed file changed. Retrying the check for file `{}`", path);
|
||||
}
|
||||
}
|
||||
|
||||
@ -615,6 +593,14 @@ void S3QueueFilesMetadata::setFileProcessed(ProcessingNodeHolderPtr holder)
|
||||
ProfileEvents::increment(ProfileEvents::S3QueueProcessedFiles);
|
||||
}
|
||||
|
||||
void S3QueueFilesMetadata::setFileProcessed(const std::string & path)
|
||||
{
|
||||
if (mode != S3QueueMode::ORDERED)
|
||||
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Can set file as preprocessed only for Ordered mode");
|
||||
|
||||
setFileProcessedForOrderedModeImpl(path, nullptr);
|
||||
}
|
||||
|
||||
void S3QueueFilesMetadata::setFileProcessedForUnorderedMode(ProcessingNodeHolderPtr holder)
|
||||
{
|
||||
/// Create a persistent node in /processed and remove ephemeral node from /processing.
|
||||
@ -623,14 +609,15 @@ void S3QueueFilesMetadata::setFileProcessedForUnorderedMode(ProcessingNodeHolder
|
||||
const auto node_name = getNodeName(path);
|
||||
const auto node_metadata = createNodeMetadata(path).toString();
|
||||
const auto zk_client = getZooKeeper();
|
||||
const auto processed_node_path = getProcessedPath(path, node_name);
|
||||
|
||||
Coordination::Requests requests;
|
||||
requests.push_back(zkutil::makeCreateRequest(zookeeper_processed_path / node_name, node_metadata, zkutil::CreateMode::Persistent));
|
||||
requests.push_back(zkutil::makeCreateRequest(processed_node_path, node_metadata, zkutil::CreateMode::Persistent));
|
||||
|
||||
Coordination::Responses responses;
|
||||
if (holder->remove(&requests, &responses))
|
||||
{
|
||||
LOG_TRACE(log, "Moved file `{}` to processed", path);
|
||||
LOG_TRACE(log, "Moved file `{}` to processed (node path: {})", path, processed_node_path);
|
||||
if (max_loading_retries)
|
||||
zk_client->tryRemove(zookeeper_failed_path / (node_name + ".retriable"), -1);
|
||||
return;
|
||||
@ -643,29 +630,24 @@ void S3QueueFilesMetadata::setFileProcessedForUnorderedMode(ProcessingNodeHolder
|
||||
}
|
||||
|
||||
LOG_WARNING(log,
|
||||
"Cannot set file ({}) as processed since ephemeral node in /processing"
|
||||
"Cannot set file ({}) as processed since ephemeral node in /processing (code: {})"
|
||||
"does not exist with expected id, "
|
||||
"this could be a result of expired zookeeper session", path);
|
||||
"this could be a result of expired zookeeper session", path, responses[1]->error);
|
||||
}
|
||||
|
||||
|
||||
void S3QueueFilesMetadata::setFileProcessedForOrderedMode(ProcessingNodeHolderPtr holder)
|
||||
{
|
||||
auto processed_node_path = isShardedProcessing()
|
||||
? zookeeper_processed_path / toString(getProcessingIdForPath(holder->path))
|
||||
: zookeeper_processed_path;
|
||||
|
||||
setFileProcessedForOrderedModeImpl(holder->path, holder, processed_node_path);
|
||||
setFileProcessedForOrderedModeImpl(holder->path, holder);
|
||||
}
|
||||
|
||||
void S3QueueFilesMetadata::setFileProcessedForOrderedModeImpl(
|
||||
const std::string & path, ProcessingNodeHolderPtr holder, const std::string & processed_node_path)
|
||||
void S3QueueFilesMetadata::setFileProcessedForOrderedModeImpl(const std::string & path, ProcessingNodeHolderPtr holder)
|
||||
{
|
||||
/// Update a persistent node in /processed and remove ephemeral node from /processing.
|
||||
|
||||
const auto node_name = getNodeName(path);
|
||||
const auto node_metadata = createNodeMetadata(path).toString();
|
||||
const auto zk_client = getZooKeeper();
|
||||
const auto processed_node_path = getProcessedPath(path, node_name);
|
||||
|
||||
LOG_TRACE(log, "Setting file `{}` as processed (at {})", path, processed_node_path);
|
||||
while (true)
|
||||
@ -695,6 +677,13 @@ void S3QueueFilesMetadata::setFileProcessedForOrderedModeImpl(
|
||||
Coordination::Responses responses;
|
||||
if (holder)
|
||||
{
|
||||
// if (useBucketsForProcessing())
|
||||
// {
|
||||
// auto bucket_lock_path = getBucketLockPath(getBucketForPath(path));
|
||||
// /// TODO: add version
|
||||
// requests.push_back(zkutil::makeCheckRequest(bucket_lock_path, -1));
|
||||
// }
|
||||
|
||||
if (holder->remove(&requests, &responses))
|
||||
{
|
||||
LOG_TRACE(log, "Moved file `{}` to processed", path);
|
||||
@ -728,22 +717,6 @@ void S3QueueFilesMetadata::setFileProcessedForOrderedModeImpl(
|
||||
}
|
||||
}
|
||||
|
||||
void S3QueueFilesMetadata::setFileProcessed(const std::string & path, size_t shard_id)
|
||||
{
|
||||
if (mode != S3QueueMode::ORDERED)
|
||||
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Can set file as preprocessed only for Ordered mode");
|
||||
|
||||
if (isShardedProcessing())
|
||||
{
|
||||
for (const auto & processor : getProcessingIdsForShard(shard_id))
|
||||
setFileProcessedForOrderedModeImpl(path, nullptr, zookeeper_processed_path / toString(processor));
|
||||
}
|
||||
else
|
||||
{
|
||||
setFileProcessedForOrderedModeImpl(path, nullptr, zookeeper_processed_path);
|
||||
}
|
||||
}
|
||||
|
||||
void S3QueueFilesMetadata::setFileFailed(ProcessingNodeHolderPtr holder, const String & exception_message)
|
||||
{
|
||||
auto timer = DB::CurrentThread::getProfileEvents().timer(ProfileEvents::S3QueueSetFileFailedMicroseconds);
|
||||
@ -767,6 +740,7 @@ void S3QueueFilesMetadata::setFileFailed(ProcessingNodeHolderPtr holder, const S
|
||||
const auto node_name = getNodeName(path);
|
||||
auto node_metadata = createNodeMetadata(path, exception_message);
|
||||
const auto zk_client = getZooKeeper();
|
||||
const auto processing_node_path = getProcessingPath(node_name);
|
||||
|
||||
/// Is file retriable?
|
||||
if (max_loading_retries == 0)
|
||||
@ -830,7 +804,7 @@ void S3QueueFilesMetadata::setFileFailed(ProcessingNodeHolderPtr holder, const S
|
||||
/// Make a persistent node /failed/node_hash, remove /failed/node_hash.retriable node and node in /processing.
|
||||
|
||||
Coordination::Requests requests;
|
||||
requests.push_back(zkutil::makeRemoveRequest(zookeeper_processing_path / node_name, -1));
|
||||
requests.push_back(zkutil::makeRemoveRequest(processing_node_path, -1));
|
||||
requests.push_back(zkutil::makeRemoveRequest(zookeeper_failed_path / node_name_with_retriable_suffix,
|
||||
stat.version));
|
||||
requests.push_back(zkutil::makeCreateRequest(zookeeper_failed_path / node_name,
|
||||
@ -849,7 +823,7 @@ void S3QueueFilesMetadata::setFileFailed(ProcessingNodeHolderPtr holder, const S
|
||||
/// File is still retriable, update retries count and remove node from /processing.
|
||||
|
||||
Coordination::Requests requests;
|
||||
requests.push_back(zkutil::makeRemoveRequest(zookeeper_processing_path / node_name, -1));
|
||||
requests.push_back(zkutil::makeRemoveRequest(processing_node_path, -1));
|
||||
if (node_metadata.retries == 0)
|
||||
{
|
||||
requests.push_back(zkutil::makeCreateRequest(zookeeper_failed_path / node_name_with_retriable_suffix,
|
||||
@ -980,6 +954,7 @@ void S3QueueFilesMetadata::cleanupThreadFuncImpl()
|
||||
{
|
||||
auto timer = DB::CurrentThread::getProfileEvents().timer(ProfileEvents::S3QueueCleanupMaxSetSizeOrTTLMicroseconds);
|
||||
const auto zk_client = getZooKeeper();
|
||||
const std::string zookeeper_processed_path = zookeeper_path / "processed";
|
||||
|
||||
Strings processed_nodes;
|
||||
auto code = zk_client->tryGetChildren(zookeeper_processed_path, processed_nodes);
|
||||
@ -987,7 +962,7 @@ void S3QueueFilesMetadata::cleanupThreadFuncImpl()
|
||||
{
|
||||
if (code == Coordination::Error::ZNONODE)
|
||||
{
|
||||
LOG_TEST(log, "Path {} does not exist", zookeeper_processed_path.string());
|
||||
LOG_TEST(log, "Path {} does not exist", zookeeper_processed_path);
|
||||
}
|
||||
else
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected error: {}", magic_enum::enum_name(code));
|
||||
@ -1051,7 +1026,7 @@ void S3QueueFilesMetadata::cleanupThreadFuncImpl()
|
||||
|
||||
for (const auto & node : processed_nodes)
|
||||
{
|
||||
const std::string path = zookeeper_processed_path / node;
|
||||
const std::string path = getProcessedPath("", node); /// TODO:
|
||||
try
|
||||
{
|
||||
std::string metadata_str;
|
||||
|
@ -18,8 +18,14 @@ class StorageS3Queue;
|
||||
/**
|
||||
* A class for managing S3Queue metadata in zookeeper, e.g.
|
||||
* the following folders:
|
||||
* - <path_to_metadata>/processing
|
||||
* - <path_to_metadata>/processed
|
||||
* - <path_to_metadata>/processing
|
||||
* - <path_to_metadata>/failed
|
||||
*
|
||||
* In case we use buckets for processing for Ordered mode, the structure looks like:
|
||||
* - <path_to_metadata>/buckets/<bucket>/processed -- persistent node, information about last processed file.
|
||||
* - <path_to_metadata>/buckets/<bucket>/lock -- ephemeral node, used for acquiring bucket lock.
|
||||
* - <path_to_metadata>/processing
|
||||
* - <path_to_metadata>/failed
|
||||
*
|
||||
* Depending on S3Queue processing mode (ordered or unordered)
|
||||
@ -37,12 +43,15 @@ public:
|
||||
class ProcessingNodeHolder;
|
||||
using ProcessingNodeHolderPtr = std::shared_ptr<ProcessingNodeHolder>;
|
||||
|
||||
using Bucket = size_t;
|
||||
using Processor = std::string;
|
||||
|
||||
S3QueueFilesMetadata(const fs::path & zookeeper_path_, const S3QueueSettings & settings_);
|
||||
|
||||
~S3QueueFilesMetadata();
|
||||
|
||||
void setFileProcessed(ProcessingNodeHolderPtr holder);
|
||||
void setFileProcessed(const std::string & path, size_t shard_id);
|
||||
void setFileProcessed(const std::string & path);
|
||||
|
||||
void setFileFailed(ProcessingNodeHolderPtr holder, const std::string & exception_message);
|
||||
|
||||
@ -81,37 +90,13 @@ public:
|
||||
|
||||
void deactivateCleanupTask();
|
||||
|
||||
/// Should the table use sharded processing?
|
||||
/// We use sharded processing for Ordered mode of S3Queue table.
|
||||
/// It allows to parallelize processing within a single server
|
||||
/// and to allow distributed processing.
|
||||
bool isShardedProcessing() const;
|
||||
|
||||
/// Register a new shard for processing.
|
||||
/// Return a shard id of registered shard.
|
||||
size_t registerNewShard();
|
||||
/// Register a new shard for processing by given id.
|
||||
/// Throws exception if shard by this id is already registered.
|
||||
void registerNewShard(size_t shard_id);
|
||||
/// Unregister shard from keeper.
|
||||
void unregisterShard(size_t shard_id);
|
||||
bool isShardRegistered(size_t shard_id);
|
||||
|
||||
/// Total number of processing ids.
|
||||
/// A processing id identifies a single processing thread.
|
||||
/// There might be several processing ids per shard.
|
||||
size_t getProcessingIdsNum() const;
|
||||
/// Get processing ids identified with requested shard.
|
||||
std::vector<size_t> getProcessingIdsForShard(size_t shard_id) const;
|
||||
/// Check if given processing id belongs to a given shard.
|
||||
bool isProcessingIdBelongsToShard(size_t id, size_t shard_id) const;
|
||||
/// Get a processing id for processing thread by given thread id.
|
||||
/// thread id is a value in range [0, threads_per_shard].
|
||||
size_t getIdForProcessingThread(size_t thread_id, size_t shard_id) const;
|
||||
|
||||
bool useBucketsForProcessing() const;
|
||||
/// Calculate which processing id corresponds to a given file path.
|
||||
/// The file will be processed by a thread related to this processing id.
|
||||
size_t getProcessingIdForPath(const std::string & path) const;
|
||||
Bucket getBucketForPath(const std::string & path) const;
|
||||
|
||||
bool tryAcquireBucket(const Bucket & bucket, const Processor & processor);
|
||||
void releaseBucket(const Bucket & bucket);
|
||||
|
||||
private:
|
||||
const S3QueueMode mode;
|
||||
@ -120,13 +105,12 @@ private:
|
||||
const UInt64 max_loading_retries;
|
||||
const size_t min_cleanup_interval_ms;
|
||||
const size_t max_cleanup_interval_ms;
|
||||
const size_t shards_num;
|
||||
const size_t threads_per_shard;
|
||||
const size_t buckets_num;
|
||||
|
||||
const fs::path zookeeper_path;
|
||||
const fs::path zookeeper_processing_path;
|
||||
const fs::path zookeeper_processed_path;
|
||||
const fs::path zookeeper_failed_path;
|
||||
const fs::path zookeeper_shards_path;
|
||||
const fs::path zookeeper_buckets_path;
|
||||
const fs::path zookeeper_cleanup_lock_path;
|
||||
|
||||
LoggerPtr log;
|
||||
@ -135,15 +119,19 @@ private:
|
||||
BackgroundSchedulePool::TaskHolder task;
|
||||
|
||||
std::string getNodeName(const std::string & path);
|
||||
fs::path getBucketLockPath(const Bucket & bucket) const;
|
||||
std::string getProcessorInfo(const std::string & processor_id);
|
||||
|
||||
std::string getProcessedPath(const std::string & path, const std::string & path_hash) const;
|
||||
std::string getProcessingPath(const std::string & path_hash) const;
|
||||
std::string getFailedPath(const std::string & path_hash) const;
|
||||
|
||||
zkutil::ZooKeeperPtr getZooKeeper() const;
|
||||
|
||||
void setFileProcessedForOrderedMode(ProcessingNodeHolderPtr holder);
|
||||
void setFileProcessedForUnorderedMode(ProcessingNodeHolderPtr holder);
|
||||
std::string getZooKeeperPathForShard(size_t shard_id) const;
|
||||
|
||||
void setFileProcessedForOrderedModeImpl(
|
||||
const std::string & path, ProcessingNodeHolderPtr holder, const std::string & processed_node_path);
|
||||
void setFileProcessedForOrderedModeImpl(const std::string & path, ProcessingNodeHolderPtr holder);
|
||||
|
||||
enum class SetFileProcessingResult : uint8_t
|
||||
{
|
||||
|
@ -13,7 +13,7 @@ class ASTStorage;
|
||||
#define S3QUEUE_RELATED_SETTINGS(M, ALIAS) \
|
||||
M(S3QueueMode, \
|
||||
mode, \
|
||||
S3QueueMode::ORDERED, \
|
||||
S3QueueMode::UNORDERED, \
|
||||
"With unordered mode, the set of all already processed files is tracked with persistent nodes in ZooKepeer." \
|
||||
"With ordered mode, only the max name of the successfully consumed file stored.", \
|
||||
0) \
|
||||
@ -30,8 +30,7 @@ class ASTStorage;
|
||||
M(UInt32, s3queue_tracked_files_limit, 1000, "For unordered mode. Max set size for tracking processed files in ZooKeeper", 0) \
|
||||
M(UInt32, s3queue_cleanup_interval_min_ms, 60000, "For unordered mode. Polling backoff min for cleanup", 0) \
|
||||
M(UInt32, s3queue_cleanup_interval_max_ms, 60000, "For unordered mode. Polling backoff max for cleanup", 0) \
|
||||
M(UInt32, s3queue_total_shards_num, 1, "Value 0 means disabled", 0) \
|
||||
M(UInt32, s3queue_current_shard_num, 0, "", 0) \
|
||||
M(UInt32, s3queue_buckets, 0, "Number of buckets for Ordered mode parallel processing", 0) \
|
||||
|
||||
#define LIST_OF_S3QUEUE_SETTINGS(M, ALIAS) \
|
||||
S3QUEUE_RELATED_SETTINGS(M, ALIAS) \
|
||||
|
@ -43,85 +43,207 @@ StorageS3QueueSource::S3QueueKeyWithInfo::S3QueueKeyWithInfo(
|
||||
StorageS3QueueSource::FileIterator::FileIterator(
|
||||
std::shared_ptr<S3QueueFilesMetadata> metadata_,
|
||||
std::unique_ptr<GlobIterator> glob_iterator_,
|
||||
size_t current_shard_,
|
||||
std::atomic<bool> & shutdown_called_,
|
||||
LoggerPtr logger_)
|
||||
: metadata(metadata_)
|
||||
, glob_iterator(std::move(glob_iterator_))
|
||||
, current_processor(getRandomASCIIString(10)) /// TODO: add server uuid?
|
||||
, shutdown_called(shutdown_called_)
|
||||
, log(logger_)
|
||||
, sharded_processing(metadata->isShardedProcessing())
|
||||
, current_shard(current_shard_)
|
||||
{
|
||||
if (sharded_processing)
|
||||
}
|
||||
|
||||
StorageS3QueueSource::FileIterator::~FileIterator()
|
||||
{
|
||||
releaseAndResetCurrentBucket();
|
||||
}
|
||||
|
||||
void StorageS3QueueSource::FileIterator::releaseAndResetCurrentBucket()
|
||||
{
|
||||
try
|
||||
{
|
||||
for (const auto & id : metadata->getProcessingIdsForShard(current_shard))
|
||||
sharded_keys.emplace(id, std::deque<KeyWithInfoPtr>{});
|
||||
if (current_bucket.has_value())
|
||||
{
|
||||
metadata->releaseBucket(current_bucket.value());
|
||||
current_bucket.reset();
|
||||
}
|
||||
}
|
||||
catch (const zkutil::KeeperException & e)
|
||||
{
|
||||
if (e.code == Coordination::Error::ZSESSIONEXPIRED)
|
||||
{
|
||||
LOG_TRACE(log, "Session expired while releasing bucket");
|
||||
}
|
||||
if (e.code == Coordination::Error::ZNONODE)
|
||||
{
|
||||
LOG_TRACE(log, "Bucket {} does not exist. "
|
||||
"This could happen because of an exprired session",
|
||||
current_bucket.value());
|
||||
}
|
||||
else
|
||||
{
|
||||
LOG_ERROR(log, "Got unexpected exception while releasing bucket: {}",
|
||||
getCurrentExceptionMessage(true));
|
||||
chassert(false);
|
||||
}
|
||||
current_bucket.reset();
|
||||
}
|
||||
}
|
||||
|
||||
StorageS3QueueSource::KeyWithInfoPtr StorageS3QueueSource::FileIterator::next(size_t idx)
|
||||
StorageS3QueueSource::KeyWithInfoPtr StorageS3QueueSource::FileIterator::getNextKeyFromAcquiredBucket()
|
||||
{
|
||||
while (!shutdown_called)
|
||||
/// We need this lock to maintain consistency between listing s3 directory
|
||||
/// and getting/putting result into listed_keys_cache.
|
||||
std::lock_guard lock(buckets_mutex);
|
||||
|
||||
while (true)
|
||||
{
|
||||
KeyWithInfoPtr val{nullptr};
|
||||
|
||||
/// Each processing thread gets next path from glob_iterator->next()
|
||||
/// and checks if corresponding bucket is already acquired by someone.
|
||||
/// In case it is already acquired, they put the key into listed_keys_cache,
|
||||
/// so that the thread who acquired the bucket will be able to see
|
||||
/// those keys without the need to list s3 directory once again.
|
||||
if (current_bucket.has_value())
|
||||
{
|
||||
std::unique_lock lk(sharded_keys_mutex, std::defer_lock);
|
||||
if (sharded_processing)
|
||||
auto it = listed_keys_cache.find(current_bucket.value());
|
||||
if (it != listed_keys_cache.end())
|
||||
{
|
||||
/// To make sure order on keys in each shard in sharded_keys
|
||||
/// we need to check sharded_keys and to next() under lock.
|
||||
lk.lock();
|
||||
/// `bucket_keys` -- keys we iterated so far and which were not taken for processing.
|
||||
/// `processor` -- processor id of the thread which has acquired the bucket.
|
||||
auto & [bucket_keys, processor] = it->second;
|
||||
|
||||
if (auto it = sharded_keys.find(idx); it != sharded_keys.end())
|
||||
/// Check correctness just in case.
|
||||
if (!processor.has_value() || processor.value() != current_processor)
|
||||
{
|
||||
auto & keys = it->second;
|
||||
if (!keys.empty())
|
||||
{
|
||||
val = keys.front();
|
||||
keys.pop_front();
|
||||
chassert(idx == metadata->getProcessingIdForPath(val->key));
|
||||
}
|
||||
throw Exception(
|
||||
ErrorCodes::LOGICAL_ERROR,
|
||||
"Expected current processor {} to be equal to bucket's {} processor, "
|
||||
"but have {}", current_processor, current_bucket.value(),
|
||||
processor.has_value() ? processor.value() : Processor{});
|
||||
}
|
||||
else
|
||||
|
||||
/// Take next key to process
|
||||
if (!bucket_keys.empty())
|
||||
{
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR,
|
||||
"Processing id {} does not exist (Expected ids: {})",
|
||||
idx, fmt::join(metadata->getProcessingIdsForShard(current_shard), ", "));
|
||||
/// Take the key from the front, the order is important.
|
||||
auto key_with_info = bucket_keys.front();
|
||||
bucket_keys.pop_front();
|
||||
return key_with_info;
|
||||
}
|
||||
|
||||
/// No more keys in bucket, remove it from cache.
|
||||
listed_keys_cache.erase(it);
|
||||
}
|
||||
|
||||
if (!val)
|
||||
if (iterator_finished)
|
||||
{
|
||||
val = glob_iterator->next();
|
||||
if (val && sharded_processing)
|
||||
{
|
||||
const auto processing_id_for_key = metadata->getProcessingIdForPath(val->key);
|
||||
if (idx != processing_id_for_key)
|
||||
{
|
||||
if (metadata->isProcessingIdBelongsToShard(processing_id_for_key, current_shard))
|
||||
{
|
||||
LOG_TEST(log, "Putting key {} into queue of processor {} (total: {})",
|
||||
val->key, processing_id_for_key, sharded_keys.size());
|
||||
/// Bucket is fully processed - release the bucket.
|
||||
releaseAndResetCurrentBucket();
|
||||
}
|
||||
}
|
||||
/// If processing thread has already acquired some bucket
|
||||
/// and while listing s3 directory gets a key which is in a different bucket,
|
||||
/// it puts the key into listed_keys_cache to allow others to process it,
|
||||
/// because one processing thread can acquire only one bucket at a time.
|
||||
/// Once a thread is finished with its acquired bucket, it checks listed_keys_cache
|
||||
/// to see if there are keys from buckets not acquired by anyone.
|
||||
if (!current_bucket.has_value())
|
||||
{
|
||||
for (auto it = listed_keys_cache.begin(); it != listed_keys_cache.end();)
|
||||
{
|
||||
auto & [bucket, bucket_info] = *it;
|
||||
auto & [bucket_keys, processor] = bucket_info;
|
||||
|
||||
if (auto it = sharded_keys.find(processing_id_for_key); it != sharded_keys.end())
|
||||
{
|
||||
it->second.push_back(val);
|
||||
}
|
||||
else
|
||||
{
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR,
|
||||
"Processing id {} does not exist (Expected ids: {})",
|
||||
processing_id_for_key, fmt::join(metadata->getProcessingIdsForShard(current_shard), ", "));
|
||||
}
|
||||
}
|
||||
continue;
|
||||
}
|
||||
if (processor.has_value())
|
||||
{
|
||||
LOG_TEST(log, "Bucket {} is already locked for processing by {} (keys: {})",
|
||||
bucket, processor.value(), bucket_keys.size());
|
||||
++it;
|
||||
continue;
|
||||
}
|
||||
|
||||
if (bucket_keys.empty())
|
||||
{
|
||||
/// No more keys in bucket, remove it from cache.
|
||||
/// We still might add new keys to this bucket if !iterator_finished.
|
||||
it = listed_keys_cache.erase(it);
|
||||
continue;
|
||||
}
|
||||
|
||||
if (!metadata->tryAcquireBucket(bucket, current_processor))
|
||||
{
|
||||
LOG_TEST(log, "Bucket {} is already locked for processing (keys: {})",
|
||||
bucket, bucket_keys.size());
|
||||
++it;
|
||||
continue;
|
||||
}
|
||||
|
||||
current_bucket = bucket;
|
||||
processor = current_processor;
|
||||
|
||||
/// Take the key from the front, the order is important.
|
||||
auto key_with_info = bucket_keys.front();
|
||||
bucket_keys.pop_front();
|
||||
return key_with_info;
|
||||
}
|
||||
}
|
||||
|
||||
if (iterator_finished)
|
||||
{
|
||||
LOG_TEST(log, "Reached the end of file iterator and nothing left in keys cache");
|
||||
return {};
|
||||
}
|
||||
|
||||
auto key_with_info = glob_iterator->next();
|
||||
if (key_with_info)
|
||||
{
|
||||
const auto bucket = metadata->getBucketForPath(key_with_info->key);
|
||||
|
||||
LOG_TEST(log, "Found next file: {}, bucket: {}, current bucket: {}",
|
||||
key_with_info->getFileName(), bucket,
|
||||
current_bucket.has_value() ? toString(current_bucket.value()) : "None");
|
||||
|
||||
if (current_bucket.has_value())
|
||||
{
|
||||
if (current_bucket.value() != bucket)
|
||||
{
|
||||
listed_keys_cache[bucket].keys.emplace_back(key_with_info);
|
||||
continue;
|
||||
}
|
||||
return key_with_info;
|
||||
}
|
||||
else
|
||||
{
|
||||
if (!metadata->tryAcquireBucket(bucket, current_processor))
|
||||
{
|
||||
LOG_TEST(log, "Bucket {} is already locked for processing", bucket);
|
||||
continue;
|
||||
}
|
||||
|
||||
current_bucket = bucket;
|
||||
return key_with_info;
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
releaseAndResetCurrentBucket();
|
||||
|
||||
LOG_TEST(log, "Reached the end of file iterator");
|
||||
iterator_finished = true;
|
||||
|
||||
if (listed_keys_cache.empty())
|
||||
return {};
|
||||
else
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
StorageS3QueueSource::KeyWithInfoPtr StorageS3QueueSource::FileIterator::next()
|
||||
{
|
||||
while (!shutdown_called)
|
||||
{
|
||||
auto val = metadata->useBucketsForProcessing() ? getNextKeyFromAcquiredBucket() : glob_iterator->next();
|
||||
if (!val)
|
||||
return {};
|
||||
|
||||
@ -138,19 +260,12 @@ StorageS3QueueSource::KeyWithInfoPtr StorageS3QueueSource::FileIterator::next(si
|
||||
return {};
|
||||
}
|
||||
|
||||
LOG_TEST(log, "Checking if can process key {} for processing_id {}", val->key, idx);
|
||||
LOG_TEST(log, "Checking if can process key {}", val->key);
|
||||
|
||||
if (processing_holder)
|
||||
{
|
||||
return std::make_shared<S3QueueKeyWithInfo>(val->key, val->info, processing_holder);
|
||||
}
|
||||
else if (sharded_processing
|
||||
&& metadata->getFileStatus(val->key)->state == S3QueueFilesMetadata::FileStatus::State::Processing)
|
||||
{
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR,
|
||||
"File {} is processing by someone else in sharded processing. "
|
||||
"It is a bug", val->key);
|
||||
}
|
||||
}
|
||||
return {};
|
||||
}
|
||||
@ -165,7 +280,6 @@ StorageS3QueueSource::StorageS3QueueSource(
|
||||
const Block & header_,
|
||||
std::unique_ptr<StorageS3Source> internal_source_,
|
||||
std::shared_ptr<S3QueueFilesMetadata> files_metadata_,
|
||||
size_t processing_id_,
|
||||
const S3QueueAction & action_,
|
||||
RemoveFileFunc remove_file_func_,
|
||||
const NamesAndTypesList & requested_virtual_columns_,
|
||||
@ -179,7 +293,6 @@ StorageS3QueueSource::StorageS3QueueSource(
|
||||
, WithContext(context_)
|
||||
, name(std::move(name_))
|
||||
, action(action_)
|
||||
, processing_id(processing_id_)
|
||||
, files_metadata(files_metadata_)
|
||||
, internal_source(std::move(internal_source_))
|
||||
, requested_virtual_columns(requested_virtual_columns_)
|
||||
@ -207,7 +320,7 @@ void StorageS3QueueSource::lazyInitialize()
|
||||
if (initialized)
|
||||
return;
|
||||
|
||||
internal_source->lazyInitialize(processing_id);
|
||||
internal_source->lazyInitialize();
|
||||
reader = std::move(internal_source->reader);
|
||||
if (reader)
|
||||
reader_future = std::move(internal_source->reader_future);
|
||||
@ -335,7 +448,7 @@ Chunk StorageS3QueueSource::generate()
|
||||
/// Even if task is finished the thread may be not freed in pool.
|
||||
/// So wait until it will be freed before scheduling a new task.
|
||||
internal_source->create_reader_pool.wait();
|
||||
reader_future = internal_source->createReaderAsync(processing_id);
|
||||
reader_future = internal_source->createReaderAsync();
|
||||
}
|
||||
|
||||
return {};
|
||||
|
@ -41,28 +41,42 @@ public:
|
||||
FileIterator(
|
||||
std::shared_ptr<S3QueueFilesMetadata> metadata_,
|
||||
std::unique_ptr<GlobIterator> glob_iterator_,
|
||||
size_t current_shard_,
|
||||
std::atomic<bool> & shutdown_called_,
|
||||
LoggerPtr logger_);
|
||||
|
||||
~FileIterator() override;
|
||||
|
||||
/// Note:
|
||||
/// List results in s3 are always returned in UTF-8 binary order.
|
||||
/// (https://docs.aws.amazon.com/AmazonS3/latest/userguide/ListingKeysUsingAPIs.html)
|
||||
KeyWithInfoPtr next(size_t idx) override;
|
||||
KeyWithInfoPtr next() override;
|
||||
|
||||
size_t estimatedKeysCount() override;
|
||||
|
||||
private:
|
||||
using Bucket = S3QueueFilesMetadata::Bucket;
|
||||
using Processor = S3QueueFilesMetadata::Processor;
|
||||
|
||||
const std::shared_ptr<S3QueueFilesMetadata> metadata;
|
||||
const std::unique_ptr<GlobIterator> glob_iterator;
|
||||
const Processor current_processor;
|
||||
|
||||
std::atomic<bool> & shutdown_called;
|
||||
std::mutex mutex;
|
||||
LoggerPtr log;
|
||||
|
||||
const bool sharded_processing;
|
||||
const size_t current_shard;
|
||||
std::unordered_map<size_t, std::deque<KeyWithInfoPtr>> sharded_keys;
|
||||
std::mutex sharded_keys_mutex;
|
||||
std::optional<Bucket> current_bucket;
|
||||
std::mutex buckets_mutex;
|
||||
struct ListedKeys
|
||||
{
|
||||
std::deque<KeyWithInfoPtr> keys;
|
||||
std::optional<Processor> processor;
|
||||
};
|
||||
std::unordered_map<Bucket, ListedKeys> listed_keys_cache;
|
||||
bool iterator_finished = false;
|
||||
|
||||
KeyWithInfoPtr getNextKeyFromAcquiredBucket();
|
||||
void releaseAndResetCurrentBucket();
|
||||
};
|
||||
|
||||
StorageS3QueueSource(
|
||||
@ -70,7 +84,6 @@ public:
|
||||
const Block & header_,
|
||||
std::unique_ptr<StorageS3Source> internal_source_,
|
||||
std::shared_ptr<S3QueueFilesMetadata> files_metadata_,
|
||||
size_t processing_id_,
|
||||
const S3QueueAction & action_,
|
||||
RemoveFileFunc remove_file_func_,
|
||||
const NamesAndTypesList & requested_virtual_columns_,
|
||||
@ -92,7 +105,6 @@ public:
|
||||
private:
|
||||
const String name;
|
||||
const S3QueueAction action;
|
||||
const size_t processing_id;
|
||||
const std::shared_ptr<S3QueueFilesMetadata> files_metadata;
|
||||
const std::shared_ptr<StorageS3Source> internal_source;
|
||||
const NamesAndTypesList requested_virtual_columns;
|
||||
|
@ -38,11 +38,15 @@ S3QueueTableMetadata::S3QueueTableMetadata(
|
||||
const StorageInMemoryMetadata & storage_metadata)
|
||||
{
|
||||
format_name = configuration.format;
|
||||
LOG_TEST(getLogger("KSSENII"), "KSSENII SEEEE: {}", engine_settings.after_processing.value);
|
||||
after_processing = engine_settings.after_processing.toString();
|
||||
LOG_TEST(getLogger("KSSENII"), "KSSENII SEE 2: {}", after_processing);
|
||||
mode = engine_settings.mode.toString();
|
||||
LOG_TEST(getLogger("KSSENII"), "KSSENII SEE 2: {}", mode);
|
||||
s3queue_tracked_files_limit = engine_settings.s3queue_tracked_files_limit;
|
||||
s3queue_tracked_file_ttl_sec = engine_settings.s3queue_tracked_file_ttl_sec;
|
||||
s3queue_total_shards_num = engine_settings.s3queue_total_shards_num;
|
||||
s3queue_buckets = engine_settings.s3queue_buckets;
|
||||
LOG_TEST(getLogger("KSSENII"), "KSSENII SEE 2: {}", s3queue_buckets);
|
||||
s3queue_processing_threads_num = engine_settings.s3queue_processing_threads_num;
|
||||
columns = storage_metadata.getColumns().toString();
|
||||
}
|
||||
@ -54,7 +58,7 @@ String S3QueueTableMetadata::toString() const
|
||||
json.set("mode", mode);
|
||||
json.set("s3queue_tracked_files_limit", s3queue_tracked_files_limit);
|
||||
json.set("s3queue_tracked_file_ttl_sec", s3queue_tracked_file_ttl_sec);
|
||||
json.set("s3queue_total_shards_num", s3queue_total_shards_num);
|
||||
json.set("s3queue_buckets", s3queue_buckets);
|
||||
json.set("s3queue_processing_threads_num", s3queue_processing_threads_num);
|
||||
json.set("format_name", format_name);
|
||||
json.set("columns", columns);
|
||||
@ -77,10 +81,10 @@ void S3QueueTableMetadata::read(const String & metadata_str)
|
||||
format_name = json->getValue<String>("format_name");
|
||||
columns = json->getValue<String>("columns");
|
||||
|
||||
if (json->has("s3queue_total_shards_num"))
|
||||
s3queue_total_shards_num = json->getValue<UInt64>("s3queue_total_shards_num");
|
||||
if (json->has("s3queue_buckets"))
|
||||
s3queue_buckets = json->getValue<UInt64>("s3queue_buckets");
|
||||
else
|
||||
s3queue_total_shards_num = 1;
|
||||
s3queue_buckets = 1;
|
||||
|
||||
if (json->has("s3queue_processing_threads_num"))
|
||||
s3queue_processing_threads_num = json->getValue<UInt64>("s3queue_processing_threads_num");
|
||||
@ -148,14 +152,13 @@ void S3QueueTableMetadata::checkImmutableFieldsEquals(const S3QueueTableMetadata
|
||||
from_zk.s3queue_processing_threads_num,
|
||||
s3queue_processing_threads_num);
|
||||
}
|
||||
if (s3queue_total_shards_num != from_zk.s3queue_total_shards_num)
|
||||
if (s3queue_buckets != from_zk.s3queue_buckets)
|
||||
{
|
||||
throw Exception(
|
||||
ErrorCodes::METADATA_MISMATCH,
|
||||
"Existing table metadata in ZooKeeper differs in s3queue_total_shards_num setting. "
|
||||
"Existing table metadata in ZooKeeper differs in s3queue_buckets setting. "
|
||||
"Stored in ZooKeeper: {}, local: {}",
|
||||
from_zk.s3queue_total_shards_num,
|
||||
s3queue_total_shards_num);
|
||||
from_zk.s3queue_buckets, s3queue_buckets);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -23,7 +23,7 @@ struct S3QueueTableMetadata
|
||||
String mode;
|
||||
UInt64 s3queue_tracked_files_limit = 0;
|
||||
UInt64 s3queue_tracked_file_ttl_sec = 0;
|
||||
UInt64 s3queue_total_shards_num = 1;
|
||||
UInt64 s3queue_buckets = 0;
|
||||
UInt64 s3queue_processing_threads_num = 1;
|
||||
|
||||
S3QueueTableMetadata() = default;
|
||||
|
@ -107,18 +107,19 @@ StorageS3Queue::StorageS3Queue(
|
||||
const String & comment,
|
||||
ContextPtr context_,
|
||||
std::optional<FormatSettings> format_settings_,
|
||||
ASTStorage * engine_args,
|
||||
ASTStorage * /* engine_args */,
|
||||
LoadingStrictnessLevel mode)
|
||||
: IStorage(table_id_)
|
||||
, WithContext(context_)
|
||||
, s3queue_settings(std::move(s3queue_settings_))
|
||||
, zk_path(chooseZooKeeperPath(table_id_, context_->getSettingsRef(), *s3queue_settings))
|
||||
, after_processing(s3queue_settings->after_processing)
|
||||
, configuration{configuration_}
|
||||
, format_settings(format_settings_)
|
||||
, reschedule_processing_interval_ms(s3queue_settings->s3queue_polling_min_timeout_ms)
|
||||
, log(getLogger("StorageS3Queue (" + table_id_.getFullTableName() + ")"))
|
||||
{
|
||||
LOG_TEST(log, "KSSENII SEE: {}", s3queue_settings->after_processing.value);
|
||||
|
||||
if (configuration.url.key.empty())
|
||||
{
|
||||
configuration.url.key = "/*";
|
||||
@ -135,7 +136,7 @@ StorageS3Queue::StorageS3Queue(
|
||||
if (mode == LoadingStrictnessLevel::CREATE
|
||||
&& !context_->getSettingsRef().s3queue_allow_experimental_sharded_mode
|
||||
&& s3queue_settings->mode == S3QueueMode::ORDERED
|
||||
&& (s3queue_settings->s3queue_total_shards_num > 1 || s3queue_settings->s3queue_processing_threads_num > 1))
|
||||
&& (s3queue_settings->s3queue_buckets > 1 || s3queue_settings->s3queue_processing_threads_num > 1))
|
||||
{
|
||||
throw Exception(ErrorCodes::QUERY_NOT_ALLOWED, "S3Queue sharded mode is not allowed. To enable use `s3queue_allow_experimental_sharded_mode`");
|
||||
}
|
||||
@ -178,21 +179,9 @@ StorageS3Queue::StorageS3Queue(
|
||||
/// The ref count is decreased when StorageS3Queue::drop() method is called.
|
||||
files_metadata = S3QueueMetadataFactory::instance().getOrCreate(zk_path, *s3queue_settings);
|
||||
|
||||
if (files_metadata->isShardedProcessing())
|
||||
{
|
||||
if (!s3queue_settings->s3queue_current_shard_num.changed)
|
||||
{
|
||||
s3queue_settings->s3queue_current_shard_num = static_cast<UInt32>(files_metadata->registerNewShard());
|
||||
engine_args->settings->changes.setSetting("s3queue_current_shard_num", s3queue_settings->s3queue_current_shard_num.value);
|
||||
}
|
||||
else if (!files_metadata->isShardRegistered(s3queue_settings->s3queue_current_shard_num))
|
||||
{
|
||||
files_metadata->registerNewShard(s3queue_settings->s3queue_current_shard_num);
|
||||
}
|
||||
}
|
||||
if (s3queue_settings->mode == S3QueueMode::ORDERED && !s3queue_settings->s3queue_last_processed_path.value.empty())
|
||||
{
|
||||
files_metadata->setFileProcessed(s3queue_settings->s3queue_last_processed_path.value, s3queue_settings->s3queue_current_shard_num);
|
||||
files_metadata->setFileProcessed(s3queue_settings->s3queue_last_processed_path.value);
|
||||
}
|
||||
}
|
||||
|
||||
@ -216,13 +205,6 @@ void StorageS3Queue::shutdown(bool is_drop)
|
||||
if (files_metadata)
|
||||
{
|
||||
files_metadata->deactivateCleanupTask();
|
||||
|
||||
if (is_drop && files_metadata->isShardedProcessing())
|
||||
{
|
||||
files_metadata->unregisterShard(s3queue_settings->s3queue_current_shard_num);
|
||||
LOG_TRACE(log, "Unregistered shard {} from zookeeper", s3queue_settings->s3queue_current_shard_num);
|
||||
}
|
||||
|
||||
files_metadata.reset();
|
||||
}
|
||||
LOG_TRACE(log, "Shut down storage");
|
||||
@ -343,7 +325,6 @@ void ReadFromS3Queue::initializePipeline(QueryPipelineBuilder & pipeline, const
|
||||
pipes.emplace_back(storage->createSource(
|
||||
info,
|
||||
iterator,
|
||||
storage->files_metadata->getIdForProcessingThread(i, storage->s3queue_settings->s3queue_current_shard_num),
|
||||
max_block_size, context));
|
||||
|
||||
auto pipe = Pipe::unitePipes(std::move(pipes));
|
||||
@ -359,7 +340,6 @@ void ReadFromS3Queue::initializePipeline(QueryPipelineBuilder & pipeline, const
|
||||
std::shared_ptr<StorageS3QueueSource> StorageS3Queue::createSource(
|
||||
const ReadFromFormatInfo & info,
|
||||
std::shared_ptr<StorageS3Queue::FileIterator> file_iterator,
|
||||
size_t processing_id,
|
||||
size_t max_block_size,
|
||||
ContextPtr local_context)
|
||||
{
|
||||
@ -405,7 +385,7 @@ std::shared_ptr<StorageS3QueueSource> StorageS3Queue::createSource(
|
||||
auto s3_queue_log = s3queue_settings->s3queue_enable_logging_to_s3queue_log ? local_context->getS3QueueLog() : nullptr;
|
||||
return std::make_shared<StorageS3QueueSource>(
|
||||
getName(), info.source_header, std::move(internal_source),
|
||||
files_metadata, processing_id, after_processing, file_deleter, info.requested_virtual_columns,
|
||||
files_metadata, s3queue_settings->after_processing, file_deleter, info.requested_virtual_columns,
|
||||
local_context, shutdown_called, table_is_being_dropped, s3_queue_log, getStorageID(), log);
|
||||
}
|
||||
|
||||
@ -508,10 +488,7 @@ bool StorageS3Queue::streamToViews()
|
||||
pipes.reserve(s3queue_settings->s3queue_processing_threads_num);
|
||||
for (size_t i = 0; i < s3queue_settings->s3queue_processing_threads_num; ++i)
|
||||
{
|
||||
auto source = createSource(
|
||||
read_from_format_info, file_iterator, files_metadata->getIdForProcessingThread(i, s3queue_settings->s3queue_current_shard_num),
|
||||
DBMS_DEFAULT_BUFFER_SIZE, s3queue_context);
|
||||
|
||||
auto source = createSource(read_from_format_info, file_iterator, DBMS_DEFAULT_BUFFER_SIZE, s3queue_context);
|
||||
pipes.emplace_back(std::move(source));
|
||||
}
|
||||
auto pipe = Pipe::unitePipes(std::move(pipes));
|
||||
@ -551,6 +528,8 @@ void StorageS3Queue::createOrCheckMetadata(const StorageInMemoryMetadata & stora
|
||||
if (zookeeper->exists(zk_path / "metadata"))
|
||||
{
|
||||
checkTableStructure(zk_path, storage_metadata);
|
||||
checkTableStructure(zk_path, storage_metadata);
|
||||
checkTableStructure(zk_path, storage_metadata);
|
||||
}
|
||||
else
|
||||
{
|
||||
@ -623,8 +602,7 @@ std::shared_ptr<StorageS3Queue::FileIterator> StorageS3Queue::createFileIterator
|
||||
/* read_keys */ nullptr,
|
||||
configuration.request_settings);
|
||||
|
||||
return std::make_shared<FileIterator>(
|
||||
files_metadata, std::move(glob_iterator), s3queue_settings->s3queue_current_shard_num, shutdown_called, log);
|
||||
return std::make_shared<FileIterator>(files_metadata, std::move(glob_iterator), shutdown_called, log);
|
||||
}
|
||||
|
||||
void registerStorageS3Queue(StorageFactory & factory)
|
||||
|
@ -59,7 +59,6 @@ private:
|
||||
|
||||
const std::unique_ptr<S3QueueSettings> s3queue_settings;
|
||||
const fs::path zk_path;
|
||||
const S3QueueAction after_processing;
|
||||
|
||||
std::shared_ptr<S3QueueFilesMetadata> files_metadata;
|
||||
Configuration configuration;
|
||||
@ -86,7 +85,6 @@ private:
|
||||
std::shared_ptr<StorageS3QueueSource> createSource(
|
||||
const ReadFromFormatInfo & info,
|
||||
std::shared_ptr<StorageS3Queue::FileIterator> file_iterator,
|
||||
size_t processing_id,
|
||||
size_t max_block_size,
|
||||
ContextPtr local_context);
|
||||
|
||||
|
@ -230,7 +230,7 @@ public:
|
||||
expanded_keys_iter++;
|
||||
}
|
||||
|
||||
KeyWithInfoPtr next(size_t)
|
||||
KeyWithInfoPtr next()
|
||||
{
|
||||
std::lock_guard lock(mutex);
|
||||
return nextAssumeLocked();
|
||||
@ -491,9 +491,9 @@ StorageS3Source::DisclosedGlobIterator::DisclosedGlobIterator(
|
||||
{
|
||||
}
|
||||
|
||||
StorageS3Source::KeyWithInfoPtr StorageS3Source::DisclosedGlobIterator::next(size_t idx) /// NOLINT
|
||||
StorageS3Source::KeyWithInfoPtr StorageS3Source::DisclosedGlobIterator::next() /// NOLINT
|
||||
{
|
||||
return pimpl->next(idx);
|
||||
return pimpl->next();
|
||||
}
|
||||
|
||||
size_t StorageS3Source::DisclosedGlobIterator::estimatedKeysCount()
|
||||
@ -535,7 +535,7 @@ public:
|
||||
}
|
||||
}
|
||||
|
||||
KeyWithInfoPtr next(size_t)
|
||||
KeyWithInfoPtr next()
|
||||
{
|
||||
size_t current_index = index.fetch_add(1, std::memory_order_relaxed);
|
||||
if (current_index >= keys.size())
|
||||
@ -579,9 +579,9 @@ StorageS3Source::KeysIterator::KeysIterator(
|
||||
{
|
||||
}
|
||||
|
||||
StorageS3Source::KeyWithInfoPtr StorageS3Source::KeysIterator::next(size_t idx) /// NOLINT
|
||||
StorageS3Source::KeyWithInfoPtr StorageS3Source::KeysIterator::next() /// NOLINT
|
||||
{
|
||||
return pimpl->next(idx);
|
||||
return pimpl->next();
|
||||
}
|
||||
|
||||
size_t StorageS3Source::KeysIterator::estimatedKeysCount()
|
||||
@ -608,7 +608,7 @@ StorageS3Source::ReadTaskIterator::ReadTaskIterator(
|
||||
buffer.emplace_back(std::make_shared<KeyWithInfo>(key_future.get()));
|
||||
}
|
||||
|
||||
StorageS3Source::KeyWithInfoPtr StorageS3Source::ReadTaskIterator::next(size_t) /// NOLINT
|
||||
StorageS3Source::KeyWithInfoPtr StorageS3Source::ReadTaskIterator::next()
|
||||
{
|
||||
size_t current_index = index.fetch_add(1, std::memory_order_relaxed);
|
||||
if (current_index >= buffer.size())
|
||||
@ -663,7 +663,7 @@ StorageS3Source::ArchiveIterator::ArchiveIterator(
|
||||
}
|
||||
}
|
||||
|
||||
StorageS3Source::KeyWithInfoPtr StorageS3Source::ArchiveIterator::next(size_t)
|
||||
StorageS3Source::KeyWithInfoPtr StorageS3Source::ArchiveIterator::next()
|
||||
{
|
||||
if (!path_in_archive.empty())
|
||||
{
|
||||
@ -789,23 +789,23 @@ StorageS3Source::StorageS3Source(
|
||||
{
|
||||
}
|
||||
|
||||
void StorageS3Source::lazyInitialize(size_t idx)
|
||||
void StorageS3Source::lazyInitialize()
|
||||
{
|
||||
if (initialized)
|
||||
return;
|
||||
|
||||
reader = createReader(idx);
|
||||
reader = createReader();
|
||||
if (reader)
|
||||
reader_future = createReaderAsync(idx);
|
||||
reader_future = createReaderAsync();
|
||||
initialized = true;
|
||||
}
|
||||
|
||||
StorageS3Source::ReaderHolder StorageS3Source::createReader(size_t idx)
|
||||
StorageS3Source::ReaderHolder StorageS3Source::createReader()
|
||||
{
|
||||
KeyWithInfoPtr key_with_info;
|
||||
do
|
||||
{
|
||||
key_with_info = file_iterator->next(idx);
|
||||
key_with_info = file_iterator->next();
|
||||
if (!key_with_info || key_with_info->key.empty())
|
||||
return {};
|
||||
|
||||
@ -888,9 +888,9 @@ StorageS3Source::ReaderHolder StorageS3Source::createReader(size_t idx)
|
||||
return ReaderHolder{key_with_info, bucket, std::move(read_buf), std::move(source), std::move(pipeline), std::move(current_reader)};
|
||||
}
|
||||
|
||||
std::future<StorageS3Source::ReaderHolder> StorageS3Source::createReaderAsync(size_t idx)
|
||||
std::future<StorageS3Source::ReaderHolder> StorageS3Source::createReaderAsync()
|
||||
{
|
||||
return create_reader_scheduler([=, this] { return createReader(idx); }, Priority{});
|
||||
return create_reader_scheduler([=, this] { return createReader(); }, Priority{});
|
||||
}
|
||||
|
||||
std::unique_ptr<ReadBufferFromFileBase> createS3ReadBuffer(
|
||||
|
@ -76,7 +76,7 @@ public:
|
||||
{
|
||||
public:
|
||||
virtual ~IIterator() = default;
|
||||
virtual KeyWithInfoPtr next(size_t idx = 0) = 0; /// NOLINT
|
||||
virtual KeyWithInfoPtr next() = 0; /// NOLINT
|
||||
|
||||
/// Estimates how many streams we need to process all files.
|
||||
/// If keys count >= max_threads_count, the returned number may not represent the actual number of the keys.
|
||||
@ -100,7 +100,7 @@ public:
|
||||
const S3Settings::RequestSettings & request_settings_ = {},
|
||||
std::function<void(FileProgress)> progress_callback_ = {});
|
||||
|
||||
KeyWithInfoPtr next(size_t idx = 0) override; /// NOLINT
|
||||
KeyWithInfoPtr next() override; /// NOLINT
|
||||
size_t estimatedKeysCount() override;
|
||||
|
||||
private:
|
||||
@ -121,7 +121,7 @@ public:
|
||||
KeysWithInfo * read_keys = nullptr,
|
||||
std::function<void(FileProgress)> progress_callback_ = {});
|
||||
|
||||
KeyWithInfoPtr next(size_t idx = 0) override; /// NOLINT
|
||||
KeyWithInfoPtr next() override; /// NOLINT
|
||||
size_t estimatedKeysCount() override;
|
||||
|
||||
private:
|
||||
@ -135,7 +135,7 @@ public:
|
||||
public:
|
||||
explicit ReadTaskIterator(const ReadTaskCallback & callback_, size_t max_threads_count);
|
||||
|
||||
KeyWithInfoPtr next(size_t idx = 0) override; /// NOLINT
|
||||
KeyWithInfoPtr next() override; /// NOLINT
|
||||
size_t estimatedKeysCount() override;
|
||||
|
||||
private:
|
||||
@ -158,7 +158,7 @@ public:
|
||||
ContextPtr context_,
|
||||
KeysWithInfo * read_keys_);
|
||||
|
||||
KeyWithInfoPtr next(size_t) override; /// NOLINT
|
||||
KeyWithInfoPtr next() override; /// NOLINT
|
||||
size_t estimatedKeysCount() override;
|
||||
void refreshArchiveReader();
|
||||
|
||||
@ -301,11 +301,11 @@ private:
|
||||
|
||||
/// Notice: we should initialize reader and future_reader lazily in generate to make sure key_condition
|
||||
/// is set before createReader is invoked for key_condition is read in createReader.
|
||||
void lazyInitialize(size_t idx = 0);
|
||||
void lazyInitialize();
|
||||
|
||||
/// Recreate ReadBuffer and Pipeline for each file.
|
||||
ReaderHolder createReader(size_t idx = 0);
|
||||
std::future<ReaderHolder> createReaderAsync(size_t idx = 0);
|
||||
ReaderHolder createReader();
|
||||
std::future<ReaderHolder> createReaderAsync();
|
||||
|
||||
void addNumRowsToCache(const String & bucket_with_key, size_t num_rows);
|
||||
std::optional<size_t> tryGetNumRowsFromCache(const KeyWithInfo & key_with_info);
|
||||
|
@ -13,4 +13,19 @@
|
||||
<port>2181</port>
|
||||
</node>
|
||||
</zookeeper>
|
||||
|
||||
<remote_servers>
|
||||
<default>
|
||||
<shard>
|
||||
<replica>
|
||||
<host>instance</host>
|
||||
<port>9000</port>
|
||||
</replica>
|
||||
<replica>
|
||||
<host>instance2</host>
|
||||
<port>9000</port>
|
||||
</replica>
|
||||
</shard>
|
||||
</default>
|
||||
</remote_servers>
|
||||
</clickhouse>
|
||||
|
@ -1061,7 +1061,7 @@ def test_processing_threads(started_cluster, mode):
|
||||
def get_count(table_name):
|
||||
return int(run_query(node, f"SELECT count() FROM {table_name}"))
|
||||
|
||||
for _ in range(100):
|
||||
for _ in range(30):
|
||||
if (get_count(f"{dst_table_name}")) == files_to_generate:
|
||||
break
|
||||
time.sleep(1)
|
||||
@ -1078,7 +1078,7 @@ def test_processing_threads(started_cluster, mode):
|
||||
|
||||
if mode == "ordered":
|
||||
zk = started_cluster.get_kazoo_client("zoo1")
|
||||
processed_nodes = zk.get_children(f"{keeper_path}/processed/")
|
||||
processed_nodes = zk.get_children(f"{keeper_path}/buckets/")
|
||||
assert len(processed_nodes) == processing_threads
|
||||
|
||||
|
||||
@ -1112,7 +1112,7 @@ def test_shards(started_cluster, mode, processing_threads):
|
||||
additional_settings={
|
||||
"keeper_path": keeper_path,
|
||||
"s3queue_processing_threads_num": processing_threads,
|
||||
"s3queue_total_shards_num": shards_num,
|
||||
"s3queue_buckets": shards_num,
|
||||
},
|
||||
)
|
||||
create_mv(node, table, dst_table)
|
||||
@ -1125,12 +1125,10 @@ def test_shards(started_cluster, mode, processing_threads):
|
||||
return int(run_query(node, f"SELECT count() FROM {table_name}"))
|
||||
|
||||
for _ in range(100):
|
||||
if (
|
||||
get_count(f"{dst_table_name}_1")
|
||||
+ get_count(f"{dst_table_name}_2")
|
||||
+ get_count(f"{dst_table_name}_3")
|
||||
) == files_to_generate:
|
||||
count = get_count(f"{dst_table_name}_1") + get_count(f"{dst_table_name}_2") + get_count(f"{dst_table_name}_3")
|
||||
if count == files_to_generate:
|
||||
break
|
||||
print(f"Current {count}/{files_to_generate}")
|
||||
time.sleep(1)
|
||||
|
||||
if (
|
||||
@ -1138,10 +1136,22 @@ def test_shards(started_cluster, mode, processing_threads):
|
||||
+ get_count(f"{dst_table_name}_2")
|
||||
+ get_count(f"{dst_table_name}_3")
|
||||
) != files_to_generate:
|
||||
processed_files = node.query(
|
||||
f"select splitByChar('/', file_name)[-1] as file from system.s3queue where zookeeper_path ilike '%{table_name}%' order by file"
|
||||
).strip().split('\n')
|
||||
logging.debug(f"Processed files: {len(processed_files)}/{files_to_generate}")
|
||||
|
||||
count = get_count(f"{dst_table_name}_1") + get_count(f"{dst_table_name}_2") + get_count(f"{dst_table_name}_3")
|
||||
logging.debug(f"Processed rows: {count}/{files_to_generate}")
|
||||
|
||||
info = node.query(
|
||||
f"SELECT * FROM system.s3queue WHERE zookeeper_path like '%{table_name}' ORDER BY file_name FORMAT Vertical"
|
||||
f"""
|
||||
select concat('test_', toString(number), '.csv') as file from numbers(300)
|
||||
where file not in (select splitByChar('/', file_name)[-1] from system.s3queue where zookeeper_path ilike '%{table_name}%' and status = 'Processed')
|
||||
"""
|
||||
)
|
||||
logging.debug(info)
|
||||
logging.debug(f"Unprocessed files: {info}")
|
||||
|
||||
assert False
|
||||
|
||||
res1 = [
|
||||
@ -1176,10 +1186,8 @@ def test_shards(started_cluster, mode, processing_threads):
|
||||
|
||||
if mode == "ordered":
|
||||
zk = started_cluster.get_kazoo_client("zoo1")
|
||||
processed_nodes = zk.get_children(f"{keeper_path}/processed/")
|
||||
assert len(processed_nodes) == shards_num * processing_threads
|
||||
shard_nodes = zk.get_children(f"{keeper_path}/shards/")
|
||||
assert len(shard_nodes) == shards_num
|
||||
processed_nodes = zk.get_children(f"{keeper_path}/buckets/")
|
||||
assert len(processed_nodes) == shards_num
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
@ -1214,7 +1222,7 @@ def test_shards_distributed(started_cluster, mode, processing_threads):
|
||||
additional_settings={
|
||||
"keeper_path": keeper_path,
|
||||
"s3queue_processing_threads_num": processing_threads,
|
||||
"s3queue_total_shards_num": shards_num,
|
||||
"s3queue_buckets": shards_num,
|
||||
},
|
||||
)
|
||||
i += 1
|
||||
@ -1229,7 +1237,7 @@ def test_shards_distributed(started_cluster, mode, processing_threads):
|
||||
def get_count(node, table_name):
|
||||
return int(run_query(node, f"SELECT count() FROM {table_name}"))
|
||||
|
||||
for _ in range(150):
|
||||
for _ in range(10):
|
||||
if (
|
||||
get_count(node, dst_table_name) + get_count(node_2, dst_table_name)
|
||||
) == total_rows:
|
||||
@ -1239,10 +1247,47 @@ def test_shards_distributed(started_cluster, mode, processing_threads):
|
||||
if (
|
||||
get_count(node, dst_table_name) + get_count(node_2, dst_table_name)
|
||||
) != total_rows:
|
||||
processed_files = node.query(
|
||||
f"""
|
||||
select splitByChar('/', file_name)[-1] as file from system.s3queue where zookeeper_path ilike '%{table_name}%' and status = 'Processed' and rows_processed > 0 order by file
|
||||
"""
|
||||
).strip().split('\n')
|
||||
logging.debug(f"Processed files by node 1: {len(processed_files)}/{files_to_generate}")
|
||||
processed_files = node_2.query(
|
||||
f"""
|
||||
select splitByChar('/', file_name)[-1] as file from system.s3queue where zookeeper_path ilike '%{table_name}%' and status = 'Processed' and rows_processed > 0 order by file
|
||||
"""
|
||||
).strip().split('\n')
|
||||
logging.debug(f"Processed files by node 2: {len(processed_files)}/{files_to_generate}")
|
||||
|
||||
count = get_count(node, dst_table_name) + get_count(node_2, dst_table_name)
|
||||
logging.debug(f"Processed rows: {count}/{files_to_generate}")
|
||||
|
||||
info = node.query(
|
||||
f"SELECT * FROM system.s3queue WHERE zookeeper_path like '%{table_name}' ORDER BY file_name FORMAT Vertical"
|
||||
f"""
|
||||
select concat('test_', toString(number), '.csv') as file from numbers(300)
|
||||
where file not in (select splitByChar('/', file_name)[-1] from clusterAllReplicas(default, system.s3queue)
|
||||
where zookeeper_path ilike '%{table_name}%' and status = 'Processed' and rows_processed > 0)
|
||||
"""
|
||||
)
|
||||
logging.debug(info)
|
||||
logging.debug(f"Unprocessed files: {info}")
|
||||
|
||||
files1 = node.query(
|
||||
f"""
|
||||
select splitByChar('/', file_name)[-1] from system.s3queue
|
||||
where zookeeper_path ilike '%{table_name}%' and status = 'Processed' and rows_processed > 0
|
||||
"""
|
||||
).strip().split("\n")
|
||||
files2 = node_2.query(
|
||||
f"""
|
||||
select splitByChar('/', file_name)[-1] from system.s3queue
|
||||
where zookeeper_path ilike '%{table_name}%' and status = 'Processed' and rows_processed > 0
|
||||
"""
|
||||
).strip().split("\n")
|
||||
def intersection(list_a, list_b):
|
||||
return [ e for e in list_a if e in list_b ]
|
||||
logging.debug(f"Intersecting files: {intersection(files1, files2)}")
|
||||
|
||||
assert False
|
||||
|
||||
get_query = f"SELECT column1, column2, column3 FROM {dst_table_name}"
|
||||
@ -1267,10 +1312,8 @@ def test_shards_distributed(started_cluster, mode, processing_threads):
|
||||
|
||||
if mode == "ordered":
|
||||
zk = started_cluster.get_kazoo_client("zoo1")
|
||||
processed_nodes = zk.get_children(f"{keeper_path}/processed/")
|
||||
assert len(processed_nodes) == shards_num * processing_threads
|
||||
shard_nodes = zk.get_children(f"{keeper_path}/shards/")
|
||||
assert len(shard_nodes) == shards_num
|
||||
processed_nodes = zk.get_children(f"{keeper_path}/buckets/")
|
||||
assert len(processed_nodes) == shards_num
|
||||
|
||||
node.restart_clickhouse()
|
||||
time.sleep(10)
|
||||
@ -1297,12 +1340,12 @@ def test_settings_check(started_cluster):
|
||||
additional_settings={
|
||||
"keeper_path": keeper_path,
|
||||
"s3queue_processing_threads_num": 5,
|
||||
"s3queue_total_shards_num": 2,
|
||||
"s3queue_buckets": 2,
|
||||
},
|
||||
)
|
||||
|
||||
assert (
|
||||
"Existing table metadata in ZooKeeper differs in s3queue_total_shards_num setting. Stored in ZooKeeper: 2, local: 3"
|
||||
"Existing table metadata in ZooKeeper differs in s3queue_buckets setting. Stored in ZooKeeper: 2, local: 3"
|
||||
in create_table(
|
||||
started_cluster,
|
||||
node_2,
|
||||
@ -1312,7 +1355,7 @@ def test_settings_check(started_cluster):
|
||||
additional_settings={
|
||||
"keeper_path": keeper_path,
|
||||
"s3queue_processing_threads_num": 5,
|
||||
"s3queue_total_shards_num": 3,
|
||||
"s3queue_buckets": 3,
|
||||
},
|
||||
expect_error=True,
|
||||
)
|
||||
@ -1329,7 +1372,7 @@ def test_settings_check(started_cluster):
|
||||
additional_settings={
|
||||
"keeper_path": keeper_path,
|
||||
"s3queue_processing_threads_num": 2,
|
||||
"s3queue_total_shards_num": 2,
|
||||
"s3queue_buckets": 2,
|
||||
},
|
||||
expect_error=True,
|
||||
)
|
||||
@ -1419,7 +1462,7 @@ def test_processed_file_setting_distributed(started_cluster, processing_threads)
|
||||
"keeper_path": keeper_path,
|
||||
"s3queue_processing_threads_num": processing_threads,
|
||||
"s3queue_last_processed_path": f"{files_path}/test_5.csv",
|
||||
"s3queue_total_shards_num": 2,
|
||||
"s3queue_buckets": 2,
|
||||
},
|
||||
)
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user