Mirror of https://github.com/ClickHouse/ClickHouse.git
Allow to define a starting point for s3queue ordered mode
commit 361b2f107b (parent 0d864ebfc0)
src/Storages/S3Queue/S3QueueFilesMetadata.cpp

@@ -637,25 +637,31 @@ void S3QueueFilesMetadata::setFileProcessedForUnorderedMode(ProcessingNodeHolderPtr holder)
         "this could be a result of expired zookeeper session", path);
 }
 
+
 void S3QueueFilesMetadata::setFileProcessedForOrderedMode(ProcessingNodeHolderPtr holder)
+{
+    auto processed_node_path = isShardedProcessing()
+        ? zookeeper_processed_path / toString(getProcessingIdForPath(holder->path))
+        : zookeeper_processed_path;
+
+    return setFileProcessedForOrderedModeImpl(holder->path, holder, processed_node_path);
+}
+
+void S3QueueFilesMetadata::setFileProcessedForOrderedModeImpl(
+    const std::string & path, ProcessingNodeHolderPtr holder, const std::string & processed_node_path)
 {
     /// Update a persistent node in /processed and remove ephemeral node from /processing.
 
-    const auto & path = holder->path;
     const auto node_name = getNodeName(path);
     const auto node_metadata = createNodeMetadata(path).toString();
     const auto zk_client = getZooKeeper();
 
-    auto processed_node = isShardedProcessing()
-        ? zookeeper_processed_path / toString(getProcessingIdForPath(path))
-        : zookeeper_processed_path;
-
-    LOG_TEST(log, "Setting file `{}` as processed", path);
+    LOG_TEST(log, "Setting file `{}` as processed (at {})", path, processed_node_path);
     while (true)
     {
         std::string res;
         Coordination::Stat stat;
-        bool exists = zk_client->tryGet(processed_node, res, &stat);
+        bool exists = zk_client->tryGet(processed_node_path, res, &stat);
         Coordination::Requests requests;
         if (exists)
         {
@@ -664,39 +670,41 @@ void S3QueueFilesMetadata::setFileProcessedForOrderedMode(ProcessingNodeHolderPtr holder)
                 auto metadata = NodeMetadata::fromString(res);
                 if (metadata.file_path >= path)
                 {
-                    /// Here we get in the case that maximum processed file is bigger than ours.
-                    /// This is possible to achieve in case of parallel processing
-                    /// but for local processing we explicitly disable parallel mode and do everything in a single thread
-                    /// (see constructor of StorageS3Queue where s3queue_processing_threads_num is explicitly set to 1 in case of Ordered mode).
-                    /// Nevertheless, in case of distributed processing we cannot do anything with parallelism.
-                    /// What this means?
-                    /// It means that in scenario "distributed processing + Ordered mode"
-                    /// a setting s3queue_loading_retries will not work. It is possible to fix, it is in TODO.
-
-                    /// Return because there is nothing to change,
-                    /// the max processed file is already bigger than ours.
+                    LOG_TRACE(log, "File {} is already processed, current max processed file: {}", path, metadata.file_path);
                     return;
                 }
             }
-            requests.push_back(zkutil::makeSetRequest(processed_node, node_metadata, stat.version));
+            requests.push_back(zkutil::makeSetRequest(processed_node_path, node_metadata, stat.version));
         }
         else
         {
-            requests.push_back(zkutil::makeCreateRequest(processed_node, node_metadata, zkutil::CreateMode::Persistent));
+            requests.push_back(zkutil::makeCreateRequest(processed_node_path, node_metadata, zkutil::CreateMode::Persistent));
         }
 
         Coordination::Responses responses;
-        if (holder->remove(&requests, &responses))
+        if (holder)
         {
-            LOG_TEST(log, "Moved file `{}` to processed", path);
-            if (max_loading_retries)
-                zk_client->tryRemove(zookeeper_failed_path / (node_name + ".retriable"), -1);
-            return;
+            if (holder->remove(&requests, &responses))
+            {
+                LOG_TEST(log, "Moved file `{}` to processed", path);
+                if (max_loading_retries)
+                    zk_client->tryRemove(zookeeper_failed_path / (node_name + ".retriable"), -1);
+                return;
+            }
+        }
+        else
+        {
+            auto code = zk_client->tryMulti(requests, responses);
+            if (code == Coordination::Error::ZOK)
+                return;
         }
 
         /// Failed to update max processed node, retry.
         if (!responses.empty() && responses[0]->error != Coordination::Error::ZOK)
+        {
+            LOG_TRACE(log, "Failed to update processed node ({}). Will retry.", magic_enum::enum_name(responses[0]->error));
             continue;
+        }
 
         LOG_WARNING(log, "Cannot set file ({}) as processed since processing node "
                     "does not exist with expected processing id does not exist, "
@@ -705,6 +713,22 @@ void S3QueueFilesMetadata::setFileProcessedForOrderedMode(ProcessingNodeHolderPtr holder)
     }
 }
 
+void S3QueueFilesMetadata::setFileProcessed(const std::string & path, size_t shard_id)
+{
+    if (mode != S3QueueMode::ORDERED)
+        throw Exception(ErrorCodes::BAD_ARGUMENTS, "Can set file as preprocessed only for Ordered mode");
+
+    if (isShardedProcessing())
+    {
+        for (const auto & processor : getProcessingIdsForShard(shard_id))
+            setFileProcessedForOrderedModeImpl(path, nullptr, zookeeper_processed_path / toString(processor));
+    }
+    else
+    {
+        setFileProcessedForOrderedModeImpl(path, nullptr, zookeeper_processed_path);
+    }
+}
+
 void S3QueueFilesMetadata::setFileFailed(ProcessingNodeHolderPtr holder, const String & exception_message)
 {
     auto timer = DB::CurrentThread::getProfileEvents().timer(ProfileEvents::S3QueueSetFileFailedMicroseconds);
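The core of setFileProcessedForOrderedModeImpl is an optimistic compare-and-swap against the /processed ZooKeeper node: read the node together with its Stat, bail out if the stored max path is already >= the current file, otherwise write the new value pinned to the read version (or create the node if it does not exist yet) and retry on any race. Below is a minimal sketch of that pattern using the third-party kazoo Python client, purely for illustration: kazoo is not used by ClickHouse, the real code stores NodeMetadata JSON rather than the bare path, and it also removes the /processing node in the same multi-request.

# Sketch only: the optimistic "advance max processed path" update, in Python/kazoo.
from kazoo.client import KazooClient
from kazoo.exceptions import BadVersionError, NodeExistsError, NoNodeError


def advance_max_processed(zk: KazooClient, processed_node_path: str, file_path: str) -> None:
    while True:
        try:
            data, stat = zk.get(processed_node_path)
            if data.decode() >= file_path:
                return  # a later (or equal) file is already recorded, nothing to do
            try:
                zk.set(processed_node_path, file_path.encode(), version=stat.version)
                return
            except BadVersionError:
                continue  # lost the race against another writer, re-read and retry
        except NoNodeError:
            try:
                zk.create(processed_node_path, file_path.encode(), makepath=True)
                return
            except NodeExistsError:
                continue  # node appeared concurrently, retry via the set() branch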
src/Storages/S3Queue/S3QueueFilesMetadata.h

@@ -42,6 +42,7 @@ public:
     ~S3QueueFilesMetadata();
 
     void setFileProcessed(ProcessingNodeHolderPtr holder);
+    void setFileProcessed(const std::string & path, size_t shard_id);
 
     void setFileFailed(ProcessingNodeHolderPtr holder, const std::string & exception_message);
 
@@ -141,6 +142,9 @@ private:
     void setFileProcessedForUnorderedMode(ProcessingNodeHolderPtr holder);
     std::string getZooKeeperPathForShard(size_t shard_id) const;
 
+    void setFileProcessedForOrderedModeImpl(
+        const std::string & path, ProcessingNodeHolderPtr holder, const std::string & processed_node_path);
+
     enum class SetFileProcessingResult
     {
         Success,
src/Storages/S3Queue/S3QueueSettings.h

@@ -22,6 +22,7 @@ class ASTStorage;
     M(UInt32, s3queue_loading_retries, 0, "Retry loading up to specified number of times", 0) \
     M(UInt32, s3queue_processing_threads_num, 1, "Number of processing threads", 0) \
     M(UInt32, s3queue_enable_logging_to_s3queue_log, 1, "Enable logging to system table system.s3queue_log", 0) \
+    M(String, s3queue_last_processed_path, "", "For Ordered mode. Files with smaller file name are considered already processed", 0) \
     M(UInt32, s3queue_tracked_file_ttl_sec, 0, "Maximum number of seconds to store processed files in ZooKeeper node (store forever by default)", 0) \
     M(UInt32, s3queue_polling_min_timeout_ms, 1000, "Minimal timeout before next polling", 0) \
     M(UInt32, s3queue_polling_max_timeout_ms, 10000, "Maximum timeout before next polling", 0) \
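For context, this is roughly how the new setting would be used from DDL. The table name, bucket URL and starting path below are invented for illustration; the skip semantics (files whose names compare less than or equal to the given path, including the named file itself, are treated as processed) follow from the integration test at the end of this diff.

# Hypothetical usage of s3queue_last_processed_path (names and bucket are made up).
# Everything up to and including data/file_0005.csv is treated as already processed,
# so ingestion starts from the next file in lexicographic order.
ddl = """
CREATE TABLE s3queue_ordered (data String)
ENGINE = S3Queue('https://mybucket.s3.amazonaws.com/data/*.csv', 'CSV')
SETTINGS mode = 'ordered',
         keeper_path = '/clickhouse/s3queue_ordered',
         s3queue_last_processed_path = 'data/file_0005.csv'
"""
# e.g. node.query(ddl) in the integration-test harness, or via clickhouse-client.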
src/Storages/S3Queue/StorageS3Queue.cpp

@@ -155,10 +155,6 @@ StorageS3Queue::StorageS3Queue(
     LOG_INFO(log, "Using zookeeper path: {}", zk_path.string());
     task = getContext()->getSchedulePool().createTask("S3QueueStreamingTask", [this] { threadFunc(); });
 
-    /// Get metadata manager from S3QueueMetadataFactory,
-    /// it will increase the ref count for the metadata object.
-    /// The ref count is decreased when StorageS3Queue::drop() method is called.
-    files_metadata = S3QueueMetadataFactory::instance().getOrCreate(zk_path, *s3queue_settings);
     try
     {
         createOrCheckMetadata(storage_metadata);
@@ -169,6 +165,11 @@ StorageS3Queue::StorageS3Queue(
         throw;
     }
 
+    /// Get metadata manager from S3QueueMetadataFactory,
+    /// it will increase the ref count for the metadata object.
+    /// The ref count is decreased when StorageS3Queue::drop() method is called.
+    files_metadata = S3QueueMetadataFactory::instance().getOrCreate(zk_path, *s3queue_settings);
+
     if (files_metadata->isShardedProcessing())
     {
         if (!s3queue_settings->s3queue_current_shard_num.changed)
@@ -181,6 +182,10 @@ StorageS3Queue::StorageS3Queue(
             files_metadata->registerNewShard(s3queue_settings->s3queue_current_shard_num);
         }
     }
+    if (s3queue_settings->mode == S3QueueMode::ORDERED && !s3queue_settings->s3queue_last_processed_path.value.empty())
+    {
+        files_metadata->setFileProcessed(s3queue_settings->s3queue_last_processed_path.value, s3queue_settings->s3queue_current_shard_num);
+    }
 }
 
 void StorageS3Queue::startup()
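Note that this initialization runs in the StorageS3Queue constructor, i.e. on every attach of the table and again after a server restart (exactly what the restart_clickhouse() step in the test below exercises). It stays harmless to repeat because the metadata layer only ever raises the stored max processed path: a starting point that is not larger than what is already recorded is ignored. In miniature (plain Python, no ZooKeeper, purely to illustrate the invariant):

# Miniature model of why re-applying the starting point is a no-op:
# the recorded "max processed path" only ever moves forward.
def apply_starting_point(current_max: str, starting_point: str) -> str:
    return current_max if current_max >= starting_point else starting_point

state = ""                                               # fresh table, nothing processed yet
state = apply_starting_point(state, "data/test_5.csv")   # first server start
assert state == "data/test_5.csv"
state = apply_starting_point(state, "data/test_5.csv")   # restart: no change
state = apply_starting_point(state, "data/test_3.csv")   # smaller value: ignored
assert state == "data/test_5.csv"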
tests/integration/test_storage_s3_queue/test.py

@@ -1283,3 +1283,53 @@ def test_settings_check(started_cluster):
     )
 
     node.query(f"DROP TABLE {table_name} SYNC")
+
+
+@pytest.mark.parametrize("processing_threads", [1, 5])
+def test_processed_file_setting(started_cluster, processing_threads):
+    node = started_cluster.instances["instance"]
+    table_name = f"test_processed_file_setting_{processing_threads}"
+    dst_table_name = f"{table_name}_dst"
+    keeper_path = f"/clickhouse/test_{table_name}"
+    files_path = f"{table_name}_data"
+    files_to_generate = 10
+
+    create_table(
+        started_cluster,
+        node,
+        table_name,
+        "ordered",
+        files_path,
+        additional_settings={
+            "keeper_path": keeper_path,
+            "s3queue_processing_threads_num": processing_threads,
+            "s3queue_last_processed_path": f"{files_path}/test_5.csv",
+        },
+    )
+    total_values = generate_random_files(
+        started_cluster, files_path, files_to_generate, start_ind=0, row_num=1
+    )
+
+    create_mv(node, table_name, dst_table_name)
+
+    def get_count():
+        return int(node.query(f"SELECT count() FROM {dst_table_name}"))
+
+    expected_rows = 4
+    for _ in range(20):
+        if expected_rows == get_count():
+            break
+        time.sleep(1)
+
+    assert expected_rows == get_count()
+
+    node.restart_clickhouse()
+    time.sleep(10)
+
+    expected_rows = 4
+    for _ in range(20):
+        if expected_rows == get_count():
+            break
+        time.sleep(1)
+
+    assert expected_rows == get_count()
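A quick sanity check of the expected_rows = 4 figure: generate_random_files() produces ten single-row files, and pointing s3queue_last_processed_path at test_5.csv makes the table skip every file whose name compares less than or equal to it, leaving four files to import. The snippet below spells out that arithmetic; it assumes the helper names the files test_0.csv through test_9.csv, which the choice of test_5.csv as the starting point implies.

# Assumed file naming: test_0.csv .. test_9.csv, one row per file (row_num=1).
files = [f"test_{i}.csv" for i in range(10)]
last_processed = "test_5.csv"

# Ordered mode skips every file whose name is <= the recorded max processed path.
remaining = [name for name in files if name > last_processed]

assert remaining == ["test_6.csv", "test_7.csv", "test_8.csv", "test_9.csv"]
assert len(remaining) == 4  # matches expected_rows in the test above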