Merge pull request #45529 from CurtizJ/fix-storage-s3-race

Try to fix test `test_storage_s3/test.py::test_wrong_s3_syntax` (race in `StorageS3`)
This commit is contained in:
Anton Popov 2023-01-26 14:21:32 +01:00 committed by GitHub
commit b58b73b0e7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 32 additions and 57 deletions

View File

@ -341,13 +341,6 @@ HDFSSource::HDFSSource(
initialize();
}
void HDFSSource::onCancel()
{
std::lock_guard lock(reader_mutex);
if (reader)
reader->cancel();
}
bool HDFSSource::initialize()
{
current_path = (*file_iterator)();
@ -387,8 +380,12 @@ Chunk HDFSSource::generate()
{
while (true)
{
if (!reader || isCancelled())
if (isCancelled() || !reader)
{
if (reader)
reader->cancel();
break;
}
Chunk chunk;
if (reader->pull(chunk))
@ -416,15 +413,12 @@ Chunk HDFSSource::generate()
return Chunk(std::move(columns), num_rows);
}
{
std::lock_guard lock(reader_mutex);
reader.reset();
pipeline.reset();
read_buf.reset();
reader.reset();
pipeline.reset();
read_buf.reset();
if (!initialize())
break;
}
if (!initialize())
break;
}
return {};
}

View File

@ -142,8 +142,6 @@ public:
Chunk generate() override;
void onCancel() override;
private:
StorageHDFSPtr storage;
Block block_for_format;
@ -155,8 +153,6 @@ private:
std::unique_ptr<ReadBuffer> read_buf;
std::unique_ptr<QueryPipeline> pipeline;
std::unique_ptr<PullingPipelineExecutor> reader;
/// onCancel and generate can be called concurrently.
std::mutex reader_mutex;
String current_path;
/// Recreate ReadBuffer and PullingPipelineExecutor for each file.

View File

@ -579,15 +579,6 @@ StorageS3Source::StorageS3Source(
reader_future = createReaderAsync();
}
void StorageS3Source::onCancel()
{
std::lock_guard lock(reader_mutex);
if (reader)
reader->cancel();
}
StorageS3Source::ReaderHolder StorageS3Source::createReader()
{
auto [current_key, info] = (*file_iterator)();
@ -708,8 +699,12 @@ Chunk StorageS3Source::generate()
{
while (true)
{
if (!reader || isCancelled())
if (isCancelled() || !reader)
{
if (reader)
reader->cancel();
break;
}
Chunk chunk;
if (reader->pull(chunk))
@ -741,21 +736,19 @@ Chunk StorageS3Source::generate()
return chunk;
}
{
std::lock_guard lock(reader_mutex);
assert(reader_future.valid());
reader = reader_future.get();
assert(reader_future.valid());
reader = reader_future.get();
if (!reader)
break;
if (!reader)
break;
/// Even if task is finished the thread may be not freed in pool.
/// So wait until it will be freed before scheduling a new task.
create_reader_pool.wait();
reader_future = createReaderAsync();
}
/// Even if task is finished the thread may be not freed in pool.
/// So wait until it will be freed before scheduling a new task.
create_reader_pool.wait();
reader_future = createReaderAsync();
}
return {};
}

View File

@ -145,8 +145,6 @@ public:
Chunk generate() override;
void onCancel() override;
private:
String name;
String bucket;
@ -209,8 +207,6 @@ private:
ReaderHolder reader;
/// onCancel and generate can be called concurrently
std::mutex reader_mutex;
std::vector<NameAndTypePair> requested_virtual_columns;
std::shared_ptr<IIterator> file_iterator;
size_t download_thread_num = 1;

View File

@ -157,13 +157,6 @@ namespace
};
using URIInfoPtr = std::shared_ptr<URIInfo>;
void onCancel() override
{
std::lock_guard lock(reader_mutex);
if (reader)
reader->cancel();
}
static void setCredentials(Poco::Net::HTTPBasicCredentials & credentials, const Poco::URI & request_uri)
{
const auto & user_info = request_uri.getUserInfo();
@ -241,6 +234,13 @@ namespace
{
while (true)
{
if (isCancelled())
{
if (reader)
reader->cancel();
break;
}
if (!reader)
{
auto current_uri_pos = uri_info->next_uri_to_read.fetch_add(1);
@ -249,18 +249,17 @@ namespace
auto current_uri = uri_info->uri_list_to_read[current_uri_pos];
std::lock_guard lock(reader_mutex);
initialize(current_uri);
}
Chunk chunk;
std::lock_guard lock(reader_mutex);
if (reader->pull(chunk))
return chunk;
pipeline->reset();
reader.reset();
}
return {};
}
static std::unique_ptr<ReadBuffer> getFirstAvailableURLReadBuffer(
@ -443,9 +442,6 @@ namespace
std::unique_ptr<ReadBuffer> read_buf;
std::unique_ptr<QueryPipeline> pipeline;
std::unique_ptr<PullingPipelineExecutor> reader;
/// onCancell and generate can be called concurrently and both of them
/// have R/W access to reader pointer.
std::mutex reader_mutex;
Poco::Net::HTTPBasicCredentials credentials;
};