Merge pull request #59167 from ClickHouse/s3-queue-parallelize-ordered-mode
S3Queue: allow parallel & distributed processing for ordered mode

Commit: b20567a055
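For context, a minimal usage sketch of what this change enables; the table name, bucket URL and column list below are hypothetical and not taken from the diff. With Ordered mode, processing can now run on several threads within one server and be shared across several servers, driven by the s3queue_processing_threads_num and s3queue_total_shards_num settings introduced in the changes below:

CREATE TABLE s3queue_ordered (column1 UInt32, column2 UInt32, column3 UInt32)
ENGINE = S3Queue('https://<bucket>.s3.amazonaws.com/<prefix>/*', 'CSV')
SETTINGS
    mode = 'ordered',
    keeper_path = '/clickhouse/s3queue_ordered',
    s3queue_processing_threads_num = 8,  -- parallel processing threads on this server (one shard)
    s3queue_total_shards_num = 2;        -- how many servers share the work

Each replica that attaches to the same keeper_path registers itself as one shard; s3queue_current_shard_num is assigned automatically on first start and persisted into the table definition (see the StorageS3Queue constructor changes below).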
@@ -129,9 +129,12 @@ S3QueueFilesMetadata::S3QueueFilesMetadata(const fs::path & zookeeper_path_, con
    , max_loading_retries(settings_.s3queue_loading_retries.value)
    , min_cleanup_interval_ms(settings_.s3queue_cleanup_interval_min_ms.value)
    , max_cleanup_interval_ms(settings_.s3queue_cleanup_interval_max_ms.value)
    , shards_num(settings_.s3queue_total_shards_num)
    , threads_per_shard(settings_.s3queue_processing_threads_num)
    , zookeeper_processing_path(zookeeper_path_ / "processing")
    , zookeeper_processed_path(zookeeper_path_ / "processed")
    , zookeeper_failed_path(zookeeper_path_ / "failed")
    , zookeeper_shards_path(zookeeper_path_ / "shards")
    , zookeeper_cleanup_lock_path(zookeeper_path_ / "cleanup_lock")
    , log(getLogger("S3QueueFilesMetadata"))
{
@@ -197,6 +200,123 @@ S3QueueFilesMetadata::NodeMetadata S3QueueFilesMetadata::createNodeMetadata(
    return metadata;
}

bool S3QueueFilesMetadata::isShardedProcessing() const
{
    return getProcessingIdsNum() > 1 && mode == S3QueueMode::ORDERED;
}

size_t S3QueueFilesMetadata::registerNewShard()
{
    if (!isShardedProcessing())
    {
        throw Exception(ErrorCodes::LOGICAL_ERROR,
                        "Cannot register a new shard, because processing is not sharded");
    }

    const auto zk_client = getZooKeeper();
    zk_client->createAncestors(zookeeper_shards_path / "");

    std::string shard_node_path;
    size_t shard_id = 0;
    for (size_t i = 0; i < shards_num; ++i)
    {
        const auto node_path = getZooKeeperPathForShard(i);
        auto err = zk_client->tryCreate(node_path, "", zkutil::CreateMode::Persistent);
        if (err == Coordination::Error::ZOK)
        {
            shard_node_path = node_path;
            shard_id = i;
            break;
        }
        else if (err == Coordination::Error::ZNODEEXISTS)
            continue;
        else
            throw Exception(ErrorCodes::LOGICAL_ERROR,
                            "Unexpected error: {}", magic_enum::enum_name(err));
    }

    if (shard_node_path.empty())
        throw Exception(ErrorCodes::LOGICAL_ERROR, "Failed to register a new shard");

    LOG_TRACE(log, "Using shard {} (zk node: {})", shard_id, shard_node_path);
    return shard_id;
}

std::string S3QueueFilesMetadata::getZooKeeperPathForShard(size_t shard_id) const
{
    return zookeeper_shards_path / ("shard" + toString(shard_id));
}

void S3QueueFilesMetadata::registerNewShard(size_t shard_id)
{
    if (!isShardedProcessing())
    {
        throw Exception(ErrorCodes::LOGICAL_ERROR,
                        "Cannot register a new shard, because processing is not sharded");
    }

    const auto zk_client = getZooKeeper();
    const auto node_path = getZooKeeperPathForShard(shard_id);
    zk_client->createAncestors(node_path);

    auto err = zk_client->tryCreate(node_path, "", zkutil::CreateMode::Persistent);
    if (err != Coordination::Error::ZOK)
    {
        if (err == Coordination::Error::ZNODEEXISTS)
            throw Exception(ErrorCodes::BAD_ARGUMENTS, "Cannot register shard {}: already exists", shard_id);
        else
            throw Exception(ErrorCodes::LOGICAL_ERROR,
                            "Unexpected error: {}", magic_enum::enum_name(err));
    }
}

bool S3QueueFilesMetadata::isShardRegistered(size_t shard_id)
{
    const auto zk_client = getZooKeeper();
    const auto node_path = getZooKeeperPathForShard(shard_id);
    return zk_client->exists(node_path);
}

void S3QueueFilesMetadata::unregisterShard(size_t shard_id)
{
    if (!isShardedProcessing())
    {
        throw Exception(ErrorCodes::LOGICAL_ERROR,
                        "Cannot unregister a shard, because processing is not sharded");
    }

    const auto zk_client = getZooKeeper();
    const auto node_path = getZooKeeperPathForShard(shard_id);
    zk_client->remove(node_path);
}

size_t S3QueueFilesMetadata::getProcessingIdsNum() const
{
    return shards_num * threads_per_shard;
}

std::vector<size_t> S3QueueFilesMetadata::getProcessingIdsForShard(size_t shard_id) const
{
    std::vector<size_t> res(threads_per_shard);
    std::iota(res.begin(), res.end(), shard_id * threads_per_shard);
    return res;
}

bool S3QueueFilesMetadata::isProcessingIdBelongsToShard(size_t id, size_t shard_id) const
{
    return shard_id * threads_per_shard <= id && id < (shard_id + 1) * threads_per_shard;
}

size_t S3QueueFilesMetadata::getIdForProcessingThread(size_t thread_id, size_t shard_id) const
{
    return shard_id * threads_per_shard + thread_id;
}

size_t S3QueueFilesMetadata::getProcessingIdForPath(const std::string & path) const
{
    return sipHash64(path) % getProcessingIdsNum();
}

S3QueueFilesMetadata::ProcessingNodeHolderPtr S3QueueFilesMetadata::trySetFileAsProcessing(const std::string & path)
{
    auto timer = DB::CurrentThread::getProfileEvents().timer(ProfileEvents::S3QueueSetFileProcessingMicroseconds);
@@ -212,16 +332,24 @@ S3QueueFilesMetadata::ProcessingNodeHolderPtr S3QueueFilesMetadata::trySetFileAs
        std::lock_guard lock(file_status->metadata_lock);
        switch (file_status->state)
        {
            case FileStatus::State::Processing: [[fallthrough]];
            case FileStatus::State::Processing:
            {
                LOG_TEST(log, "File {} is already processing", path);
                return {};
            }
            case FileStatus::State::Processed:
            {
                LOG_TEST(log, "File {} is already processed", path);
                return {};
            }
            case FileStatus::State::Failed:
            {
                /// If max_loading_retries == 0, file is not retriable.
                if (max_loading_retries == 0)
                {
                    LOG_TEST(log, "File {} is failed and processing retries are disabled", path);
                    return {};
                }

                /// Otherwise file_status->retries is also cached.
                /// In case file_status->retries >= max_loading_retries we can fully rely that it is true
@@ -230,7 +358,10 @@ S3QueueFilesMetadata::ProcessingNodeHolderPtr S3QueueFilesMetadata::trySetFileAs
                /// (another server could have done a try after we cached retries value),
                /// so check with zookeeper here.
                if (file_status->retries >= max_loading_retries)
                {
                    LOG_TEST(log, "File {} is failed and processing retries are exceeeded", path);
                    return {};
                }

                break;
            }
@@ -284,35 +415,31 @@ S3QueueFilesMetadata::ProcessingNodeHolderPtr S3QueueFilesMetadata::trySetFileAs
            if (!file_status->processing_start_time)
                file_status->processing_start_time = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now());

            break;
            return processing_node_holder;
        }
        case SetFileProcessingResult::AlreadyProcessed:
        {
            std::lock_guard lock(file_status->metadata_lock);
            file_status->state = FileStatus::State::Processed;
            break;
            return {};
        }
        case SetFileProcessingResult::AlreadyFailed:
        {
            std::lock_guard lock(file_status->metadata_lock);
            file_status->state = FileStatus::State::Failed;
            break;
            return {};
        }
        case SetFileProcessingResult::ProcessingByOtherNode:
        {
            /// We cannot save any local state here, see comment above.
            break;
            return {};
        }
    }

    if (result == SetFileProcessingResult::Success)
        return processing_node_holder;

    return {};
}

std::pair<S3QueueFilesMetadata::SetFileProcessingResult,
          S3QueueFilesMetadata::ProcessingNodeHolderPtr> S3QueueFilesMetadata::trySetFileAsProcessingForUnorderedMode(const std::string & path, const FileStatusPtr & file_status)
          S3QueueFilesMetadata::ProcessingNodeHolderPtr>
S3QueueFilesMetadata::trySetFileAsProcessingForUnorderedMode(const std::string & path, const FileStatusPtr & file_status)
{
    /// In one zookeeper transaction do the following:
    /// 1. check that corresponding persistent nodes do not exist in processed/ and failed/;
@@ -339,7 +466,8 @@ std::pair<S3QueueFilesMetadata::SetFileProcessingResult,

    if (code == Coordination::Error::ZOK)
    {
        auto holder = std::make_unique<ProcessingNodeHolder>(node_metadata.processing_id, path, zookeeper_processing_path / node_name, file_status, zk_client);
        auto holder = std::make_unique<ProcessingNodeHolder>(
            node_metadata.processing_id, path, zookeeper_processing_path / node_name, file_status, zk_client);
        return std::pair{SetFileProcessingResult::Success, std::move(holder)};
    }

@@ -362,7 +490,8 @@ std::pair<S3QueueFilesMetadata::SetFileProcessingResult,
}

std::pair<S3QueueFilesMetadata::SetFileProcessingResult,
          S3QueueFilesMetadata::ProcessingNodeHolderPtr> S3QueueFilesMetadata::trySetFileAsProcessingForOrderedMode(const std::string & path, const FileStatusPtr & file_status)
          S3QueueFilesMetadata::ProcessingNodeHolderPtr>
S3QueueFilesMetadata::trySetFileAsProcessingForOrderedMode(const std::string & path, const FileStatusPtr & file_status)
{
    /// Same as for Unordered mode.
    /// The only difference is the check if the file is already processed.
@@ -385,10 +514,15 @@ std::pair<S3QueueFilesMetadata::SetFileProcessingResult,
        /// If the version did change - retry (since we cannot do Get and Create requests
        /// in the same zookeeper transaction, so we use a while loop with tries).

        Coordination::Stat processed_node_stat;
        auto data = zk_client->get(zookeeper_processed_path, &processed_node_stat);
        auto processed_node = isShardedProcessing()
            ? zookeeper_processed_path / toString(getProcessingIdForPath(path))
            : zookeeper_processed_path;

        NodeMetadata processed_node_metadata;
        if (!data.empty())
        Coordination::Stat processed_node_stat;
        std::string data;
        auto processed_node_exists = zk_client->tryGet(processed_node, data, &processed_node_stat);
        if (processed_node_exists && !data.empty())
            processed_node_metadata = NodeMetadata::fromString(data);

        auto max_processed_file_path = processed_node_metadata.file_path;

@@ -403,13 +537,25 @@ std::pair<S3QueueFilesMetadata::SetFileProcessingResult,
        requests.push_back(zkutil::makeRemoveRequest(zookeeper_failed_path / node_name, -1));

        requests.push_back(zkutil::makeCreateRequest(zookeeper_processing_path / node_name, node_metadata.toString(), zkutil::CreateMode::Ephemeral));
        requests.push_back(zkutil::makeCheckRequest(zookeeper_processed_path, processed_node_stat.version));

        if (processed_node_exists)
        {
            requests.push_back(zkutil::makeCheckRequest(processed_node, processed_node_stat.version));
        }
        else
        {
            requests.push_back(zkutil::makeCreateRequest(processed_node, "", zkutil::CreateMode::Persistent));
            requests.push_back(zkutil::makeRemoveRequest(processed_node, -1));
        }

        Coordination::Responses responses;
        auto code = zk_client->tryMulti(requests, responses);
        if (code == Coordination::Error::ZOK)
        {
            auto holder = std::make_unique<ProcessingNodeHolder>(node_metadata.processing_id, path, zookeeper_processing_path / node_name, file_status, zk_client);
            auto holder = std::make_unique<ProcessingNodeHolder>(
                node_metadata.processing_id, path, zookeeper_processing_path / node_name, file_status, zk_client);

            LOG_TEST(log, "File {} is ready to be processed", path);
            return std::pair{SetFileProcessingResult::Success, std::move(holder)};
        }

@@ -500,11 +646,16 @@ void S3QueueFilesMetadata::setFileProcessedForOrderedMode(ProcessingNodeHolderPt
    const auto node_metadata = createNodeMetadata(path).toString();
    const auto zk_client = getZooKeeper();

    auto processed_node = isShardedProcessing()
        ? zookeeper_processed_path / toString(getProcessingIdForPath(path))
        : zookeeper_processed_path;

    LOG_TEST(log, "Setting file `{}` as processed", path);
    while (true)
    {
        std::string res;
        Coordination::Stat stat;
        bool exists = zk_client->tryGet(zookeeper_processed_path, res, &stat);
        bool exists = zk_client->tryGet(processed_node, res, &stat);
        Coordination::Requests requests;
        if (exists)
        {

@@ -527,11 +678,11 @@ void S3QueueFilesMetadata::setFileProcessedForOrderedMode(ProcessingNodeHolderPt
                    return;
                }
            }
            requests.push_back(zkutil::makeSetRequest(zookeeper_processed_path, node_metadata, stat.version));
            requests.push_back(zkutil::makeSetRequest(processed_node, node_metadata, stat.version));
        }
        else
        {
            requests.push_back(zkutil::makeCreateRequest(zookeeper_processed_path, node_metadata, zkutil::CreateMode::Persistent));
            requests.push_back(zkutil::makeCreateRequest(processed_node, node_metadata, zkutil::CreateMode::Persistent));
        }

        Coordination::Responses responses;
@@ -80,6 +80,38 @@ public:

    void deactivateCleanupTask();

    /// Should the table use sharded processing?
    /// We use sharded processing for Ordered mode of S3Queue table.
    /// It allows to parallelize processing within a single server
    /// and to allow distributed processing.
    bool isShardedProcessing() const;

    /// Register a new shard for processing.
    /// Return a shard id of registered shard.
    size_t registerNewShard();
    /// Register a new shard for processing by given id.
    /// Throws exception if shard by this id is already registered.
    void registerNewShard(size_t shard_id);
    /// Unregister shard from keeper.
    void unregisterShard(size_t shard_id);
    bool isShardRegistered(size_t shard_id);

    /// Total number of processing ids.
    /// A processing id identifies a single processing thread.
    /// There might be several processing ids per shard.
    size_t getProcessingIdsNum() const;
    /// Get processing ids identified with requested shard.
    std::vector<size_t> getProcessingIdsForShard(size_t shard_id) const;
    /// Check if given processing id belongs to a given shard.
    bool isProcessingIdBelongsToShard(size_t id, size_t shard_id) const;
    /// Get a processing id for processing thread by given thread id.
    /// thread id is a value in range [0, threads_per_shard].
    size_t getIdForProcessingThread(size_t thread_id, size_t shard_id) const;

    /// Calculate which processing id corresponds to a given file path.
    /// The file will be processed by a thread related to this processing id.
    size_t getProcessingIdForPath(const std::string & path) const;

private:
    const S3QueueMode mode;
    const UInt64 max_set_size;

@@ -87,10 +119,13 @@ private:
    const UInt64 max_loading_retries;
    const size_t min_cleanup_interval_ms;
    const size_t max_cleanup_interval_ms;
    const size_t shards_num;
    const size_t threads_per_shard;

    const fs::path zookeeper_processing_path;
    const fs::path zookeeper_processed_path;
    const fs::path zookeeper_failed_path;
    const fs::path zookeeper_shards_path;
    const fs::path zookeeper_cleanup_lock_path;

    LoggerPtr log;

@@ -104,6 +139,7 @@ private:

    void setFileProcessedForOrderedMode(ProcessingNodeHolderPtr holder);
    void setFileProcessedForUnorderedMode(ProcessingNodeHolderPtr holder);
    std::string getZooKeeperPathForShard(size_t shard_id) const;

    enum class SetFileProcessingResult
    {

@@ -117,8 +153,7 @@ private:

    struct NodeMetadata
    {
        std::string file_path;
        UInt64 last_processed_timestamp = 0;
        std::string file_path; UInt64 last_processed_timestamp = 0;
        std::string last_exception;
        UInt64 retries = 0;
        std::string processing_id; /// For ephemeral processing node.

@@ -29,6 +29,8 @@ class ASTStorage;
    M(UInt32, s3queue_tracked_files_limit, 1000, "For unordered mode. Max set size for tracking processed files in ZooKeeper", 0) \
    M(UInt32, s3queue_cleanup_interval_min_ms, 60000, "For unordered mode. Polling backoff min for cleanup", 0) \
    M(UInt32, s3queue_cleanup_interval_max_ms, 60000, "For unordered mode. Polling backoff max for cleanup", 0) \
    M(UInt32, s3queue_total_shards_num, 1, "Value 0 means disabled", 0) \
    M(UInt32, s3queue_current_shard_num, 0, "", 0) \

#define LIST_OF_S3QUEUE_SETTINGS(M, ALIAS) \
    S3QUEUE_RELATED_SETTINGS(M, ALIAS) \
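As a rough illustration of the shard/thread arithmetic added above (getProcessingIdsNum, getIdForProcessingThread, getProcessingIdForPath), the file-to-processing-id mapping can be mirrored in plain SQL; the path literal and the 2-shard / 8-thread configuration are made-up values, not part of the change, and the equivalence of SQL sipHash64 to the C++ call is an assumption:

SELECT
    2 * 8 AS total_processing_ids,                                          -- shards_num * threads_per_shard
    sipHash64('data/file_0042.csv') % (2 * 8) AS processing_id,             -- getProcessingIdForPath
    intDiv(sipHash64('data/file_0042.csv') % (2 * 8), 8) AS owning_shard;   -- isProcessingIdBelongsToShard

Only the thread whose id equals processing_id picks the file up, so a given file is never processed by two shards at once.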
@@ -28,6 +28,7 @@ namespace ErrorCodes
{
    extern const int S3_ERROR;
    extern const int NOT_IMPLEMENTED;
    extern const int LOGICAL_ERROR;
}

StorageS3QueueSource::S3QueueKeyWithInfo::S3QueueKeyWithInfo(

@@ -42,33 +43,112 @@ StorageS3QueueSource::S3QueueKeyWithInfo::S3QueueKeyWithInfo(
StorageS3QueueSource::FileIterator::FileIterator(
    std::shared_ptr<S3QueueFilesMetadata> metadata_,
    std::unique_ptr<GlobIterator> glob_iterator_,
    size_t current_shard_,
    std::atomic<bool> & shutdown_called_)
    : metadata(metadata_)
    , glob_iterator(std::move(glob_iterator_))
    , shutdown_called(shutdown_called_)
    , log(&Poco::Logger::get("StorageS3QueueSource"))
    , sharded_processing(metadata->isShardedProcessing())
    , current_shard(current_shard_)
{
    if (sharded_processing)
    {
        for (const auto & id : metadata->getProcessingIdsForShard(current_shard))
            sharded_keys.emplace(id, std::deque<KeyWithInfoPtr>{});
    }
}

StorageS3QueueSource::KeyWithInfoPtr StorageS3QueueSource::FileIterator::next()
StorageS3QueueSource::KeyWithInfoPtr StorageS3QueueSource::FileIterator::next(size_t idx)
{
    while (!shutdown_called)
    {
        KeyWithInfoPtr val = glob_iterator->next();
        KeyWithInfoPtr val{nullptr};

        {
            std::unique_lock lk(sharded_keys_mutex, std::defer_lock);
            if (sharded_processing)
            {
                /// To make sure order on keys in each shard in sharded_keys
                /// we need to check sharded_keys and to next() under lock.
                lk.lock();

                if (auto it = sharded_keys.find(idx); it != sharded_keys.end())
                {
                    auto & keys = it->second;
                    if (!keys.empty())
                    {
                        val = keys.front();
                        keys.pop_front();
                    }
                }
                else
                {
                    throw Exception(ErrorCodes::LOGICAL_ERROR,
                                    "Processing id {} does not exist (Expected ids: {})",
                                    idx, fmt::join(metadata->getProcessingIdsForShard(current_shard), ", "));
                }
            }

            if (!val)
            {
                val = glob_iterator->next();
                if (val && sharded_processing)
                {
                    const auto processing_id_for_key = metadata->getProcessingIdForPath(val->key);
                    if (idx != processing_id_for_key)
                    {
                        if (metadata->isProcessingIdBelongsToShard(processing_id_for_key, current_shard))
                        {
                            LOG_TEST(log, "Putting key {} into queue of processor {} (total: {})",
                                     val->key, processing_id_for_key, sharded_keys.size());

                            if (auto it = sharded_keys.find(idx); it != sharded_keys.end())
                            {
                                it->second.push_back(val);
                            }
                            else
                            {
                                throw Exception(ErrorCodes::LOGICAL_ERROR,
                                                "Processing id {} does not exist (Expected ids: {})",
                                                idx, fmt::join(metadata->getProcessingIdsForShard(current_shard), ", "));
                            }
                        }
                        continue;
                    }
                }
            }
        }

        if (!val)
            return {};

        if (shutdown_called)
        {
            LOG_TEST(getLogger("StorageS3QueueSource"), "Shutdown was called, stopping file iterator");
            LOG_TEST(log, "Shutdown was called, stopping file iterator");
            return {};
        }

        if (auto processing_holder = metadata->trySetFileAsProcessing(val->key);
            processing_holder && !shutdown_called)
        auto processing_holder = metadata->trySetFileAsProcessing(val->key);
        if (shutdown_called)
        {
            LOG_TEST(log, "Shutdown was called, stopping file iterator");
            return {};
        }

        LOG_TEST(log, "Checking if can process key {} for processing_id {}", val->key, idx);

        if (processing_holder)
        {
            return std::make_shared<S3QueueKeyWithInfo>(val->key, val->info, processing_holder);
        }
        else if (sharded_processing
                 && metadata->getFileStatus(val->key)->state == S3QueueFilesMetadata::FileStatus::State::Processing)
        {
            throw Exception(ErrorCodes::LOGICAL_ERROR,
                            "File {} is processing by someone else in sharded processing. "
                            "It is a bug", val->key);
        }
    }
    return {};
}

@@ -83,6 +163,7 @@ StorageS3QueueSource::StorageS3QueueSource(
    const Block & header_,
    std::unique_ptr<StorageS3Source> internal_source_,
    std::shared_ptr<S3QueueFilesMetadata> files_metadata_,
    size_t processing_id_,
    const S3QueueAction & action_,
    RemoveFileFunc remove_file_func_,
    const NamesAndTypesList & requested_virtual_columns_,

@@ -96,6 +177,7 @@ StorageS3QueueSource::StorageS3QueueSource(
    , WithContext(context_)
    , name(std::move(name_))
    , action(action_)
    , processing_id(processing_id_)
    , files_metadata(files_metadata_)
    , internal_source(std::move(internal_source_))
    , requested_virtual_columns(requested_virtual_columns_)

@@ -123,7 +205,7 @@ void StorageS3QueueSource::lazyInitialize()
    if (initialized)
        return;

    internal_source->lazyInitialize();
    internal_source->lazyInitialize(processing_id);
    reader = std::move(internal_source->reader);
    if (reader)
        reader_future = std::move(internal_source->reader_future);

@@ -249,7 +331,7 @@ Chunk StorageS3QueueSource::generate()
        /// Even if task is finished the thread may be not freed in pool.
        /// So wait until it will be freed before scheduling a new task.
        internal_source->create_reader_pool.wait();
        reader_future = internal_source->createReaderAsync();
        reader_future = internal_source->createReaderAsync(processing_id);
    }

    return {};
@@ -38,12 +38,16 @@ public:
    class FileIterator : public IIterator
    {
    public:
        FileIterator(std::shared_ptr<S3QueueFilesMetadata> metadata_, std::unique_ptr<GlobIterator> glob_iterator_, std::atomic<bool> & shutdown_called_);
        FileIterator(
            std::shared_ptr<S3QueueFilesMetadata> metadata_,
            std::unique_ptr<GlobIterator> glob_iterator_,
            size_t current_shard_,
            std::atomic<bool> & shutdown_called_);

        /// Note:
        /// List results in s3 are always returned in UTF-8 binary order.
        /// (https://docs.aws.amazon.com/AmazonS3/latest/userguide/ListingKeysUsingAPIs.html)
        KeyWithInfoPtr next() override;
        KeyWithInfoPtr next(size_t idx) override;

        size_t estimatedKeysCount() override;

@@ -52,6 +56,12 @@ public:
        const std::unique_ptr<GlobIterator> glob_iterator;
        std::atomic<bool> & shutdown_called;
        std::mutex mutex;
        Poco::Logger * log;

        const bool sharded_processing;
        const size_t current_shard;
        std::unordered_map<size_t, std::deque<KeyWithInfoPtr>> sharded_keys;
        std::mutex sharded_keys_mutex;
    };

    StorageS3QueueSource(

@@ -59,6 +69,7 @@ public:
        const Block & header_,
        std::unique_ptr<StorageS3Source> internal_source_,
        std::shared_ptr<S3QueueFilesMetadata> files_metadata_,
        size_t processing_id_,
        const S3QueueAction & action_,
        RemoveFileFunc remove_file_func_,
        const NamesAndTypesList & requested_virtual_columns_,

@@ -80,6 +91,7 @@ public:
private:
    const String name;
    const S3QueueAction action;
    const size_t processing_id;
    const std::shared_ptr<S3QueueFilesMetadata> files_metadata;
    const std::shared_ptr<StorageS3Source> internal_source;
    const NamesAndTypesList requested_virtual_columns;
@@ -16,8 +16,22 @@ namespace DB
namespace ErrorCodes
{
    extern const int METADATA_MISMATCH;
    extern const int BAD_ARGUMENTS;
}

namespace
{
    S3QueueMode modeFromString(const std::string & mode)
    {
        if (mode == "ordered")
            return S3QueueMode::ORDERED;
        if (mode == "unordered")
            return S3QueueMode::UNORDERED;
        throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unexpected S3Queue mode: {}", mode);
    }
}


S3QueueTableMetadata::S3QueueTableMetadata(
    const StorageS3::Configuration & configuration,
    const S3QueueSettings & engine_settings,

@@ -28,10 +42,11 @@ S3QueueTableMetadata::S3QueueTableMetadata(
    mode = engine_settings.mode.toString();
    s3queue_tracked_files_limit = engine_settings.s3queue_tracked_files_limit;
    s3queue_tracked_file_ttl_sec = engine_settings.s3queue_tracked_file_ttl_sec;
    s3queue_total_shards_num = engine_settings.s3queue_total_shards_num;
    s3queue_processing_threads_num = engine_settings.s3queue_processing_threads_num;
    columns = storage_metadata.getColumns().toString();
}


String S3QueueTableMetadata::toString() const
{
    Poco::JSON::Object json;

@@ -39,6 +54,8 @@ String S3QueueTableMetadata::toString() const
    json.set("mode", mode);
    json.set("s3queue_tracked_files_limit", s3queue_tracked_files_limit);
    json.set("s3queue_tracked_file_ttl_sec", s3queue_tracked_file_ttl_sec);
    json.set("s3queue_total_shards_num", s3queue_total_shards_num);
    json.set("s3queue_processing_threads_num", s3queue_processing_threads_num);
    json.set("format_name", format_name);
    json.set("columns", columns);

@@ -58,6 +75,10 @@ void S3QueueTableMetadata::read(const String & metadata_str)
    s3queue_tracked_file_ttl_sec = json->getValue<UInt64>("s3queue_tracked_file_ttl_sec");
    format_name = json->getValue<String>("format_name");
    columns = json->getValue<String>("columns");
    if (json->has("s3queue_total_shards_num"))
        s3queue_total_shards_num = json->getValue<UInt64>("s3queue_total_shards_num");
    if (json->has("s3queue_processing_threads_num"))
        s3queue_processing_threads_num = json->getValue<UInt64>("s3queue_processing_threads_num");
}

S3QueueTableMetadata S3QueueTableMetadata::parse(const String & metadata_str)

@@ -67,7 +88,6 @@ S3QueueTableMetadata S3QueueTableMetadata::parse(const String & metadata_str)
    return metadata;
}


void S3QueueTableMetadata::checkImmutableFieldsEquals(const S3QueueTableMetadata & from_zk) const
{
    if (after_processing != from_zk.after_processing)

@@ -83,8 +103,8 @@ void S3QueueTableMetadata::checkImmutableFieldsEquals(const S3QueueTableMetadata
            ErrorCodes::METADATA_MISMATCH,
            "Existing table metadata in ZooKeeper differs in engine mode. "
            "Stored in ZooKeeper: {}, local: {}",
            DB::toString(from_zk.mode),
            DB::toString(mode));
            from_zk.mode,
            mode);

    if (s3queue_tracked_files_limit != from_zk.s3queue_tracked_files_limit)
        throw Exception(

@@ -109,6 +129,28 @@ void S3QueueTableMetadata::checkImmutableFieldsEquals(const S3QueueTableMetadata
            "Stored in ZooKeeper: {}, local: {}",
            from_zk.format_name,
            format_name);

    if (modeFromString(mode) == S3QueueMode::ORDERED)
    {
        if (s3queue_processing_threads_num != from_zk.s3queue_processing_threads_num)
        {
            throw Exception(
                ErrorCodes::METADATA_MISMATCH,
                "Existing table metadata in ZooKeeper differs in s3queue_processing_threads_num setting. "
                "Stored in ZooKeeper: {}, local: {}",
                from_zk.s3queue_processing_threads_num,
                s3queue_processing_threads_num);
        }
        if (s3queue_total_shards_num != from_zk.s3queue_total_shards_num)
        {
            throw Exception(
                ErrorCodes::METADATA_MISMATCH,
                "Existing table metadata in ZooKeeper differs in s3queue_total_shards_num setting. "
                "Stored in ZooKeeper: {}, local: {}",
                from_zk.s3queue_total_shards_num,
                s3queue_total_shards_num);
        }
    }
}

void S3QueueTableMetadata::checkEquals(const S3QueueTableMetadata & from_zk) const
@@ -23,6 +23,8 @@ struct S3QueueTableMetadata
    String mode;
    UInt64 s3queue_tracked_files_limit;
    UInt64 s3queue_tracked_file_ttl_sec;
    UInt64 s3queue_total_shards_num;
    UInt64 s3queue_processing_threads_num;

    S3QueueTableMetadata() = default;
    S3QueueTableMetadata(const StorageS3::Configuration & configuration, const S3QueueSettings & engine_settings, const StorageInMemoryMetadata & storage_metadata);
@@ -75,14 +75,8 @@ namespace
        return zkutil::extractZooKeeperPath(result_zk_path, true);
    }

    void checkAndAdjustSettings(S3QueueSettings & s3queue_settings, const Settings & settings, LoggerPtr log)
    void checkAndAdjustSettings(S3QueueSettings & s3queue_settings, const Settings & settings)
    {
        if (s3queue_settings.mode == S3QueueMode::ORDERED && s3queue_settings.s3queue_processing_threads_num > 1)
        {
            LOG_WARNING(log, "Parallel processing is not yet supported for Ordered mode");
            s3queue_settings.s3queue_processing_threads_num = 1;
        }

        if (!s3queue_settings.s3queue_processing_threads_num)
        {
            throw Exception(ErrorCodes::BAD_ARGUMENTS, "Setting `s3queue_processing_threads_num` cannot be set to zero");

@@ -110,7 +104,8 @@ StorageS3Queue::StorageS3Queue(
    const ConstraintsDescription & constraints_,
    const String & comment,
    ContextPtr context_,
    std::optional<FormatSettings> format_settings_)
    std::optional<FormatSettings> format_settings_,
    ASTStorage * engine_args)
    : IStorage(table_id_)
    , WithContext(context_)
    , s3queue_settings(std::move(s3queue_settings_))

@@ -134,7 +129,7 @@ StorageS3Queue::StorageS3Queue(
        throw Exception(ErrorCodes::QUERY_NOT_ALLOWED, "S3Queue url must either end with '/' or contain globs");
    }

    checkAndAdjustSettings(*s3queue_settings, context_->getSettingsRef(), log);
    checkAndAdjustSettings(*s3queue_settings, context_->getSettingsRef());

    configuration.update(context_);
    FormatFactory::instance().checkFormatName(configuration.format);

@@ -173,6 +168,19 @@ StorageS3Queue::StorageS3Queue(
        S3QueueMetadataFactory::instance().remove(zk_path);
        throw;
    }

    if (files_metadata->isShardedProcessing())
    {
        if (!s3queue_settings->s3queue_current_shard_num.changed)
        {
            s3queue_settings->s3queue_current_shard_num = static_cast<UInt32>(files_metadata->registerNewShard());
            engine_args->settings->changes.setSetting("s3queue_current_shard_num", s3queue_settings->s3queue_current_shard_num.value);
        }
        else if (!files_metadata->isShardRegistered(s3queue_settings->s3queue_current_shard_num))
        {
            files_metadata->registerNewShard(s3queue_settings->s3queue_current_shard_num);
        }
    }
}

void StorageS3Queue::startup()

@@ -186,6 +194,7 @@ void StorageS3Queue::shutdown(bool is_drop)
    table_is_being_dropped = is_drop;
    shutdown_called = true;

    LOG_TRACE(log, "Shutting down storage...");
    if (task)
    {
        task->deactivate();

@@ -194,8 +203,16 @@ void StorageS3Queue::shutdown(bool is_drop)
    if (files_metadata)
    {
        files_metadata->deactivateCleanupTask();

        if (is_drop && files_metadata->isShardedProcessing())
        {
            files_metadata->unregisterShard(s3queue_settings->s3queue_current_shard_num);
            LOG_TRACE(log, "Unregistered shard {} from zookeeper", s3queue_settings->s3queue_current_shard_num);
        }

        files_metadata.reset();
    }
    LOG_TRACE(log, "Shut down storage");
}

void StorageS3Queue::drop()

@@ -220,14 +237,12 @@ public:
        ReadFromFormatInfo info_,
        std::shared_ptr<StorageS3Queue> storage_,
        ContextPtr context_,
        size_t max_block_size_,
        size_t num_streams_)
        size_t max_block_size_)
        : SourceStepWithFilter(DataStream{.header = std::move(sample_block)})
        , info(std::move(info_))
        , storage(std::move(storage_))
        , context(std::move(context_))
        , max_block_size(max_block_size_)
        , num_streams(num_streams_)
    {
    }

@@ -236,7 +251,6 @@ private:
    std::shared_ptr<StorageS3Queue> storage;
    ContextPtr context;
    size_t max_block_size;
    size_t num_streams;

    std::shared_ptr<StorageS3Queue::FileIterator> iterator;

@@ -270,7 +284,7 @@ void StorageS3Queue::read(
    ContextPtr local_context,
    QueryProcessingStage::Enum /*processed_stage*/,
    size_t max_block_size,
    size_t num_streams)
    size_t)
{
    if (!local_context->getSettingsRef().stream_like_engine_allow_direct_select)
    {

@@ -292,8 +306,7 @@ void StorageS3Queue::read(
        read_from_format_info,
        std::move(this_ptr),
        local_context,
        max_block_size,
        num_streams);
        max_block_size);

    query_plan.addStep(std::move(reading));
}

@@ -301,11 +314,15 @@ void StorageS3Queue::read(
void ReadFromS3Queue::initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &)
{
    Pipes pipes;
    const size_t adjusted_num_streams = std::min<size_t>(num_streams, storage->s3queue_settings->s3queue_processing_threads_num);
    const size_t adjusted_num_streams = storage->s3queue_settings->s3queue_processing_threads_num;

    createIterator(nullptr);
    for (size_t i = 0; i < adjusted_num_streams; ++i)
        pipes.emplace_back(storage->createSource(info, iterator, max_block_size, context));
        pipes.emplace_back(storage->createSource(
            info,
            iterator,
            storage->files_metadata->getIdForProcessingThread(i, storage->s3queue_settings->s3queue_current_shard_num),
            max_block_size, context));

    auto pipe = Pipe::unitePipes(std::move(pipes));
    if (pipe.empty())

@@ -320,6 +337,7 @@ void ReadFromS3Queue::initializePipeline(QueryPipelineBuilder & pipeline, const
std::shared_ptr<StorageS3QueueSource> StorageS3Queue::createSource(
    const ReadFromFormatInfo & info,
    std::shared_ptr<StorageS3Queue::FileIterator> file_iterator,
    size_t processing_id,
    size_t max_block_size,
    ContextPtr local_context)
{

@@ -359,7 +377,7 @@ std::shared_ptr<StorageS3QueueSource> StorageS3Queue::createSource(
    auto s3_queue_log = s3queue_settings->s3queue_enable_logging_to_s3queue_log ? local_context->getS3QueueLog() : nullptr;
    return std::make_shared<StorageS3QueueSource>(
        getName(), info.source_header, std::move(internal_source),
        files_metadata, after_processing, file_deleter, info.requested_virtual_columns,
        files_metadata, processing_id, after_processing, file_deleter, info.requested_virtual_columns,
        local_context, shutdown_called, table_is_being_dropped, s3_queue_log, getStorageID(), log);
}

@@ -463,7 +481,8 @@ bool StorageS3Queue::streamToViews()
    for (size_t i = 0; i < s3queue_settings->s3queue_processing_threads_num; ++i)
    {
        auto source = createSource(
            read_from_format_info, file_iterator, DBMS_DEFAULT_BUFFER_SIZE, s3queue_context);
            read_from_format_info, file_iterator, files_metadata->getIdForProcessingThread(i, s3queue_settings->s3queue_current_shard_num),
            DBMS_DEFAULT_BUFFER_SIZE, s3queue_context);

        pipes.emplace_back(std::move(source));
    }

@@ -566,7 +585,7 @@ std::shared_ptr<StorageS3Queue::FileIterator> StorageS3Queue::createFileIterator
    auto glob_iterator = std::make_unique<StorageS3QueueSource::GlobIterator>(
        *configuration.client, configuration.url, predicate, virtual_columns, local_context,
        /* read_keys */nullptr, configuration.request_settings);
    return std::make_shared<FileIterator>(files_metadata, std::move(glob_iterator), shutdown_called);
    return std::make_shared<FileIterator>(files_metadata, std::move(glob_iterator), s3queue_settings->s3queue_current_shard_num, shutdown_called);
}

void registerStorageS3QueueImpl(const String & name, StorageFactory & factory)

@@ -624,7 +643,8 @@ void registerStorageS3QueueImpl(const String & name, StorageFactory & factory)
                args.constraints,
                args.comment,
                args.getContext(),
                format_settings);
                format_settings,
                args.storage_def);
        },
        {
            .supports_settings = true,
@@ -11,6 +11,7 @@
#include <Storages/StorageS3.h>
#include <Interpreters/Context.h>
#include <IO/S3/BlobStorageLogWriter.h>
#include <Storages/StorageFactory.h>


namespace Aws::S3

@@ -35,7 +36,8 @@ public:
        const ConstraintsDescription & constraints_,
        const String & comment,
        ContextPtr context_,
        std::optional<FormatSettings> format_settings_);
        std::optional<FormatSettings> format_settings_,
        ASTStorage * engine_args);

    String getName() const override { return "S3Queue"; }

@@ -91,6 +93,7 @@ private:
    std::shared_ptr<StorageS3QueueSource> createSource(
        const ReadFromFormatInfo & info,
        std::shared_ptr<StorageS3Queue::FileIterator> file_iterator,
        size_t processing_id,
        size_t max_block_size,
        ContextPtr local_context);

@@ -244,7 +244,7 @@ public:
        fillInternalBufferAssumeLocked();
    }

    KeyWithInfoPtr next()
    KeyWithInfoPtr next(size_t)
    {
        std::lock_guard lock(mutex);
        return nextAssumeLocked();

@@ -436,9 +436,9 @@ StorageS3Source::DisclosedGlobIterator::DisclosedGlobIterator(
{
}

StorageS3Source::KeyWithInfoPtr StorageS3Source::DisclosedGlobIterator::next()
StorageS3Source::KeyWithInfoPtr StorageS3Source::DisclosedGlobIterator::next(size_t idx) /// NOLINT
{
    return pimpl->next();
    return pimpl->next(idx);
}

size_t StorageS3Source::DisclosedGlobIterator::estimatedKeysCount()

@@ -471,7 +471,7 @@ public:
        }
    }

    KeyWithInfoPtr next()
    KeyWithInfoPtr next(size_t)
    {
        size_t current_index = index.fetch_add(1, std::memory_order_relaxed);
        if (current_index >= keys.size())

@@ -516,9 +516,9 @@ StorageS3Source::KeysIterator::KeysIterator(
{
}

StorageS3Source::KeyWithInfoPtr StorageS3Source::KeysIterator::next()
StorageS3Source::KeyWithInfoPtr StorageS3Source::KeysIterator::next(size_t idx) /// NOLINT
{
    return pimpl->next();
    return pimpl->next(idx);
}

size_t StorageS3Source::KeysIterator::estimatedKeysCount()

@@ -545,7 +545,7 @@ StorageS3Source::ReadTaskIterator::ReadTaskIterator(
        buffer.emplace_back(std::make_shared<KeyWithInfo>(key_future.get(), std::nullopt));
}

StorageS3Source::KeyWithInfoPtr StorageS3Source::ReadTaskIterator::next()
StorageS3Source::KeyWithInfoPtr StorageS3Source::ReadTaskIterator::next(size_t) /// NOLINT
{
    size_t current_index = index.fetch_add(1, std::memory_order_relaxed);
    if (current_index >= buffer.size())

@@ -599,23 +599,23 @@ StorageS3Source::StorageS3Source(
{
}

void StorageS3Source::lazyInitialize()
void StorageS3Source::lazyInitialize(size_t idx)
{
    if (initialized)
        return;

    reader = createReader();
    reader = createReader(idx);
    if (reader)
        reader_future = createReaderAsync();
        reader_future = createReaderAsync(idx);
    initialized = true;
}

StorageS3Source::ReaderHolder StorageS3Source::createReader()
StorageS3Source::ReaderHolder StorageS3Source::createReader(size_t idx)
{
    KeyWithInfoPtr key_with_info;
    do
    {
        key_with_info = (*file_iterator)();
        key_with_info = file_iterator->next(idx);
        if (!key_with_info || key_with_info->key.empty())
            return {};

@@ -689,9 +689,9 @@ StorageS3Source::ReaderHolder StorageS3Source::createReader()
    return ReaderHolder{key_with_info, bucket, std::move(read_buf), std::move(source), std::move(pipeline), std::move(current_reader)};
}

std::future<StorageS3Source::ReaderHolder> StorageS3Source::createReaderAsync()
std::future<StorageS3Source::ReaderHolder> StorageS3Source::createReaderAsync(size_t idx)
{
    return create_reader_scheduler([this] { return createReader(); }, Priority{});
    return create_reader_scheduler([=, this] { return createReader(idx); }, Priority{});
}

std::unique_ptr<ReadBuffer> StorageS3Source::createS3ReadBuffer(const String & key, size_t object_size)
@@ -61,7 +61,7 @@ public:
    {
    public:
        virtual ~IIterator() = default;
        virtual KeyWithInfoPtr next() = 0;
        virtual KeyWithInfoPtr next(size_t idx = 0) = 0; /// NOLINT

        /// Estimates how many streams we need to process all files.
        /// If keys count >= max_threads_count, the returned number may not represent the actual number of the keys.

@@ -85,7 +85,7 @@ public:
            const S3Settings::RequestSettings & request_settings_ = {},
            std::function<void(FileProgress)> progress_callback_ = {});

        KeyWithInfoPtr next() override;
        KeyWithInfoPtr next(size_t idx = 0) override; /// NOLINT
        size_t estimatedKeysCount() override;

    private:

@@ -106,7 +106,7 @@ public:
            KeysWithInfo * read_keys = nullptr,
            std::function<void(FileProgress)> progress_callback_ = {});

        KeyWithInfoPtr next() override;
        KeyWithInfoPtr next(size_t idx = 0) override; /// NOLINT
        size_t estimatedKeysCount() override;

    private:

@@ -120,7 +120,7 @@ public:
    public:
        explicit ReadTaskIterator(const ReadTaskCallback & callback_, size_t max_threads_count);

        KeyWithInfoPtr next() override;
        KeyWithInfoPtr next(size_t idx = 0) override; /// NOLINT
        size_t estimatedKeysCount() override;

    private:

@@ -253,11 +253,11 @@ private:

    /// Notice: we should initialize reader and future_reader lazily in generate to make sure key_condition
    /// is set before createReader is invoked for key_condition is read in createReader.
    void lazyInitialize();
    void lazyInitialize(size_t idx = 0);

    /// Recreate ReadBuffer and Pipeline for each file.
    ReaderHolder createReader();
    std::future<ReaderHolder> createReaderAsync();
    ReaderHolder createReader(size_t idx = 0);
    std::future<ReaderHolder> createReaderAsync(size_t idx = 0);

    std::unique_ptr<ReadBuffer> createS3ReadBuffer(const String & key, size_t object_size);
    std::unique_ptr<ReadBuffer> createAsyncS3ReadBuffer(const String & key, const ReadSettings & read_settings, size_t object_size);
@@ -89,6 +89,7 @@ def started_cluster():
                "configs/zookeeper.xml",
                "configs/s3queue_log.xml",
            ],
            stay_alive=True,
        )
        cluster.add_instance(
            "instance2",

@@ -165,6 +166,7 @@ def create_table(
    file_format="CSV",
    auth=DEFAULT_AUTH,
    bucket=None,
    expect_error=False,
):
    auth_params = ",".join(auth)
    bucket = started_cluster.minio_bucket if bucket is None else bucket

@@ -184,6 +186,10 @@ def create_table(
        ENGINE = S3Queue('{url}', {auth_params}, {file_format})
        SETTINGS {",".join((k+"="+repr(v) for k, v in settings.items()))}
        """

    if expect_error:
        return node.query_and_get_error(create_query)

    node.query(create_query)


@@ -960,3 +966,320 @@ def test_s3_client_reused(started_cluster):
    s3_clients_after = get_created_s3_clients_count()

    assert s3_clients_before == s3_clients_after


@pytest.mark.parametrize("mode", ["unordered", "ordered"])
def test_processing_threads(started_cluster, mode):
    node = started_cluster.instances["instance"]
    table_name = f"processing_threads_{mode}"
    dst_table_name = f"{table_name}_dst"
    keeper_path = f"/clickhouse/test_{table_name}"
    files_path = f"{table_name}_data"
    files_to_generate = 300
    processing_threads = 32

    create_table(
        started_cluster,
        node,
        table_name,
        mode,
        files_path,
        additional_settings={
            "keeper_path": keeper_path,
            "s3queue_processing_threads_num": processing_threads,
        },
    )
    create_mv(node, table_name, dst_table_name)

    total_values = generate_random_files(
        started_cluster, files_path, files_to_generate, row_num=1
    )

    def get_count(table_name):
        return int(run_query(node, f"SELECT count() FROM {table_name}"))

    for _ in range(100):
        if (get_count(f"{dst_table_name}")) == files_to_generate:
            break
        time.sleep(1)

    assert get_count(dst_table_name) == files_to_generate

    res = [
        list(map(int, l.split()))
        for l in node.query(
            f"SELECT column1, column2, column3 FROM {dst_table_name}"
        ).splitlines()
    ]
    assert {tuple(v) for v in res} == set([tuple(i) for i in total_values])

    if mode == "ordered":
        zk = started_cluster.get_kazoo_client("zoo1")
        processed_nodes = zk.get_children(f"{keeper_path}/processed/")
        assert len(processed_nodes) == processing_threads


@pytest.mark.parametrize(
    "mode, processing_threads",
    [
        pytest.param("unordered", 1),
        pytest.param("unordered", 8),
        pytest.param("ordered", 1),
        pytest.param("ordered", 8),
    ],
)
def test_shards(started_cluster, mode, processing_threads):
    node = started_cluster.instances["instance"]
    table_name = f"test_shards_{mode}_{processing_threads}"
    dst_table_name = f"{table_name}_dst"
    keeper_path = f"/clickhouse/test_{table_name}"
    files_path = f"{table_name}_data"
    files_to_generate = 300
    shards_num = 3

    for i in range(shards_num):
        table = f"{table_name}_{i + 1}"
        dst_table = f"{dst_table_name}_{i + 1}"
        create_table(
            started_cluster,
            node,
            table,
            mode,
            files_path,
            additional_settings={
                "keeper_path": keeper_path,
                "s3queue_processing_threads_num": processing_threads,
                "s3queue_total_shards_num": shards_num,
            },
        )
        create_mv(node, table, dst_table)

    total_values = generate_random_files(
        started_cluster, files_path, files_to_generate, row_num=1
    )

    def get_count(table_name):
        return int(run_query(node, f"SELECT count() FROM {table_name}"))

    for _ in range(100):
        if (
            get_count(f"{dst_table_name}_1")
            + get_count(f"{dst_table_name}_2")
            + get_count(f"{dst_table_name}_3")
        ) == files_to_generate:
            break
        time.sleep(1)

    if (
        get_count(f"{dst_table_name}_1")
        + get_count(f"{dst_table_name}_2")
        + get_count(f"{dst_table_name}_3")
    ) != files_to_generate:
        info = node.query(
            f"SELECT * FROM system.s3queue WHERE zookeeper_path like '%{table_name}' ORDER BY file_name FORMAT Vertical"
        )
        logging.debug(info)
        assert False

    res1 = [
        list(map(int, l.split()))
        for l in node.query(
            f"SELECT column1, column2, column3 FROM {dst_table_name}_1"
        ).splitlines()
    ]
    res2 = [
        list(map(int, l.split()))
        for l in node.query(
            f"SELECT column1, column2, column3 FROM {dst_table_name}_2"
        ).splitlines()
    ]
    res3 = [
        list(map(int, l.split()))
        for l in node.query(
            f"SELECT column1, column2, column3 FROM {dst_table_name}_3"
        ).splitlines()
    ]
    assert {tuple(v) for v in res1 + res2 + res3} == set(
        [tuple(i) for i in total_values]
    )

    # Checking that all files were processed only once
    time.sleep(10)
    assert (
        get_count(f"{dst_table_name}_1")
        + get_count(f"{dst_table_name}_2")
        + get_count(f"{dst_table_name}_3")
    ) == files_to_generate

    if mode == "ordered":
        zk = started_cluster.get_kazoo_client("zoo1")
        processed_nodes = zk.get_children(f"{keeper_path}/processed/")
        assert len(processed_nodes) == shards_num * processing_threads
        shard_nodes = zk.get_children(f"{keeper_path}/shards/")
        assert len(shard_nodes) == shards_num


@pytest.mark.parametrize(
    "mode, processing_threads",
    [
        pytest.param("unordered", 1),
        pytest.param("unordered", 8),
        pytest.param("ordered", 1),
        pytest.param("ordered", 8),
    ],
)
def test_shards_distributed(started_cluster, mode, processing_threads):
    node = started_cluster.instances["instance"]
    node_2 = started_cluster.instances["instance2"]
    table_name = f"test_shards_distributed_{mode}_{processing_threads}"
    dst_table_name = f"{table_name}_dst"
    keeper_path = f"/clickhouse/test_{table_name}"
    files_path = f"{table_name}_data"
    files_to_generate = 300
    row_num = 50
    total_rows = row_num * files_to_generate
    shards_num = 2

    i = 0
    for instance in [node, node_2]:
        create_table(
            started_cluster,
            instance,
            table_name,
            mode,
            files_path,
            additional_settings={
                "keeper_path": keeper_path,
                "s3queue_processing_threads_num": processing_threads,
                "s3queue_total_shards_num": shards_num,
            },
        )
        i += 1

    for instance in [node, node_2]:
        create_mv(instance, table_name, dst_table_name)

    total_values = generate_random_files(
        started_cluster, files_path, files_to_generate, row_num=row_num
    )

    def get_count(node, table_name):
        return int(run_query(node, f"SELECT count() FROM {table_name}"))

    for _ in range(150):
        if (
            get_count(node, dst_table_name) + get_count(node_2, dst_table_name)
        ) == total_rows:
            break
        time.sleep(1)

    if (
        get_count(node, dst_table_name) + get_count(node_2, dst_table_name)
    ) != total_rows:
        info = node.query(
            f"SELECT * FROM system.s3queue WHERE zookeeper_path like '%{table_name}' ORDER BY file_name FORMAT Vertical"
        )
        logging.debug(info)
        assert False

    get_query = f"SELECT column1, column2, column3 FROM {dst_table_name}"
    res1 = [list(map(int, l.split())) for l in run_query(node, get_query).splitlines()]
    res2 = [
        list(map(int, l.split())) for l in run_query(node_2, get_query).splitlines()
    ]

    assert len(res1) + len(res2) == total_rows

    # Checking that all engines have made progress
    assert len(res1) > 0
    assert len(res2) > 0

    assert {tuple(v) for v in res1 + res2} == set([tuple(i) for i in total_values])

    # Checking that all files were processed only once
    time.sleep(10)
    assert (
        get_count(node, dst_table_name) + get_count(node_2, dst_table_name)
    ) == total_rows

    if mode == "ordered":
        zk = started_cluster.get_kazoo_client("zoo1")
        processed_nodes = zk.get_children(f"{keeper_path}/processed/")
        assert len(processed_nodes) == shards_num * processing_threads
        shard_nodes = zk.get_children(f"{keeper_path}/shards/")
        assert len(shard_nodes) == shards_num

    node.restart_clickhouse()
    time.sleep(10)
    assert (
        get_count(node, dst_table_name) + get_count(node_2, dst_table_name)
    ) == total_rows


def test_settings_check(started_cluster):
    node = started_cluster.instances["instance"]
    node_2 = started_cluster.instances["instance2"]
    table_name = f"test_settings_check"
    dst_table_name = f"{table_name}_dst"
    keeper_path = f"/clickhouse/test_{table_name}"
    files_path = f"{table_name}_data"
    mode = "ordered"

    create_table(
        started_cluster,
        node,
        table_name,
        mode,
        files_path,
        additional_settings={
            "keeper_path": keeper_path,
            "s3queue_processing_threads_num": 5,
            "s3queue_total_shards_num": 2,
        },
    )

    assert (
        "Existing table metadata in ZooKeeper differs in s3queue_total_shards_num setting. Stored in ZooKeeper: 2, local: 3"
        in create_table(
            started_cluster,
            node_2,
            table_name,
            mode,
            files_path,
            additional_settings={
                "keeper_path": keeper_path,
                "s3queue_processing_threads_num": 5,
                "s3queue_total_shards_num": 3,
            },
            expect_error=True,
        )
    )

    assert (
        "Existing table metadata in ZooKeeper differs in s3queue_processing_threads_num setting. Stored in ZooKeeper: 5, local: 2"
        in create_table(
            started_cluster,
            node_2,
            table_name,
            mode,
            files_path,
            additional_settings={
                "keeper_path": keeper_path,
                "s3queue_processing_threads_num": 2,
                "s3queue_total_shards_num": 2,
            },
            expect_error=True,
        )
    )

    assert "s3queue_current_shard_num = 0" in node.query(
        f"SHOW CREATE TABLE {table_name}"
    )

    node.restart_clickhouse()

    assert "s3queue_current_shard_num = 0" in node.query(
        f"SHOW CREATE TABLE {table_name}"
    )

    node.query(f"DROP TABLE {table_name} SYNC")