Merge pull request #8793 from ClickHouse/fix-null-format-for-processors

Update ConvertingAggregatedToChunksTransform.
This commit is contained in:
alexey-milovidov 2020-01-26 21:51:00 +03:00 committed by GitHub
commit 2787960ba2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -6,6 +6,7 @@
#include <Processors/ISource.h>
#include <Processors/Transforms/MergingAggregatedMemoryEfficientTransform.h>
namespace ProfileEvents
{
extern const Event ExternalAggregationMerge;
@ -14,23 +15,36 @@ namespace ProfileEvents
namespace DB
{
/// Builds a Chunk out of the columns of a Block.
/// The block's bucket number and overflow flag are copied into an
/// AggregatedChunkInfo which is attached to the resulting chunk.
static Chunk convertToChunk(const Block & block)
{
    /// Aggregation metadata taken from the block info.
    auto agg_info = std::make_shared<AggregatedChunkInfo>();
    agg_info->is_overflows = block.info.is_overflows;
    agg_info->bucket_num = block.info.bucket_num;

    /// The chunk carries the block's columns and its row count.
    const UInt64 rows_count = block.rows();
    Chunk result(block.getColumns(), rows_count);
    result.setChunkInfo(std::move(agg_info));

    return result;
}
namespace
{
/// Turns a Block into a Chunk with the same columns and number of rows.
/// Aggregation details (bucket number, overflow flag) are taken from the
/// block info and stored in the chunk as AggregatedChunkInfo.
Chunk convertToChunk(const Block & block)
{
    const UInt64 row_count = block.rows();
    Chunk converted(block.getColumns(), row_count);

    /// Fill the aggregation info from the block and hand it over to the chunk.
    auto aggregated_info = std::make_shared<AggregatedChunkInfo>();
    aggregated_info->bucket_num = block.info.bucket_num;
    aggregated_info->is_overflows = block.info.is_overflows;
    converted.setChunkInfo(std::move(aggregated_info));

    return converted;
}
/// Returns the AggregatedChunkInfo attached to the chunk.
/// Throws LOGICAL_ERROR if the chunk has no info at all,
/// or if the attached info is not an AggregatedChunkInfo.
const AggregatedChunkInfo * getInfoFromChunk(const Chunk & chunk)
{
    const auto & chunk_info = chunk.getChunkInfo();
    if (!chunk_info)
        throw Exception("Chunk info was not set for chunk.", ErrorCodes::LOGICAL_ERROR);

    /// typeid_cast on a pointer gives nullptr when the info has another type.
    if (const auto * aggregated = typeid_cast<const AggregatedChunkInfo *>(chunk_info.get()))
        return aggregated;

    throw Exception("Chunk should have AggregatedChunkInfo.", ErrorCodes::LOGICAL_ERROR);
}
/// Reads chunks from a file in native format. Provides chunks with aggregation info.
class SourceFromNativeStream : public ISource
{
@ -77,13 +91,13 @@ public:
/// Per-bucket progress slots and a cancellation flag; all members are atomics.
/// Instances are handed to the sources through SharedDataPtr.
struct SharedData
{
/// NOTE(review): presumably the counter sources use to pick the next bucket to merge —
/// its use is not visible in this part of the file, confirm.
std::atomic<UInt32> next_bucket_to_merge = 0;
/// For each bucket: the number of the source that converted it; -1 until a source stores its number.
std::array<std::atomic<Int32>, NUM_BUCKETS> source_for_bucket;
/// For each bucket: becomes true after the bucket was merged and converted to a chunk.
std::array<std::atomic<bool>, NUM_BUCKETS> is_bucket_processed;
/// A pointer to this flag is passed to the aggregator when a bucket is merged
/// (presumably so the merge can stop early — confirm against the aggregator).
std::atomic<bool> is_cancelled = false;
/// The array elements have no initializer at their declaration, so every slot is assigned explicitly here.
SharedData()
{
for (auto & source : source_for_bucket)
source = -1;
for (auto & flag : is_bucket_processed)
flag = false;
}
};
@ -93,13 +107,11 @@ public:
AggregatingTransformParamsPtr params_,
ManyAggregatedDataVariantsPtr data_,
SharedDataPtr shared_data_,
Int32 source_number_,
Arena * arena_)
: ISource(params_->getHeader())
, params(std::move(params_))
, data(std::move(data_))
, shared_data(std::move(shared_data_))
, source_number(source_number_)
, arena(arena_)
{}
@ -116,7 +128,7 @@ protected:
Block block = params->aggregator.mergeAndConvertOneBucketToBlock(*data, arena, params->final, bucket_num, &shared_data->is_cancelled);
Chunk chunk = convertToChunk(block);
shared_data->source_for_bucket[bucket_num] = source_number;
shared_data->is_bucket_processed[bucket_num] = true;
return chunk;
}
@ -125,7 +137,6 @@ private:
AggregatingTransformParamsPtr params;
ManyAggregatedDataVariantsPtr data;
SharedDataPtr shared_data;
Int32 source_number;
Arena * arena;
};
@ -249,16 +260,23 @@ private:
{
auto & output = outputs.front();
Int32 next_input_num = shared_data->source_for_bucket[current_bucket_num];
if (next_input_num < 0)
for (auto & input : inputs)
{
if (!input.isFinished() && input.hasData())
{
auto chunk = input.pull();
auto bucket = getInfoFromChunk(chunk)->bucket_num;
chunks[bucket] = std::move(chunk);
}
}
if (!shared_data->is_bucket_processed[current_bucket_num])
return Status::NeedData;
auto next_input = std::next(inputs.begin(), next_input_num);
/// next_input can't be finished until its data has been pulled.
if (!next_input->hasData())
if (!chunks[current_bucket_num])
return Status::NeedData;
output.push(next_input->pull());
output.push(std::move(chunks[current_bucket_num]));
++current_bucket_num;
if (current_bucket_num == NUM_BUCKETS)
@ -286,6 +304,7 @@ private:
UInt32 current_bucket_num = 0;
static constexpr Int32 NUM_BUCKETS = 256;
std::array<Chunk, NUM_BUCKETS> chunks;
Processors processors;
@ -359,7 +378,7 @@ private:
{
Arena * arena = first->aggregates_pools.at(thread).get();
auto source = std::make_shared<ConvertingAggregatedToChunksSource>(
params, data, shared_data, thread, arena);
params, data, shared_data, arena);
processors.emplace_back(std::move(source));
}