Fix lost row groups in the native Parquet reader

liuneng1994 2024-08-24 16:21:04 +08:00
parent d1de42469e
commit 909eee6ec6
5 changed files with 72 additions and 30 deletions

View File

@@ -53,28 +53,13 @@ ParquetReader::ParquetReader(
if (row_groups_indices.empty())
for (int i = 0; i < meta_data->num_row_groups(); i++)
row_groups_indices.push_back(i);
+ chunk_reader = std::make_unique<SubRowGroupRangeReader>(row_groups_indices, [&](const size_t idx) { return getRowGroupChunkReader(idx); });
}
- bool ParquetReader::loadRowGroupChunkReaderIfNeeded()
- {
- if (row_group_chunk_reader && !row_group_chunk_reader->hasMoreRows() && next_row_group_idx >= row_groups_indices.size())
- return false;
- if ((!row_group_chunk_reader || !row_group_chunk_reader->hasMoreRows()) && next_row_group_idx < row_groups_indices.size())
- {
- row_group_chunk_reader
- = std::make_unique<RowGroupChunkReader>(this, meta_data->RowGroup(row_groups_indices[next_row_group_idx]), filters);
- next_row_group_idx++;
- }
- return true;
- }
Block ParquetReader::read()
{
- Chunk chunk;
- while (chunk.getNumRows() == 0)
- {
- if (!loadRowGroupChunkReaderIfNeeded()) break;
- chunk = row_group_chunk_reader->readChunk(max_block_size);
- }
+ Chunk chunk = chunk_reader->read(max_block_size);
if (!chunk) return header.cloneEmpty();
return header.cloneWithColumns(chunk.detachColumns());
}
@@ -101,6 +86,36 @@ std::unique_ptr<RowGroupChunkReader> ParquetReader::getRowGroupChunkReader(size_
std::lock_guard lock(file_mutex);
return std::make_unique<RowGroupChunkReader>(this, meta_data->RowGroup(static_cast<int>(row_group_idx)), filters);
}
+ std::unique_ptr<SubRowGroupRangeReader> ParquetReader::getSubRowGroupRangeReader(std::vector<Int32> row_group_indices_)
+ {
+ return std::make_unique<SubRowGroupRangeReader>(row_group_indices_, [&](const size_t idx) { return getRowGroupChunkReader(idx); });
+ }
+ SubRowGroupRangeReader::SubRowGroupRangeReader(const std::vector<Int32> & rowGroupIndices, RowGroupReaderCreator && creator)
+ : row_group_indices(rowGroupIndices), row_group_reader_creator(creator)
+ {
+ }
+ DB::Chunk SubRowGroupRangeReader::read(size_t rows)
+ {
+ Chunk chunk;
+ while (chunk.getNumRows() == 0)
+ {
+ if (!loadRowGroupChunkReaderIfNeeded()) break;
+ chunk = row_group_chunk_reader->readChunk(rows);
+ }
+ return chunk;
+ }
+ bool SubRowGroupRangeReader::loadRowGroupChunkReaderIfNeeded()
+ {
+ if (row_group_chunk_reader && !row_group_chunk_reader->hasMoreRows() && next_row_group_idx >= row_group_indices.size())
+ return false;
+ if ((!row_group_chunk_reader || !row_group_chunk_reader->hasMoreRows()) && next_row_group_idx < row_group_indices.size())
+ {
+ row_group_chunk_reader = row_group_reader_creator(row_group_indices[next_row_group_idx]);
+ next_row_group_idx++;
+ }
+ return true;
+ }
}
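
To make the new flow concrete, here is a minimal usage sketch of the range reader (illustrative only; it assumes an already-constructed ParquetReader named reader, a row-group list row_groups_idxs, and a hypothetical consume() sink):

    // Drain every row group in the range. read() advances to the next row group
    // internally via loadRowGroupChunkReaderIfNeeded() and returns an empty Chunk
    // only once all of row_groups_idxs has been read.
    const size_t max_block_size = 8192;
    auto range_reader = reader.getSubRowGroupRangeReader(row_groups_idxs);
    while (true)
    {
        Chunk chunk = range_reader->read(max_block_size);
        if (!chunk)
            break;                      // the whole range is exhausted
        consume(std::move(chunk));      // hypothetical sink for the decoded rows
    }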

View File

@@ -12,6 +12,23 @@
namespace DB
{
+ class SubRowGroupRangeReader
+ {
+ public:
+ using RowGroupReaderCreator = std::function<std::unique_ptr<RowGroupChunkReader>(size_t)>;
+ explicit SubRowGroupRangeReader(const std::vector<Int32> & rowGroupIndices, RowGroupReaderCreator&& creator);
+ DB::Chunk read(size_t rows);
+ private:
+ bool loadRowGroupChunkReaderIfNeeded();
+ std::vector<Int32> row_group_indices;
+ std::unique_ptr<RowGroupChunkReader> row_group_chunk_reader;
+ size_t next_row_group_idx = 0;
+ RowGroupReaderCreator row_group_reader_creator;
+ };
class ParquetReader
{
public:
@@ -30,9 +47,8 @@ public:
void addFilter(const String & column_name, ColumnFilterPtr filter);
void setRemainFilter(std::optional<ActionsDAG> & expr);
std::unique_ptr<RowGroupChunkReader> getRowGroupChunkReader(size_t row_group_idx);
+ std::unique_ptr<SubRowGroupRangeReader> getSubRowGroupRangeReader(std::vector<Int32> row_group_indices);
private:
- bool loadRowGroupChunkReaderIfNeeded();
std::unique_ptr<parquet::ParquetFileReader> file_reader;
std::mutex file_mutex;
SeekableReadBuffer& file;
@@ -40,7 +56,7 @@ private:
Block header;
- std::unique_ptr<RowGroupChunkReader> row_group_chunk_reader;
+ std::unique_ptr<SubRowGroupRangeReader> chunk_reader;
UInt64 max_block_size;
parquet::ReaderProperties properties;
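
The RowGroupReaderCreator contract declared above is worth spelling out: SubRowGroupRangeReader invokes the creator lazily, one index at a time, and only after the previous row group has been exhausted. A small sketch of wiring in a custom creator (assuming a live ParquetReader named reader; the logging is purely illustrative):

    SubRowGroupRangeReader range_reader(
        /*rowGroupIndices=*/ {0, 1, 2},
        [&reader](size_t idx)
        {
            // Called on demand, at most once per listed row group.
            std::cerr << "opening row group " << idx << "\n";
            return reader.getRowGroupChunkReader(idx);
        });

Note that both this creator and the one built inside getSubRowGroupRangeReader capture the ParquetReader by reference, so the range reader must not outlive it.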

View File

@@ -590,7 +590,7 @@ void ParquetBlockInputFormat::initializeRowGroupBatchReader(size_t row_group_bat
ErrorCodes::BAD_ARGUMENTS,
"parquet native reader only supports little endian system currently");
#pragma clang diagnostic pop
- row_group_batch.row_group_chunk_reader = new_native_reader->getRowGroupChunkReader(row_group_batch_idx);
+ row_group_batch.row_group_chunk_reader = new_native_reader->getSubRowGroupRangeReader(row_group_batch.row_groups_idxs);
// row_group_batch.native_record_reader = std::make_shared<ParquetRecordReader>(
// getPort().getHeader(),
// arrow_properties,
@@ -683,8 +683,6 @@ void ParquetBlockInputFormat::decodeOneChunk(size_t row_group_batch_idx, std::un
lock.unlock();
auto end_of_row_group = [&] {
- if (row_group_batch.row_group_chunk_reader)
- row_group_batch.row_group_chunk_reader->printMetrics(std::cerr);
row_group_batch.row_group_chunk_reader.reset();
row_group_batch.native_record_reader.reset();
row_group_batch.arrow_column_to_ch_column.reset();
@@ -714,7 +712,7 @@ void ParquetBlockInputFormat::decodeOneChunk(size_t row_group_batch_idx, std::un
if (format_settings.parquet.use_native_reader)
{
- auto chunk = row_group_batch.row_group_chunk_reader->readChunk(row_group_batch.adaptive_chunk_size);
+ auto chunk = row_group_batch.row_group_chunk_reader->read(row_group_batch.adaptive_chunk_size);
if (!chunk)
{
end_of_row_group();
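
Taken together, the three hunks above are the actual fix: a row-group batch can span several row groups (row_groups_idxs), but the old code created a single RowGroupChunkReader (and from the batch index rather than a row-group index), so the remaining row groups of the batch were never read — the "lost row group" of the commit title. A simplified sketch of the native-reader decode path after the change (not the literal decodeOneChunk code; names follow the diff):

    // One SubRowGroupRangeReader now covers every row group listed in
    // row_groups_idxs for this batch.
    if (!row_group_batch.row_group_chunk_reader)
        row_group_batch.row_group_chunk_reader
            = new_native_reader->getSubRowGroupRangeReader(row_group_batch.row_groups_idxs);

    auto chunk = row_group_batch.row_group_chunk_reader->read(row_group_batch.adaptive_chunk_size);
    if (!chunk)
    {
        // An empty chunk now means the whole batch is exhausted, not just the
        // first row group, so tearing the readers down here no longer loses rows.
        end_of_row_group();
        return;
    }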

View File

@@ -20,7 +20,7 @@ namespace DB
class ArrowColumnToCHColumn;
class ParquetRecordReader;
class ParquetReader;
- class RowGroupChunkReader;
+ class SubRowGroupRangeReader;
// Parquet files contain a metadata block with the following information:
// * list of columns,
@@ -225,7 +225,7 @@ private:
std::shared_ptr<ParquetRecordReader> native_record_reader;
std::unique_ptr<parquet::arrow::FileReader> file_reader;
std::shared_ptr<arrow::RecordBatchReader> record_batch_reader;
- std::unique_ptr<RowGroupChunkReader> row_group_chunk_reader;
+ std::unique_ptr<SubRowGroupRangeReader> row_group_chunk_reader;
std::unique_ptr<ArrowColumnToCHColumn> arrow_column_to_ch_column;
};

View File

@@ -464,10 +464,23 @@ TEST(Processors, BenchmarkReadNullableString)
template<class T>
static void testGatherDictInt()
{
- PaddedPODArray<T> data = {0, 0, 0, 3, 3, 3, 4, 7, 0, 4, 7, 0, 9, 1, 5, 6, 7, 8, 9, 3, 4, 6, 7};
- PaddedPODArray<T> dict = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9};
+ int size = 10000;
+ PaddedPODArray<T> data;
+ PaddedPODArray<T> dict;
+ std::unordered_map<T, Int32> map;
PaddedPODArray<T> dist;
- PaddedPODArray<Int32> idx = {0, 0, 0, 3, 3, 3, 4, 7, 0, 4, 7, 0, 9, 1, 5, 6, 7, 8, 9, 3, 4, 6, 7};
+ PaddedPODArray<Int32> idx;
+ for (size_t i = 0; i < size; ++i)
+ {
+ auto value = std::rand() % 10000;
+ data.push_back(value);
+ if (map.find(value) == map.end())
+ {
+ map[value] = static_cast<Int32>(dict.size());
+ dict.push_back(value);
+ }
+ idx.push_back(map[value]);
+ }
dist.reserve(data.size());
FilterHelper::gatherDictFixedValue(dict, dist, idx, data.size());
ASSERT_EQ(data.size(), dist.size());
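
For reference, the property the rewritten test exercises: data, dict and idx are built so that dict[idx[i]] == data[i] for every row, and FilterHelper::gatherDictFixedValue is expected to reproduce data from (dict, idx). A naive reference gather illustrating the intended semantics (referenceGatherDict is a hypothetical helper, not part of the codebase):

    // dist[i] = dict[idx[i]] for each of `count` rows — by construction this
    // expansion reproduces the original `data` column exactly.
    template <typename T>
    static void referenceGatherDict(
        const PaddedPODArray<T> & dict, PaddedPODArray<T> & dist,
        const PaddedPODArray<Int32> & idx, size_t count)
    {
        dist.resize(count);
        for (size_t i = 0; i < count; ++i)
            dist[i] = dict[idx[i]];
    }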