fix race in StorageURL and StorageHDFS

This commit is contained in:
Anton Popov 2023-01-24 12:34:43 +00:00
parent 8a1ea62aec
commit 5c0307bc6a
3 changed files with 18 additions and 32 deletions

View File

@ -342,13 +342,6 @@ HDFSSource::HDFSSource(
initialize(); initialize();
} }
void HDFSSource::onCancel()
{
std::lock_guard lock(reader_mutex);
if (reader)
reader->cancel();
}
bool HDFSSource::initialize() bool HDFSSource::initialize()
{ {
current_path = (*file_iterator)(); current_path = (*file_iterator)();
@ -388,8 +381,12 @@ Chunk HDFSSource::generate()
{ {
while (true) while (true)
{ {
if (!reader || isCancelled()) if (isCancelled() || !reader)
{
if (reader)
reader->cancel();
break; break;
}
Chunk chunk; Chunk chunk;
if (reader->pull(chunk)) if (reader->pull(chunk))
@ -417,8 +414,6 @@ Chunk HDFSSource::generate()
return Chunk(std::move(columns), num_rows); return Chunk(std::move(columns), num_rows);
} }
{
std::lock_guard lock(reader_mutex);
reader.reset(); reader.reset();
pipeline.reset(); pipeline.reset();
read_buf.reset(); read_buf.reset();
@ -426,7 +421,6 @@ Chunk HDFSSource::generate()
if (!initialize()) if (!initialize())
break; break;
} }
}
return {}; return {};
} }

View File

@ -142,8 +142,6 @@ public:
Chunk generate() override; Chunk generate() override;
void onCancel() override;
private: private:
StorageHDFSPtr storage; StorageHDFSPtr storage;
Block block_for_format; Block block_for_format;
@ -155,8 +153,6 @@ private:
std::unique_ptr<ReadBuffer> read_buf; std::unique_ptr<ReadBuffer> read_buf;
std::unique_ptr<QueryPipeline> pipeline; std::unique_ptr<QueryPipeline> pipeline;
std::unique_ptr<PullingPipelineExecutor> reader; std::unique_ptr<PullingPipelineExecutor> reader;
/// onCancel and generate can be called concurrently.
std::mutex reader_mutex;
String current_path; String current_path;
/// Recreate ReadBuffer and PullingPipelineExecutor for each file. /// Recreate ReadBuffer and PullingPipelineExecutor for each file.

View File

@ -157,13 +157,6 @@ namespace
}; };
using URIInfoPtr = std::shared_ptr<URIInfo>; using URIInfoPtr = std::shared_ptr<URIInfo>;
void onCancel() override
{
std::lock_guard lock(reader_mutex);
if (reader)
reader->cancel();
}
static void setCredentials(Poco::Net::HTTPBasicCredentials & credentials, const Poco::URI & request_uri) static void setCredentials(Poco::Net::HTTPBasicCredentials & credentials, const Poco::URI & request_uri)
{ {
const auto & user_info = request_uri.getUserInfo(); const auto & user_info = request_uri.getUserInfo();
@ -241,6 +234,13 @@ namespace
{ {
while (true) while (true)
{ {
if (isCancelled())
{
if (reader)
reader->cancel();
break;
}
if (!reader) if (!reader)
{ {
auto current_uri_pos = uri_info->next_uri_to_read.fetch_add(1); auto current_uri_pos = uri_info->next_uri_to_read.fetch_add(1);
@ -249,18 +249,17 @@ namespace
auto current_uri = uri_info->uri_list_to_read[current_uri_pos]; auto current_uri = uri_info->uri_list_to_read[current_uri_pos];
std::lock_guard lock(reader_mutex);
initialize(current_uri); initialize(current_uri);
} }
Chunk chunk; Chunk chunk;
std::lock_guard lock(reader_mutex);
if (reader->pull(chunk)) if (reader->pull(chunk))
return chunk; return chunk;
pipeline->reset(); pipeline->reset();
reader.reset(); reader.reset();
} }
return {};
} }
static std::unique_ptr<ReadBuffer> getFirstAvailableURLReadBuffer( static std::unique_ptr<ReadBuffer> getFirstAvailableURLReadBuffer(
@ -443,9 +442,6 @@ namespace
std::unique_ptr<ReadBuffer> read_buf; std::unique_ptr<ReadBuffer> read_buf;
std::unique_ptr<QueryPipeline> pipeline; std::unique_ptr<QueryPipeline> pipeline;
std::unique_ptr<PullingPipelineExecutor> reader; std::unique_ptr<PullingPipelineExecutor> reader;
/// onCancell and generate can be called concurrently and both of them
/// have R/W access to reader pointer.
std::mutex reader_mutex;
Poco::Net::HTTPBasicCredentials credentials; Poco::Net::HTTPBasicCredentials credentials;
}; };