This commit is contained in:
kssenii 2023-09-27 18:44:53 +02:00
parent f753b91a3b
commit e0ff76a7f9
4 changed files with 26 additions and 15 deletions

View File

@ -430,11 +430,12 @@ void S3QueueFilesMetadata::setFileProcessedForUnorderedMode(ProcessingNodeHolder
return;
}
/// TODO this could be because of the expired session.
if (!responses.empty() && responses[0]->error != Coordination::Error::ZOK)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Attempt to set file as processed but it is not processing");
else
throw Exception(ErrorCodes::LOGICAL_ERROR, "Attempt to set file as processed but it is already processed");
/// Only a warning is logged here. Per the message, the processing node was not found
/// with the expected processing id (presumably after an expired ZooKeeper session).
LOG_WARNING(log, "Cannot set file ({}) as processed since processing node "
"does not exist with expected processing id, "
"this could be a result of expired zookeeper session", path);
}
void S3QueueFilesMetadata::setFileProcessedForOrderedMode(ProcessingNodeHolderPtr holder)
@ -472,8 +473,9 @@ void S3QueueFilesMetadata::setFileProcessedForOrderedMode(ProcessingNodeHolderPt
if (!responses.empty() && responses[0]->error != Coordination::Error::ZOK)
continue;
LOG_ERROR(log, "Cannot set file {} as failed - failed to remove ephemeral processing node", path);
chassert(false);
/// Only a warning is logged here. Per the message, the processing node was not found
/// with the expected processing id (presumably after an expired ZooKeeper session).
LOG_WARNING(log, "Cannot set file ({}) as processed since processing node "
"does not exist with expected processing id, "
"this could be a result of expired zookeeper session", path);
return;
}
}
@ -531,7 +533,6 @@ void S3QueueFilesMetadata::setFileFailed(ProcessingNodeHolderPtr holder, const S
{
/// File is no longer retriable.
/// Make a failed/node_name node and remove failed/node_name.retriable node.
/// TODO: always add version for processing node.
Coordination::Requests requests;
requests.push_back(zkutil::makeRemoveRequest(zookeeper_processing_path / node_name, -1));
@ -595,9 +596,9 @@ S3QueueFilesMetadata::ProcessingNodeHolder::~ProcessingNodeHolder()
bool S3QueueFilesMetadata::ProcessingNodeHolder::remove(Coordination::Requests * requests, Coordination::Responses * responses)
{
if (removed)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Processing file holder is already released");
throw Exception(ErrorCodes::LOGICAL_ERROR, "Processing node is already removed");
removed = true;
LOG_TEST(log, "Removing processing node {} ({})", zk_node_path, path);
try
{
@ -617,11 +618,14 @@ bool S3QueueFilesMetadata::ProcessingNodeHolder::remove(Coordination::Requests *
{
requests->push_back(zkutil::makeRemoveRequest(zk_node_path, stat.version));
auto code = zk_client->tryMulti(*requests, *responses);
return code == Coordination::Error::ZOK;
removed = code == Coordination::Error::ZOK;
}
else
{
zk_client->remove(zk_node_path);
return true;
removed = true;
}
return removed;
}
else
LOG_WARNING(log, "Cannot remove {} since precessing id changed: {} -> {}",

View File

@ -139,7 +139,6 @@ Chunk StorageS3QueueSource::generate()
LOG_TEST(log, "Read {} rows from file: {}", chunk.getNumRows(), reader.getPath());
file_status->processed_rows += chunk.getNumRows();
// file_status->profile_counters.increment(ProfileEvents::S3QueuePullMicroseconds, timer.get());
processed_rows_from_file += chunk.getNumRows();
VirtualColumnUtils::addRequestedPathAndFileVirtualsToChunk(chunk, requested_virtual_columns, reader.getPath());

View File

@ -101,6 +101,12 @@ StorageS3Queue::StorageS3Queue(
throw Exception(ErrorCodes::QUERY_NOT_ALLOWED, "S3Queue url must either end with '/' or contain globs");
}
if (s3queue_settings->mode == S3QueueMode::ORDERED && s3queue_settings->s3queue_processing_threads_num > 1)
{
LOG_WARNING(log, "Parallel processing is not yet supported for Ordered mode");
s3queue_settings->s3queue_processing_threads_num = 1;
}
configuration.update(context_);
FormatFactory::instance().checkFormatName(configuration.format);
context_->getRemoteHostFilter().checkURL(configuration.url.uri);
@ -181,7 +187,8 @@ Pipe StorageS3Queue::read(
}
Pipes pipes;
for (size_t i = 0; i < num_streams; ++i)
const size_t adjusted_num_streams = std::min<size_t>(num_streams, s3queue_settings->s3queue_processing_threads_num);
for (size_t i = 0; i < adjusted_num_streams; ++i)
pipes.emplace_back(createSource(column_names, storage_snapshot, query_info.query, max_block_size, local_context));
return Pipe::unitePipes(std::move(pipes));
}

View File

@ -558,7 +558,7 @@ def test_multiple_tables_meta_mismatch(started_cluster):
},
)
except QueryRuntimeException as e:
assert "Existing table metadata in ZooKeeper differs in engine mode" in str(e)
assert "Metadata with the same `s3queue_zookeeper_path` was already created but with different settings" in str(e)
failed = True
assert failed is True
@ -836,15 +836,16 @@ def test_max_set_size(started_cluster):
"s3queue_tracked_files_limit": 9,
"s3queue_cleanup_interval_min_ms": 0,
"s3queue_cleanup_interval_max_ms": 0,
"s3queue_processing_threads_num": 1,
},
)
total_values = generate_random_files(
started_cluster, files_path, files_to_generate, start_ind=0, row_num=1
)
get_query = f"SELECT * FROM {table_name}"
get_query = f"SELECT * FROM {table_name} ORDER BY column1, column2, column3"
res1 = [list(map(int, l.split())) for l in run_query(node, get_query).splitlines()]
assert res1 == total_values
assert res1 == sorted(total_values, key=lambda x: (x[0], x[1], x[2]))
print(total_values)
time.sleep(10)