Parallel & distributed processing for ordered mode

This commit is contained in:
kssenii 2024-01-24 16:04:00 +01:00
parent 4292ee7d51
commit 7f8f379d7f
10 changed files with 414 additions and 58 deletions

View File

@ -129,6 +129,8 @@ S3QueueFilesMetadata::S3QueueFilesMetadata(const fs::path & zookeeper_path_, con
, max_loading_retries(settings_.s3queue_loading_retries.value)
, min_cleanup_interval_ms(settings_.s3queue_cleanup_interval_min_ms.value)
, max_cleanup_interval_ms(settings_.s3queue_cleanup_interval_max_ms.value)
, shards_num(settings_.s3queue_total_shards_num)
, threads_per_shard(settings_.s3queue_processing_threads_num)
, zookeeper_processing_path(zookeeper_path_ / "processing")
, zookeeper_processed_path(zookeeper_path_ / "processed")
, zookeeper_failed_path(zookeeper_path_ / "failed")
@ -197,6 +199,11 @@ S3QueueFilesMetadata::NodeMetadata S3QueueFilesMetadata::createNodeMetadata(
return metadata;
}
/// Maps a file path to the id of the processing thread responsible for it.
/// The id is the SipHash64 of the path bytes taken modulo the total number of
/// processing threads, so the result lies in [0, getProcessingThreadsNum()).
/// NOTE(review): assumes getProcessingThreadsNum() is non-zero, otherwise the
/// modulo below is undefined - confirm that the settings validation guarantees it.
size_t S3QueueFilesMetadata::getProcessingThreadForPath(const std::string & path) const
{
    const auto path_hash = sipHash64(path.data(), path.size());
    const auto total_threads = getProcessingThreadsNum();
    return path_hash % total_threads;
}
S3QueueFilesMetadata::ProcessingNodeHolderPtr S3QueueFilesMetadata::trySetFileAsProcessing(const std::string & path)
{
auto timer = DB::CurrentThread::getProfileEvents().timer(ProfileEvents::S3QueueSetFileProcessingMicroseconds);
@ -312,7 +319,8 @@ S3QueueFilesMetadata::ProcessingNodeHolderPtr S3QueueFilesMetadata::trySetFileAs
}
std::pair<S3QueueFilesMetadata::SetFileProcessingResult,
S3QueueFilesMetadata::ProcessingNodeHolderPtr> S3QueueFilesMetadata::trySetFileAsProcessingForUnorderedMode(const std::string & path, const FileStatusPtr & file_status)
S3QueueFilesMetadata::ProcessingNodeHolderPtr>
S3QueueFilesMetadata::trySetFileAsProcessingForUnorderedMode(const std::string & path, const FileStatusPtr & file_status)
{
/// In one zookeeper transaction do the following:
/// 1. check that corresponding persistent nodes do not exist in processed/ and failed/;
@ -339,7 +347,8 @@ std::pair<S3QueueFilesMetadata::SetFileProcessingResult,
if (code == Coordination::Error::ZOK)
{
auto holder = std::make_unique<ProcessingNodeHolder>(node_metadata.processing_id, path, zookeeper_processing_path / node_name, file_status, zk_client);
auto holder = std::make_unique<ProcessingNodeHolder>(
node_metadata.processing_id, path, zookeeper_processing_path / node_name, file_status, zk_client);
return std::pair{SetFileProcessingResult::Success, std::move(holder)};
}
@ -362,7 +371,8 @@ std::pair<S3QueueFilesMetadata::SetFileProcessingResult,
}
std::pair<S3QueueFilesMetadata::SetFileProcessingResult,
S3QueueFilesMetadata::ProcessingNodeHolderPtr> S3QueueFilesMetadata::trySetFileAsProcessingForOrderedMode(const std::string & path, const FileStatusPtr & file_status)
S3QueueFilesMetadata::ProcessingNodeHolderPtr>
S3QueueFilesMetadata::trySetFileAsProcessingForOrderedMode(const std::string & path, const FileStatusPtr & file_status)
{
/// Same as for Unordered mode.
/// The only difference is the check if the file is already processed.
@ -385,10 +395,15 @@ std::pair<S3QueueFilesMetadata::SetFileProcessingResult,
/// If the version did change - retry (since we cannot do Get and Create requests
/// in the same zookeeper transaction, so we use a while loop with tries).
Coordination::Stat processed_node_stat;
auto data = zk_client->get(zookeeper_processed_path, &processed_node_stat);
auto processed_node = isShardedProcessing()
? zookeeper_processed_path / toString(getProcessingThreadForPath(path))
: zookeeper_processed_path;
NodeMetadata processed_node_metadata;
if (!data.empty())
Coordination::Stat processed_node_stat;
std::string data;
auto processed_node_exists = zk_client->tryGet(processed_node, data, &processed_node_stat);
if (processed_node_exists && !data.empty())
processed_node_metadata = NodeMetadata::fromString(data);
auto max_processed_file_path = processed_node_metadata.file_path;
@ -403,13 +418,23 @@ std::pair<S3QueueFilesMetadata::SetFileProcessingResult,
requests.push_back(zkutil::makeRemoveRequest(zookeeper_failed_path / node_name, -1));
requests.push_back(zkutil::makeCreateRequest(zookeeper_processing_path / node_name, node_metadata.toString(), zkutil::CreateMode::Ephemeral));
requests.push_back(zkutil::makeCheckRequest(zookeeper_processed_path, processed_node_stat.version));
if (processed_node_exists)
{
requests.push_back(zkutil::makeCheckRequest(processed_node, processed_node_stat.version));
}
else
{
requests.push_back(zkutil::makeCreateRequest(processed_node, "", zkutil::CreateMode::Persistent));
requests.push_back(zkutil::makeRemoveRequest(processed_node, -1));
}
Coordination::Responses responses;
auto code = zk_client->tryMulti(requests, responses);
if (code == Coordination::Error::ZOK)
{
auto holder = std::make_unique<ProcessingNodeHolder>(node_metadata.processing_id, path, zookeeper_processing_path / node_name, file_status, zk_client);
auto holder = std::make_unique<ProcessingNodeHolder>(
node_metadata.processing_id, path, zookeeper_processing_path / node_name, file_status, zk_client);
return std::pair{SetFileProcessingResult::Success, std::move(holder)};
}
@ -500,11 +525,15 @@ void S3QueueFilesMetadata::setFileProcessedForOrderedMode(ProcessingNodeHolderPt
const auto node_metadata = createNodeMetadata(path).toString();
const auto zk_client = getZooKeeper();
auto processed_node = isShardedProcessing()
? zookeeper_processed_path / toString(getProcessingThreadForPath(path))
: zookeeper_processed_path;
while (true)
{
std::string res;
Coordination::Stat stat;
bool exists = zk_client->tryGet(zookeeper_processed_path, res, &stat);
bool exists = zk_client->tryGet(processed_node, res, &stat);
Coordination::Requests requests;
if (exists)
{
@ -527,11 +556,11 @@ void S3QueueFilesMetadata::setFileProcessedForOrderedMode(ProcessingNodeHolderPt
return;
}
}
requests.push_back(zkutil::makeSetRequest(zookeeper_processed_path, node_metadata, stat.version));
requests.push_back(zkutil::makeSetRequest(processed_node, node_metadata, stat.version));
}
else
{
requests.push_back(zkutil::makeCreateRequest(zookeeper_processed_path, node_metadata, zkutil::CreateMode::Persistent));
requests.push_back(zkutil::makeCreateRequest(processed_node, node_metadata, zkutil::CreateMode::Persistent));
}
Coordination::Responses responses;

View File

@ -80,6 +80,15 @@ public:
void deactivateCleanupTask();
/// Sharded processing is in effect only for ORDERED mode and only when there
/// is more than one processing thread in total (over all shards).
bool isShardedProcessing() const
{
    if (mode != S3QueueMode::ORDERED)
        return false;
    return getProcessingThreadsNum() > 1;
}
/// Total number of processing threads: threads per shard times number of shards.
size_t getProcessingThreadsNum() const
{
    return threads_per_shard * shards_num;
}
size_t getProcessingThreadForPath(const std::string & path) const;
/// Converts a (thread within shard, shard) pair into a single processing thread id:
/// ids of shard N occupy the range [N * threads_per_shard, (N + 1) * threads_per_shard).
/// shard_id must be in range [0, shards_num - 1].
/// NOTE(review): thread_id is presumably in range [0, threads_per_shard - 1] - nothing here enforces it.
size_t getIdForProcessingThread(size_t thread_id, size_t shard_id) const
{
    const size_t first_id_of_shard = shard_id * threads_per_shard;
    return first_id_of_shard + thread_id;
}
private:
const S3QueueMode mode;
const UInt64 max_set_size;
@ -87,6 +96,8 @@ private:
const UInt64 max_loading_retries;
const size_t min_cleanup_interval_ms;
const size_t max_cleanup_interval_ms;
const size_t shards_num;
const size_t threads_per_shard;
const fs::path zookeeper_processing_path;
const fs::path zookeeper_processed_path;
@ -117,8 +128,7 @@ private:
struct NodeMetadata
{
std::string file_path;
UInt64 last_processed_timestamp = 0;
std::string file_path; UInt64 last_processed_timestamp = 0;
std::string last_exception;
UInt64 retries = 0;
std::string processing_id; /// For ephemeral processing node.

View File

@ -29,6 +29,8 @@ class ASTStorage;
M(UInt32, s3queue_tracked_files_limit, 1000, "For unordered mode. Max set size for tracking processed files in ZooKeeper", 0) \
M(UInt32, s3queue_cleanup_interval_min_ms, 60000, "For unordered mode. Polling backoff min for cleanup", 0) \
M(UInt32, s3queue_cleanup_interval_max_ms, 60000, "For unordered mode. Polling backoff max for cleanup", 0) \
M(UInt32, s3queue_total_shards_num, 1, "Value 0 means disabled", 0) \
M(UInt32, s3queue_current_shard_num, 0, "", 0) \
#define LIST_OF_S3QUEUE_SETTINGS(M, ALIAS) \
S3QUEUE_RELATED_SETTINGS(M, ALIAS) \

View File

@ -46,29 +46,86 @@ StorageS3QueueSource::FileIterator::FileIterator(
: metadata(metadata_)
, glob_iterator(std::move(glob_iterator_))
, shutdown_called(shutdown_called_)
, log(&Poco::Logger::get("StorageS3QueueSource"))
, sharded_processing(metadata->isShardedProcessing())
{
if (sharded_processing)
{
for (size_t i = 0; i < metadata->getProcessingThreadsNum(); ++i)
sharded_keys.emplace(i, std::deque<KeyWithInfoPtr>{});
}
}
/// Returns the next key which was successfully marked as "processing" in keeper,
/// or an empty pointer when there are no more keys or shutdown was requested.
///
/// `idx` is the id of the calling processing thread. It matters only for sharded
/// processing: each key belongs to exactly one processing thread (see
/// S3QueueFilesMetadata::getProcessingThreadForPath). Keys listed by one thread but
/// belonging to another are parked in `sharded_keys` until their owner asks for them.
///
/// Throws LOGICAL_ERROR if, in sharded processing, a key owned by this thread
/// turns out to be in Processing state without us holding it.
StorageS3QueueSource::KeyWithInfoPtr StorageS3QueueSource::FileIterator::next(size_t idx)
{
    while (!shutdown_called)
    {
        KeyWithInfoPtr val{nullptr};

        if (sharded_processing)
        {
            LOG_TEST(log, "CHECK: {}", idx);

            /// The lock must be taken before looking at the deque: other threads
            /// push keys into it under the same mutex, so even empty() is a data race
            /// when called without the lock.
            std::lock_guard lk(sharded_keys_mutex);
            auto & keys = sharded_keys.at(idx);
            if (!keys.empty())
            {
                val = keys.front();
                keys.pop_front();
            }
        }

        if (!val)
        {
            std::unique_lock lk(sharded_keys_mutex, std::defer_lock);
            if (sharded_processing)
            {
                /// Listing and distributing keys is done under the lock
                /// to preserve the order of keys inside each shard of sharded_keys.
                lk.lock();
            }

            val = glob_iterator->next();

            if (val && sharded_processing)
            {
                auto shard = metadata->getProcessingThreadForPath(val->key);
                if (shard != idx)
                {
                    /// The key belongs to another processing thread: park it there and take the next one.
                    LOG_TEST(log, "Key {} is for shard {} (total: {})", val->key, shard, sharded_keys.size());
                    auto & keys = sharded_keys.at(shard);
                    keys.push_back(val);
                    continue;
                }
                LOG_TEST(log, "Processing shard {} with key {}", shard, val->key);
            }
        }

        /// Nothing parked for us and the listing is exhausted.
        if (!val)
            return {};

        if (shutdown_called)
        {
            LOG_TEST(log, "Shutdown was called, stopping file iterator");
            return {};
        }

        auto processing_holder = metadata->trySetFileAsProcessing(val->key);

        /// Re-check after the keeper round trip: shutdown could have been requested meanwhile.
        if (shutdown_called)
        {
            LOG_TEST(log, "Shutdown was called, stopping file iterator");
            return {};
        }

        if (processing_holder)
        {
            return std::make_shared<S3QueueKeyWithInfo>(val->key, val->info, processing_holder);
        }
        else if (sharded_processing
                 && metadata->getFileStatus(val->key)->state == S3QueueFilesMetadata::FileStatus::State::Processing)
        {
            /// With sharding every key has a single owner, so nobody else may be processing it.
            throw Exception(ErrorCodes::LOGICAL_ERROR,
                            "File {} is processing by someone else in sharded processing. "
                            "It is a bug", val->key);
        }
        /// Otherwise the file is already processed or failed: move on to the next key.
    }
    return {};
}
@ -83,6 +140,7 @@ StorageS3QueueSource::StorageS3QueueSource(
const Block & header_,
std::unique_ptr<StorageS3Source> internal_source_,
std::shared_ptr<S3QueueFilesMetadata> files_metadata_,
size_t processing_id_,
const S3QueueAction & action_,
RemoveFileFunc remove_file_func_,
const NamesAndTypesList & requested_virtual_columns_,
@ -96,6 +154,7 @@ StorageS3QueueSource::StorageS3QueueSource(
, WithContext(context_)
, name(std::move(name_))
, action(action_)
, processing_id(processing_id_)
, files_metadata(files_metadata_)
, internal_source(std::move(internal_source_))
, requested_virtual_columns(requested_virtual_columns_)
@ -123,7 +182,7 @@ void StorageS3QueueSource::lazyInitialize()
if (initialized)
return;
internal_source->lazyInitialize();
internal_source->lazyInitialize(processing_id);
reader = std::move(internal_source->reader);
if (reader)
reader_future = std::move(internal_source->reader_future);
@ -249,7 +308,7 @@ Chunk StorageS3QueueSource::generate()
/// Even if task is finished the thread may be not freed in pool.
/// So wait until it will be freed before scheduling a new task.
internal_source->create_reader_pool.wait();
reader_future = internal_source->createReaderAsync();
reader_future = internal_source->createReaderAsync(processing_id);
}
return {};

View File

@ -38,12 +38,15 @@ public:
class FileIterator : public IIterator
{
public:
FileIterator(std::shared_ptr<S3QueueFilesMetadata> metadata_, std::unique_ptr<GlobIterator> glob_iterator_, std::atomic<bool> & shutdown_called_);
FileIterator(
std::shared_ptr<S3QueueFilesMetadata> metadata_,
std::unique_ptr<GlobIterator> glob_iterator_,
std::atomic<bool> & shutdown_called_);
/// Note:
/// List results in s3 are always returned in UTF-8 binary order.
/// (https://docs.aws.amazon.com/AmazonS3/latest/userguide/ListingKeysUsingAPIs.html)
KeyWithInfoPtr next() override;
KeyWithInfoPtr next(size_t idx) override;
size_t estimatedKeysCount() override;
@ -52,6 +55,11 @@ public:
const std::unique_ptr<GlobIterator> glob_iterator;
std::atomic<bool> & shutdown_called;
std::mutex mutex;
Poco::Logger * log;
const bool sharded_processing;
std::unordered_map<size_t, std::deque<KeyWithInfoPtr>> sharded_keys;
std::mutex sharded_keys_mutex;
};
StorageS3QueueSource(
@ -59,6 +67,7 @@ public:
const Block & header_,
std::unique_ptr<StorageS3Source> internal_source_,
std::shared_ptr<S3QueueFilesMetadata> files_metadata_,
size_t processing_id_,
const S3QueueAction & action_,
RemoveFileFunc remove_file_func_,
const NamesAndTypesList & requested_virtual_columns_,
@ -80,6 +89,7 @@ public:
private:
const String name;
const S3QueueAction action;
const size_t processing_id;
const std::shared_ptr<S3QueueFilesMetadata> files_metadata;
const std::shared_ptr<StorageS3Source> internal_source;
const NamesAndTypesList requested_virtual_columns;

View File

@ -75,14 +75,8 @@ namespace
return zkutil::extractZooKeeperPath(result_zk_path, true);
}
void checkAndAdjustSettings(S3QueueSettings & s3queue_settings, const Settings & settings, Poco::Logger * log)
void checkAndAdjustSettings(S3QueueSettings & s3queue_settings, const Settings & settings)
{
if (s3queue_settings.mode == S3QueueMode::ORDERED && s3queue_settings.s3queue_processing_threads_num > 1)
{
LOG_WARNING(log, "Parallel processing is not yet supported for Ordered mode");
s3queue_settings.s3queue_processing_threads_num = 1;
}
if (!s3queue_settings.s3queue_processing_threads_num)
{
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Setting `s3queue_processing_threads_num` cannot be set to zero");
@ -99,6 +93,13 @@ namespace
"Setting `s3queue_cleanup_interval_min_ms` ({}) must be less or equal to `s3queue_cleanup_interval_max_ms` ({})",
s3queue_settings.s3queue_cleanup_interval_min_ms, s3queue_settings.s3queue_cleanup_interval_max_ms);
}
/// Shard numbers are zero-based, so the current shard number must be strictly less than the total.
/// The message prints both values as they are (no `total - 1` arithmetic, which would wrap for 0).
if (s3queue_settings.s3queue_current_shard_num >= s3queue_settings.s3queue_total_shards_num)
    throw Exception(ErrorCodes::BAD_ARGUMENTS,
                    "Setting `s3queue_current_shard_num` ({}) must be less than `s3queue_total_shards_num` ({})",
                    s3queue_settings.s3queue_current_shard_num, s3queue_settings.s3queue_total_shards_num);
///TODO: Add a test with different total_shards_settings for same keeper path - exception must be thrown.
}
}
@ -134,7 +135,7 @@ StorageS3Queue::StorageS3Queue(
throw Exception(ErrorCodes::QUERY_NOT_ALLOWED, "S3Queue url must either end with '/' or contain globs");
}
checkAndAdjustSettings(*s3queue_settings, context_->getSettingsRef(), log);
checkAndAdjustSettings(*s3queue_settings, context_->getSettingsRef());
configuration.update(context_);
FormatFactory::instance().checkFormatName(configuration.format);
@ -221,13 +222,12 @@ public:
std::shared_ptr<StorageS3Queue> storage_,
ContextPtr context_,
size_t max_block_size_,
size_t num_streams_)
size_t )
: SourceStepWithFilter(DataStream{.header = std::move(sample_block)})
, info(std::move(info_))
, storage(std::move(storage_))
, context(std::move(context_))
, max_block_size(max_block_size_)
, num_streams(num_streams_)
{
}
@ -236,7 +236,6 @@ private:
std::shared_ptr<StorageS3Queue> storage;
ContextPtr context;
size_t max_block_size;
size_t num_streams;
std::shared_ptr<StorageS3Queue::FileIterator> iterator;
@ -301,11 +300,15 @@ void StorageS3Queue::read(
void ReadFromS3Queue::initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &)
{
Pipes pipes;
const size_t adjusted_num_streams = std::min<size_t>(num_streams, storage->s3queue_settings->s3queue_processing_threads_num);
const size_t adjusted_num_streams = storage->s3queue_settings->s3queue_processing_threads_num;
createIterator(nullptr);
for (size_t i = 0; i < adjusted_num_streams; ++i)
pipes.emplace_back(storage->createSource(info, iterator, max_block_size, context));
pipes.emplace_back(storage->createSource(
info,
iterator,
storage->files_metadata->getIdForProcessingThread(i, storage->s3queue_settings->s3queue_current_shard_num),
max_block_size, context));
auto pipe = Pipe::unitePipes(std::move(pipes));
if (pipe.empty())
@ -320,6 +323,7 @@ void ReadFromS3Queue::initializePipeline(QueryPipelineBuilder & pipeline, const
std::shared_ptr<StorageS3QueueSource> StorageS3Queue::createSource(
const ReadFromFormatInfo & info,
std::shared_ptr<StorageS3Queue::FileIterator> file_iterator,
size_t processing_id,
size_t max_block_size,
ContextPtr local_context)
{
@ -359,7 +363,7 @@ std::shared_ptr<StorageS3QueueSource> StorageS3Queue::createSource(
auto s3_queue_log = s3queue_settings->s3queue_enable_logging_to_s3queue_log ? local_context->getS3QueueLog() : nullptr;
return std::make_shared<StorageS3QueueSource>(
getName(), info.source_header, std::move(internal_source),
files_metadata, after_processing, file_deleter, info.requested_virtual_columns,
files_metadata, processing_id, after_processing, file_deleter, info.requested_virtual_columns,
local_context, shutdown_called, table_is_being_dropped, s3_queue_log, getStorageID(), log);
}
@ -463,7 +467,8 @@ bool StorageS3Queue::streamToViews()
for (size_t i = 0; i < s3queue_settings->s3queue_processing_threads_num; ++i)
{
auto source = createSource(
read_from_format_info, file_iterator, DBMS_DEFAULT_BUFFER_SIZE, s3queue_context);
read_from_format_info, file_iterator, files_metadata->getIdForProcessingThread(i, s3queue_settings->s3queue_current_shard_num),
DBMS_DEFAULT_BUFFER_SIZE, s3queue_context);
pipes.emplace_back(std::move(source));
}

View File

@ -91,6 +91,7 @@ private:
std::shared_ptr<StorageS3QueueSource> createSource(
const ReadFromFormatInfo & info,
std::shared_ptr<StorageS3Queue::FileIterator> file_iterator,
size_t processing_id,
size_t max_block_size,
ContextPtr local_context);

View File

@ -244,7 +244,7 @@ public:
fillInternalBufferAssumeLocked();
}
KeyWithInfoPtr next()
KeyWithInfoPtr next(size_t)
{
std::lock_guard lock(mutex);
return nextAssumeLocked();
@ -436,9 +436,9 @@ StorageS3Source::DisclosedGlobIterator::DisclosedGlobIterator(
{
}
StorageS3Source::KeyWithInfoPtr StorageS3Source::DisclosedGlobIterator::next()
StorageS3Source::KeyWithInfoPtr StorageS3Source::DisclosedGlobIterator::next(size_t idx) /// NOLINT
{
return pimpl->next();
return pimpl->next(idx);
}
size_t StorageS3Source::DisclosedGlobIterator::estimatedKeysCount()
@ -471,7 +471,7 @@ public:
}
}
KeyWithInfoPtr next()
KeyWithInfoPtr next(size_t)
{
size_t current_index = index.fetch_add(1, std::memory_order_relaxed);
if (current_index >= keys.size())
@ -516,9 +516,9 @@ StorageS3Source::KeysIterator::KeysIterator(
{
}
StorageS3Source::KeyWithInfoPtr StorageS3Source::KeysIterator::next()
StorageS3Source::KeyWithInfoPtr StorageS3Source::KeysIterator::next(size_t idx) /// NOLINT
{
return pimpl->next();
return pimpl->next(idx);
}
size_t StorageS3Source::KeysIterator::estimatedKeysCount()
@ -545,7 +545,7 @@ StorageS3Source::ReadTaskIterator::ReadTaskIterator(
buffer.emplace_back(std::make_shared<KeyWithInfo>(key_future.get(), std::nullopt));
}
StorageS3Source::KeyWithInfoPtr StorageS3Source::ReadTaskIterator::next()
StorageS3Source::KeyWithInfoPtr StorageS3Source::ReadTaskIterator::next(size_t) /// NOLINT
{
size_t current_index = index.fetch_add(1, std::memory_order_relaxed);
if (current_index >= buffer.size())
@ -599,23 +599,23 @@ StorageS3Source::StorageS3Source(
{
}
void StorageS3Source::lazyInitialize()
void StorageS3Source::lazyInitialize(size_t idx)
{
if (initialized)
return;
reader = createReader();
reader = createReader(idx);
if (reader)
reader_future = createReaderAsync();
reader_future = createReaderAsync(idx);
initialized = true;
}
StorageS3Source::ReaderHolder StorageS3Source::createReader()
StorageS3Source::ReaderHolder StorageS3Source::createReader(size_t idx)
{
KeyWithInfoPtr key_with_info;
do
{
key_with_info = (*file_iterator)();
key_with_info = file_iterator->next(idx);
if (!key_with_info || key_with_info->key.empty())
return {};
@ -689,9 +689,9 @@ StorageS3Source::ReaderHolder StorageS3Source::createReader()
return ReaderHolder{key_with_info, bucket, std::move(read_buf), std::move(source), std::move(pipeline), std::move(current_reader)};
}
std::future<StorageS3Source::ReaderHolder> StorageS3Source::createReaderAsync()
std::future<StorageS3Source::ReaderHolder> StorageS3Source::createReaderAsync(size_t idx)
{
return create_reader_scheduler([this] { return createReader(); }, Priority{});
return create_reader_scheduler([=, this] { return createReader(idx); }, Priority{});
}
std::unique_ptr<ReadBuffer> StorageS3Source::createS3ReadBuffer(const String & key, size_t object_size)

View File

@ -61,7 +61,7 @@ public:
{
public:
virtual ~IIterator() = default;
virtual KeyWithInfoPtr next() = 0;
virtual KeyWithInfoPtr next(size_t idx = 0) = 0; /// NOLINT
/// Estimates how many streams we need to process all files.
/// If keys count >= max_threads_count, the returned number may not represent the actual number of the keys.
@ -85,7 +85,7 @@ public:
const S3Settings::RequestSettings & request_settings_ = {},
std::function<void(FileProgress)> progress_callback_ = {});
KeyWithInfoPtr next() override;
KeyWithInfoPtr next(size_t idx = 0) override; /// NOLINT
size_t estimatedKeysCount() override;
private:
@ -106,7 +106,7 @@ public:
KeysWithInfo * read_keys = nullptr,
std::function<void(FileProgress)> progress_callback_ = {});
KeyWithInfoPtr next() override;
KeyWithInfoPtr next(size_t idx = 0) override; /// NOLINT
size_t estimatedKeysCount() override;
private:
@ -120,7 +120,7 @@ public:
public:
explicit ReadTaskIterator(const ReadTaskCallback & callback_, size_t max_threads_count);
KeyWithInfoPtr next() override;
KeyWithInfoPtr next(size_t idx = 0) override; /// NOLINT
size_t estimatedKeysCount() override;
private:
@ -253,11 +253,11 @@ private:
/// Notice: we should initialize reader and future_reader lazily in generate to make sure key_condition
/// is set before createReader is invoked for key_condition is read in createReader.
void lazyInitialize();
void lazyInitialize(size_t idx = 0);
/// Recreate ReadBuffer and Pipeline for each file.
ReaderHolder createReader();
std::future<ReaderHolder> createReaderAsync();
ReaderHolder createReader(size_t idx = 0);
std::future<ReaderHolder> createReaderAsync(size_t idx = 0);
std::unique_ptr<ReadBuffer> createS3ReadBuffer(const String & key, size_t object_size);
std::unique_ptr<ReadBuffer> createAsyncS3ReadBuffer(const String & key, const ReadSettings & read_settings, size_t object_size);

View File

@ -960,3 +960,243 @@ def test_s3_client_reused(started_cluster):
s3_clients_after = get_created_s3_clients_count()
assert s3_clients_before == s3_clients_after
@pytest.mark.parametrize("mode", ["unordered", "ordered"])
def test_processing_threads(started_cluster, mode):
    """Check that a single S3Queue table with many processing threads reads every file exactly once.

    For "ordered" mode additionally check that keeper contains one `processed`
    child node per processing thread.
    """
    instance = started_cluster.instances["instance"]
    src_table = f"processing_threads_{mode}"
    dst_table = f"{src_table}_dst"
    zk_path = f"/clickhouse/test_{src_table}"
    data_prefix = f"{src_table}_data"
    expected_files = 300
    threads_num = 32

    create_table(
        started_cluster,
        instance,
        src_table,
        mode,
        data_prefix,
        additional_settings={
            "keeper_path": zk_path,
            "s3queue_processing_threads_num": threads_num,
        },
    )
    create_mv(instance, src_table, dst_table)

    # One row per file, so the row count equals the number of processed files.
    expected_rows = generate_random_files(
        started_cluster, data_prefix, expected_files, row_num=1
    )

    def count_rows(table):
        return int(run_query(instance, f"SELECT count() FROM {table}"))

    # Poll for up to 100 attempts, sleeping one second after each unsuccessful check.
    attempts_left = 100
    while attempts_left > 0 and count_rows(dst_table) != expected_files:
        time.sleep(1)
        attempts_left -= 1

    assert count_rows(dst_table) == expected_files

    raw_rows = instance.query(
        f"SELECT column1, column2, column3 FROM {dst_table}"
    ).splitlines()
    actual_rows = {tuple(int(x) for x in line.split()) for line in raw_rows}
    assert actual_rows == {tuple(row) for row in expected_rows}

    if mode == "ordered":
        zk = started_cluster.get_kazoo_client("zoo1")
        processed_nodes = zk.get_children(f"{zk_path}/processed/")
        assert len(processed_nodes) == threads_num
@pytest.mark.parametrize(
    "mode, processing_threads",
    [
        pytest.param("unordered", 1),
        pytest.param("unordered", 8),
        pytest.param("ordered", 1),
        pytest.param("ordered", 8),
    ],
)
def test_shards(started_cluster, mode, processing_threads):
    """Check sharded processing with several S3Queue tables on one node.

    `shards_num` tables share one keeper path and one set of files, each with its own
    `s3queue_current_shard_num`. Together they must process every file exactly once.
    For "ordered" mode keeper must contain one `processed` child node per
    (shard, processing thread) pair.
    """
    node = started_cluster.instances["instance"]
    table_name = f"test_shards_{mode}_{processing_threads}"
    dst_table_name = f"{table_name}_dst"
    keeper_path = f"/clickhouse/test_{table_name}"
    files_path = f"{table_name}_data"
    files_to_generate = 300
    shards_num = 3

    # Destination tables are derived from shards_num, so the checks below
    # stay correct if the number of shards is changed.
    dst_tables = [f"{dst_table_name}_{i + 1}" for i in range(shards_num)]

    for i, dst_table in enumerate(dst_tables):
        table = f"{table_name}_{i + 1}"
        create_table(
            started_cluster,
            node,
            table,
            mode,
            files_path,
            additional_settings={
                "keeper_path": keeper_path,
                "s3queue_processing_threads_num": processing_threads,
                "s3queue_total_shards_num": shards_num,
                "s3queue_current_shard_num": i,
            },
        )
        create_mv(node, table, dst_table)

    # One row per file, so the total row count equals the number of processed files.
    total_values = generate_random_files(
        started_cluster, files_path, files_to_generate, row_num=1
    )

    def get_count(table_name):
        return int(run_query(node, f"SELECT count() FROM {table_name}"))

    def get_total_count():
        return sum(get_count(dst_table) for dst_table in dst_tables)

    for _ in range(100):
        if get_total_count() == files_to_generate:
            break
        time.sleep(1)

    total_count = get_total_count()
    if total_count != files_to_generate:
        # Dump the queue state to simplify debugging before failing.
        info = node.query(
            f"SELECT * FROM system.s3queue WHERE zookeeper_path like '%{table_name}' ORDER BY file_name FORMAT Vertical"
        )
        logging.debug(info)
        assert (
            False
        ), f"Expected {files_to_generate} processed rows, got {total_count}"

    res = []
    for dst_table in dst_tables:
        res += [
            list(map(int, l.split()))
            for l in node.query(
                f"SELECT column1, column2, column3 FROM {dst_table}"
            ).splitlines()
        ]
    assert {tuple(v) for v in res} == {tuple(i) for i in total_values}

    # Checking that all files were processed only once
    time.sleep(10)
    assert get_total_count() == files_to_generate

    if mode == "ordered":
        zk = started_cluster.get_kazoo_client("zoo1")
        processed_nodes = zk.get_children(f"{keeper_path}/processed/")
        assert len(processed_nodes) == shards_num * processing_threads
@pytest.mark.parametrize(
    "mode, processing_threads",
    [
        pytest.param("unordered", 1),
        pytest.param("unordered", 8),
        pytest.param("ordered", 1),
        pytest.param("ordered", 8),
    ],
)
def test_shards_distributed(started_cluster, mode, processing_threads):
    """Check sharded processing with one S3Queue table per server.

    Two instances share one keeper path and one set of files, each acting as its own
    shard. Together they must process every row exactly once and both must make progress.
    For "ordered" mode keeper must contain one `processed` child node per
    (shard, processing thread) pair.
    """
    node = started_cluster.instances["instance"]
    node_2 = started_cluster.instances["instance2"]
    instances = [node, node_2]
    table_name = f"test_shards_distributed_{mode}_{processing_threads}"
    dst_table_name = f"{table_name}_dst"
    keeper_path = f"/clickhouse/test_{table_name}"
    files_path = f"{table_name}_data"
    files_to_generate = 300
    row_num = 50
    total_rows = row_num * files_to_generate
    shards_num = 2

    # The position of the instance in the list is its (zero-based) shard number.
    for shard_num, instance in enumerate(instances):
        create_table(
            started_cluster,
            instance,
            table_name,
            mode,
            files_path,
            additional_settings={
                "keeper_path": keeper_path,
                "s3queue_processing_threads_num": processing_threads,
                "s3queue_total_shards_num": shards_num,
                "s3queue_current_shard_num": shard_num,
            },
        )

    for instance in instances:
        create_mv(instance, table_name, dst_table_name)

    total_values = generate_random_files(
        started_cluster, files_path, files_to_generate, row_num=row_num
    )

    def get_count(instance, table_name):
        return int(run_query(instance, f"SELECT count() FROM {table_name}"))

    def get_total_count():
        return sum(get_count(instance, dst_table_name) for instance in instances)

    for _ in range(150):
        if get_total_count() == total_rows:
            break
        time.sleep(1)

    total_count = get_total_count()
    if total_count != total_rows:
        # Dump the queue state to simplify debugging before failing.
        info = node.query(
            f"SELECT * FROM system.s3queue WHERE zookeeper_path like '%{table_name}' ORDER BY file_name FORMAT Vertical"
        )
        logging.debug(info)
        assert False, f"Expected {total_rows} processed rows, got {total_count}"

    get_query = f"SELECT column1, column2, column3 FROM {dst_table_name}"
    res1 = [list(map(int, l.split())) for l in run_query(node, get_query).splitlines()]
    res2 = [
        list(map(int, l.split())) for l in run_query(node_2, get_query).splitlines()
    ]

    assert len(res1) + len(res2) == total_rows

    # Checking that all engines have made progress
    assert len(res1) > 0
    assert len(res2) > 0

    assert {tuple(v) for v in res1 + res2} == {tuple(i) for i in total_values}

    # Checking that all files were processed only once
    time.sleep(10)
    assert get_total_count() == total_rows

    if mode == "ordered":
        zk = started_cluster.get_kazoo_client("zoo1")
        processed_nodes = zk.get_children(f"{keeper_path}/processed/")
        assert len(processed_nodes) == shards_num * processing_threads