mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-09-20 08:40:50 +00:00
commit
b034146ba4
@ -8,10 +8,9 @@ namespace ErrorCodes
|
|||||||
extern const int SIZES_OF_COLUMNS_DOESNT_MATCH;
|
extern const int SIZES_OF_COLUMNS_DOESNT_MATCH;
|
||||||
}
|
}
|
||||||
|
|
||||||
SquashingTransform::SquashingTransform(size_t min_block_size_rows_, size_t min_block_size_bytes_, bool reserve_memory_)
|
SquashingTransform::SquashingTransform(size_t min_block_size_rows_, size_t min_block_size_bytes_)
|
||||||
: min_block_size_rows(min_block_size_rows_)
|
: min_block_size_rows(min_block_size_rows_)
|
||||||
, min_block_size_bytes(min_block_size_bytes_)
|
, min_block_size_bytes(min_block_size_bytes_)
|
||||||
, reserve_memory(reserve_memory_)
|
|
||||||
{
|
{
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -95,13 +94,7 @@ void SquashingTransform::append(ReferenceType input_block)
|
|||||||
const auto source_column = input_block.getByPosition(i).column;
|
const auto source_column = input_block.getByPosition(i).column;
|
||||||
|
|
||||||
auto mutable_column = IColumn::mutate(std::move(accumulated_block.getByPosition(i).column));
|
auto mutable_column = IColumn::mutate(std::move(accumulated_block.getByPosition(i).column));
|
||||||
|
|
||||||
if (reserve_memory)
|
|
||||||
{
|
|
||||||
mutable_column->reserve(min_block_size_bytes);
|
|
||||||
}
|
|
||||||
mutable_column->insertRangeFrom(*source_column, 0, source_column->size());
|
mutable_column->insertRangeFrom(*source_column, 0, source_column->size());
|
||||||
|
|
||||||
accumulated_block.getByPosition(i).column = std::move(mutable_column);
|
accumulated_block.getByPosition(i).column = std::move(mutable_column);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -23,7 +23,7 @@ class SquashingTransform
|
|||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
/// Conditions on rows and bytes are OR-ed. If one of them is zero, then corresponding condition is ignored.
|
/// Conditions on rows and bytes are OR-ed. If one of them is zero, then corresponding condition is ignored.
|
||||||
SquashingTransform(size_t min_block_size_rows_, size_t min_block_size_bytes_, bool reserve_memory_ = false);
|
SquashingTransform(size_t min_block_size_rows_, size_t min_block_size_bytes_);
|
||||||
|
|
||||||
/** Add next block and possibly returns squashed block.
|
/** Add next block and possibly returns squashed block.
|
||||||
* At end, you need to pass empty block. As the result for last (empty) block, you will get last Result with ready = true.
|
* At end, you need to pass empty block. As the result for last (empty) block, you will get last Result with ready = true.
|
||||||
@ -34,7 +34,6 @@ public:
|
|||||||
private:
|
private:
|
||||||
size_t min_block_size_rows;
|
size_t min_block_size_rows;
|
||||||
size_t min_block_size_bytes;
|
size_t min_block_size_bytes;
|
||||||
bool reserve_memory;
|
|
||||||
|
|
||||||
Block accumulated_block;
|
Block accumulated_block;
|
||||||
|
|
||||||
|
@ -401,7 +401,6 @@ void ThreadStatus::detachQuery(bool exit_if_already_detached, bool thread_exits)
|
|||||||
performance_counters.setParent(&ProfileEvents::global_counters);
|
performance_counters.setParent(&ProfileEvents::global_counters);
|
||||||
memory_tracker.reset();
|
memory_tracker.reset();
|
||||||
|
|
||||||
/// Must reset pointer to thread_group's memory_tracker, because it will be destroyed two lines below (will reset to its parent).
|
|
||||||
memory_tracker.setParent(thread_group->memory_tracker.getParent());
|
memory_tracker.setParent(thread_group->memory_tracker.getParent());
|
||||||
|
|
||||||
query_id.clear();
|
query_id.clear();
|
||||||
|
@ -8,15 +8,6 @@
|
|||||||
namespace DB
|
namespace DB
|
||||||
{
|
{
|
||||||
|
|
||||||
class AggregatedArenasChunkInfo : public ChunkInfo
|
|
||||||
{
|
|
||||||
public:
|
|
||||||
Arenas arenas;
|
|
||||||
explicit AggregatedArenasChunkInfo(Arenas arenas_)
|
|
||||||
: arenas(std::move(arenas_))
|
|
||||||
{}
|
|
||||||
};
|
|
||||||
|
|
||||||
class AggregatedChunkInfo : public ChunkInfo
|
class AggregatedChunkInfo : public ChunkInfo
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
|
@ -5,9 +5,9 @@ namespace DB
|
|||||||
{
|
{
|
||||||
|
|
||||||
SquashingChunksTransform::SquashingChunksTransform(
|
SquashingChunksTransform::SquashingChunksTransform(
|
||||||
const Block & header, size_t min_block_size_rows, size_t min_block_size_bytes, bool reserve_memory)
|
const Block & header, size_t min_block_size_rows, size_t min_block_size_bytes)
|
||||||
: ExceptionKeepingTransform(header, header, false)
|
: ExceptionKeepingTransform(header, header, false)
|
||||||
, squashing(min_block_size_rows, min_block_size_bytes, reserve_memory)
|
, squashing(min_block_size_rows, min_block_size_bytes)
|
||||||
{
|
{
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -10,7 +10,7 @@ class SquashingChunksTransform : public ExceptionKeepingTransform
|
|||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
explicit SquashingChunksTransform(
|
explicit SquashingChunksTransform(
|
||||||
const Block & header, size_t min_block_size_rows, size_t min_block_size_bytes, bool reserve_memory = false);
|
const Block & header, size_t min_block_size_rows, size_t min_block_size_bytes);
|
||||||
|
|
||||||
String getName() const override { return "SquashingTransform"; }
|
String getName() const override { return "SquashingTransform"; }
|
||||||
|
|
||||||
|
@ -426,19 +426,7 @@ static QueryPipeline process(Block block, ViewRuntimeData & view, const ViewsDat
|
|||||||
std::move(block),
|
std::move(block),
|
||||||
views_data.source_storage->getVirtuals()));
|
views_data.source_storage->getVirtuals()));
|
||||||
|
|
||||||
/// We need keep InterpreterSelectQuery, until the processing will be finished, since:
|
|
||||||
///
|
|
||||||
/// - We copy Context inside InterpreterSelectQuery to support
|
|
||||||
/// modification of context (Settings) for subqueries
|
|
||||||
/// - InterpreterSelectQuery lives shorter than query pipeline.
|
|
||||||
/// It's used just to build the query pipeline and no longer needed
|
|
||||||
/// - ExpressionAnalyzer and then, Functions, that created in InterpreterSelectQuery,
|
|
||||||
/// **can** take a reference to Context from InterpreterSelectQuery
|
|
||||||
/// (the problem raises only when function uses context from the
|
|
||||||
/// execute*() method, like FunctionDictGet do)
|
|
||||||
/// - These objects live inside query pipeline (DataStreams) and the reference become dangling.
|
|
||||||
InterpreterSelectQuery select(view.query, local_context, SelectQueryOptions());
|
InterpreterSelectQuery select(view.query, local_context, SelectQueryOptions());
|
||||||
|
|
||||||
auto pipeline = select.buildQueryPipeline();
|
auto pipeline = select.buildQueryPipeline();
|
||||||
pipeline.resize(1);
|
pipeline.resize(1);
|
||||||
|
|
||||||
|
@ -284,7 +284,7 @@ Block ProjectionDescription::calculate(const Block & block, ContextPtr context)
|
|||||||
: QueryProcessingStage::WithMergeableState})
|
: QueryProcessingStage::WithMergeableState})
|
||||||
.buildQueryPipeline();
|
.buildQueryPipeline();
|
||||||
builder.resize(1);
|
builder.resize(1);
|
||||||
builder.addTransform(std::make_shared<SquashingChunksTransform>(builder.getHeader(), block.rows(), 0));
|
builder.addTransform(std::make_shared<SquashingChunksTransform>(builder.getHeader(), block.rows(), block.bytes()));
|
||||||
|
|
||||||
auto pipeline = QueryPipelineBuilder::getPipeline(std::move(builder));
|
auto pipeline = QueryPipelineBuilder::getPipeline(std::move(builder));
|
||||||
PullingPipelineExecutor executor(pipeline);
|
PullingPipelineExecutor executor(pipeline);
|
||||||
|
Loading…
Reference in New Issue
Block a user