This commit is contained in:
xiedeyantu 2022-12-23 19:09:58 +08:00
parent 5c8fb627b3
commit b5fd23358f
2 changed files with 53 additions and 22 deletions

View File

@ -552,7 +552,9 @@ StorageS3Source::StorageS3Source(
, create_reader_scheduler(threadPoolCallbackRunner<ReaderHolder>(create_reader_pool, "CreateS3Reader"))
{
reader = createReader();
if (reader)
if (reader ||
(reader.getReadMode() == StorageS3Source::ReaderHolder::ONLY_VIRTUAL_COLUMNS &&
!reader.isFinishedForOnlyVirtualColumns()))
reader_future = createReaderAsync();
}
@ -572,7 +574,12 @@ StorageS3Source::ReaderHolder StorageS3Source::createReader()
return {};
if (only_need_virtual_columns)
return ReaderHolder{fs::path(bucket) / current_key, nullptr, nullptr, nullptr};
return ReaderHolder{
fs::path(bucket) / current_key,
nullptr,
nullptr,
nullptr,
StorageS3Source::ReaderHolder::ONLY_VIRTUAL_COLUMNS};
size_t object_size = info
? info->size
@ -707,33 +714,40 @@ Chunk StorageS3Source::generate()
}
};
if (!reader.getPath().empty() && only_need_virtual_columns)
{
Chunk chunk;
add_virtual_column(chunk, 1);
only_need_virtual_columns = false;
return chunk;
}
while (true)
{
if (!reader || isCancelled())
if (isCancelled())
break;
Chunk chunk;
if (reader->pull(chunk))
if (reader.getReadMode() == ReaderHolder::ONLY_VIRTUAL_COLUMNS)
{
UInt64 num_rows = chunk.getNumRows();
size_t total_size = file_iterator->getTotalSize();
if (num_rows && total_size)
if (!reader.isFinishedForOnlyVirtualColumns())
{
updateRowsProgressApprox(
*this, chunk, total_size, total_rows_approx_accumulated, total_rows_count_times, total_rows_approx_max);
add_virtual_column(chunk, 1);
reader.setFinishedForOnlyVirtualColumns();
return chunk;
}
}
else
{
if (!reader)
break;
add_virtual_column(chunk, num_rows);
return chunk;
if (reader->pull(chunk))
{
UInt64 num_rows = chunk.getNumRows();
size_t total_size = file_iterator->getTotalSize();
if (num_rows && total_size)
{
updateRowsProgressApprox(
*this, chunk, total_size, total_rows_approx_accumulated, total_rows_count_times, total_rows_approx_max);
}
add_virtual_column(chunk, num_rows);
return chunk;
}
}
{
@ -742,7 +756,10 @@ Chunk StorageS3Source::generate()
assert(reader_future.valid());
reader = reader_future.get();
if (!reader)
if ((!reader &&
reader.getReadMode() == ReaderHolder::ALL) ||
(reader.getReadMode() == ReaderHolder::ONLY_VIRTUAL_COLUMNS &&
reader.isFinishedForOnlyVirtualColumns()))
break;
/// Even if task is finished the thread may be not freed in pool.

View File

@ -165,15 +165,24 @@ private:
struct ReaderHolder
{
public:
/// Tells StorageS3Source::generate() how a ReaderHolder is to be consumed.
enum ReadMode
{
    /// The holder carries no pulling executor; generate() emits one chunk
    /// produced by add_virtual_column(chunk, 1) and then marks the holder finished.
    ONLY_VIRTUAL_COLUMNS = 0,

    /// Default mode: generate() pulls chunks through the holder's executor.
    ALL = 1,
};
/// Builds a holder for one object.
///
/// path_      - object path; createReader() passes fs::path(bucket) / current_key.
/// read_buf_, pipeline_, reader_ - owned reading machinery; createReader() passes
///              nullptr for all three when only virtual columns are needed.
/// read_mode_ - how generate() should consume this holder; defaults to ALL.
///
/// NOTE(review): the two consecutive `reader_` parameter lines below appear to be the
/// removed and the added side of a diff hunk; presumably only the second one
/// (followed by `read_mode_`) belongs to the post-commit signature - confirm.
ReaderHolder(
String path_,
std::unique_ptr<ReadBuffer> read_buf_,
std::unique_ptr<QueryPipeline> pipeline_,
std::unique_ptr<PullingPipelineExecutor> reader_)
std::unique_ptr<PullingPipelineExecutor> reader_,
ReadMode read_mode_ = ALL)
: path(std::move(path_))
, read_buf(std::move(read_buf_))
, pipeline(std::move(pipeline_))
, reader(std::move(reader_))
/// Reads the member `path` (declared first, so already initialised here), not the
/// moved-from parameter `path_`. A holder with an empty path starts out "finished".
/// NOTE(review): `? true : false` is redundant - `path.empty()` is already a bool.
, is_finished_for_only_virtual_columns(path.empty() ? true : false)
, read_mode(read_mode_)
{
}
@ -183,12 +192,17 @@ private:
/// Forwards to the owned PullingPipelineExecutor. The returned pointer is null when
/// the holder was built with a null `reader_` (as createReader() does for the
/// virtual-columns-only case), so callers must not dereference it unconditionally.
PullingPipelineExecutor * operator->() { return reader.get(); }
const PullingPipelineExecutor * operator->() const { return reader.get(); }
/// Path this holder was constructed with.
const String & getPath() const { return path; }
/// Mode chosen at construction (ALL unless specified otherwise).
ReadMode getReadMode() const { return read_mode; }
/// True once the single virtual-columns chunk has been emitted, or from the start
/// when the holder was constructed with an empty path.
bool isFinishedForOnlyVirtualColumns() const { return is_finished_for_only_virtual_columns; }
/// One-way switch: marks the virtual-columns chunk as emitted; there is no reset.
void setFinishedForOnlyVirtualColumns() { is_finished_for_only_virtual_columns = true; }
private:
/// Declaration order is significant: members are initialised in this order (the
/// constructor reads `path` while initialising the finished flag) and destroyed in
/// reverse, so `reader` is destroyed before `pipeline`, and `pipeline` before `read_buf`.
String path;
std::unique_ptr<ReadBuffer> read_buf;
std::unique_ptr<QueryPipeline> pipeline;
std::unique_ptr<PullingPipelineExecutor> reader;
/// In-class default is false; the constructor taking a path replaces it with path.empty().
bool is_finished_for_only_virtual_columns{false};
/// Consumption mode read by generate(); ALL unless the constructor is told otherwise.
ReadMode read_mode = ALL;
};
ReaderHolder reader;