work with comments on review

This commit is contained in:
Sema Checherinda 2023-02-21 22:55:58 +01:00
parent 3b58249320
commit ef0c1841af

View File

@ -78,31 +78,34 @@ public:
String getName() const override { return "DataPartsSource"; }
protected:
Chunk nullWhenNoRows(MutableColumns && new_columns)
{
chassert(!new_columns.empty());
const auto rows = new_columns[0]->size();
if (!rows)
return {};
return {std::move(new_columns), rows};
}
Chunk generate() override
{
Chunk result;
MutableColumns new_columns = getPort().getHeader().cloneEmptyColumns();
chassert(!new_columns.empty());
while (result.getNumRows() < block_size)
while (new_columns[0]->size() < block_size)
{
if (detached_parts.empty())
getMoreParts();
if (detached_parts.empty())
{
progress(result.getNumRows(), result.bytes());
return result;
}
return nullWhenNoRows(std::move(new_columns));
Chunk chunk = generateChunk(block_size - result.getNumRows());
if (result)
result.append(chunk);
else
result = std::move(chunk);
generateRows(new_columns, block_size - new_columns[0]->size());
}
progress(result.getNumRows(), result.bytes());
return result;
return nullWhenNoRows(std::move(new_columns));
}
private:
@ -110,7 +113,7 @@ private:
const std::vector<UInt8> columns_mask;
const UInt64 block_size;
const bool has_bytes_on_disk_column;
static const size_t files_peer_thread = 15;
static const size_t files_peer_thread = 30;
StoragesInfo current_info;
DetachedPartsInfo detached_parts;
@ -173,7 +176,7 @@ private:
future.get();
}
Chunk generateChunk(size_t max_rows)
void generateRows(MutableColumns & new_columns, size_t max_rows)
{
chassert(current_info);
@ -183,8 +186,6 @@ private:
std::vector<std::atomic<size_t>> parts_sizes(rows);
calculatePartSizeOnDisk(begin, parts_sizes);
MutableColumns new_columns = getPort().getHeader().cloneEmptyColumns();
for (auto p_id = begin; p_id < detached_parts.size(); ++p_id)
{
auto & p = detached_parts.at(p_id);
@ -221,8 +222,6 @@ private:
}
detached_parts.resize(begin);
return {std::move(new_columns), rows};
}
};