Merge pull request #34539 from ClickHouse/fix-query-cancelation

Fix cancelation for S3 and HDFS
This commit is contained in:
Maksim Kita 2022-02-12 00:36:03 +01:00 committed by GitHub
commit d680a017e0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 62 additions and 57 deletions

View File

@ -372,44 +372,47 @@ String HDFSSource::getName() const
Chunk HDFSSource::generate()
{
if (!reader)
return {};
Chunk chunk;
if (reader->pull(chunk))
while (true)
{
Columns columns = chunk.getColumns();
UInt64 num_rows = chunk.getNumRows();
if (!reader || isCancelled())
break;
/// Enrich with virtual columns.
if (need_path_column)
Chunk chunk;
if (reader->pull(chunk))
{
auto column = DataTypeLowCardinality{std::make_shared<DataTypeString>()}.createColumnConst(num_rows, current_path);
columns.push_back(column->convertToFullColumnIfConst());
Columns columns = chunk.getColumns();
UInt64 num_rows = chunk.getNumRows();
/// Enrich with virtual columns.
if (need_path_column)
{
auto column = DataTypeLowCardinality{std::make_shared<DataTypeString>()}.createColumnConst(num_rows, current_path);
columns.push_back(column->convertToFullColumnIfConst());
}
if (need_file_column)
{
size_t last_slash_pos = current_path.find_last_of('/');
auto file_name = current_path.substr(last_slash_pos + 1);
auto column = DataTypeLowCardinality{std::make_shared<DataTypeString>()}.createColumnConst(num_rows, std::move(file_name));
columns.push_back(column->convertToFullColumnIfConst());
}
return Chunk(std::move(columns), num_rows);
}
if (need_file_column)
{
size_t last_slash_pos = current_path.find_last_of('/');
auto file_name = current_path.substr(last_slash_pos + 1);
std::lock_guard lock(reader_mutex);
reader.reset();
pipeline.reset();
read_buf.reset();
auto column = DataTypeLowCardinality{std::make_shared<DataTypeString>()}.createColumnConst(num_rows, std::move(file_name));
columns.push_back(column->convertToFullColumnIfConst());
if (!initialize())
break;
}
return Chunk(std::move(columns), num_rows);
}
{
std::lock_guard lock(reader_mutex);
reader.reset();
pipeline.reset();
read_buf.reset();
if (!initialize())
return {};
}
return generate();
return {};
}

View File

@ -302,40 +302,42 @@ String StorageS3Source::getName() const
Chunk StorageS3Source::generate()
{
if (!reader)
return {};
Chunk chunk;
if (reader->pull(chunk))
while (true)
{
UInt64 num_rows = chunk.getNumRows();
if (!reader || isCancelled())
break;
if (with_path_column)
chunk.addColumn(DataTypeLowCardinality{std::make_shared<DataTypeString>()}
.createColumnConst(num_rows, file_path)
->convertToFullColumnIfConst());
if (with_file_column)
Chunk chunk;
if (reader->pull(chunk))
{
size_t last_slash_pos = file_path.find_last_of('/');
chunk.addColumn(DataTypeLowCardinality{std::make_shared<DataTypeString>()}
.createColumnConst(num_rows, file_path.substr(last_slash_pos + 1))
->convertToFullColumnIfConst());
UInt64 num_rows = chunk.getNumRows();
if (with_path_column)
chunk.addColumn(DataTypeLowCardinality{std::make_shared<DataTypeString>()}
.createColumnConst(num_rows, file_path)
->convertToFullColumnIfConst());
if (with_file_column)
{
size_t last_slash_pos = file_path.find_last_of('/');
chunk.addColumn(DataTypeLowCardinality{std::make_shared<DataTypeString>()}
.createColumnConst(num_rows, file_path.substr(last_slash_pos + 1))
->convertToFullColumnIfConst());
}
return chunk;
}
return chunk;
{
std::lock_guard lock(reader_mutex);
reader.reset();
pipeline.reset();
read_buf.reset();
if (!initialize())
break;
}
}
{
std::lock_guard lock(reader_mutex);
reader.reset();
pipeline.reset();
read_buf.reset();
if (!initialize())
return {};
}
return generate();
return {};
}
static bool checkIfObjectExists(const std::shared_ptr<Aws::S3::S3Client> & client, const String & bucket, const String & key)