mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-28 10:31:57 +00:00
partition by -- single loop
This commit is contained in:
parent
b8a2a29f94
commit
10a8831d8b
@ -79,6 +79,8 @@ void WindowTransform::advancePartitionEnd()
|
|||||||
|
|
||||||
const RowNumber end = blocksEnd();
|
const RowNumber end = blocksEnd();
|
||||||
|
|
||||||
|
fmt::print(stderr, "end {}, partition_end {}\n", end, partition_end);
|
||||||
|
|
||||||
// If we're at the total end of data, we must end the partition. This is the
|
// If we're at the total end of data, we must end the partition. This is the
|
||||||
// only place in calculations where we need special handling for end of data,
|
// only place in calculations where we need special handling for end of data,
|
||||||
// other places will work as usual based on `partition_ended` = true, because
|
// other places will work as usual based on `partition_ended` = true, because
|
||||||
@ -93,49 +95,48 @@ void WindowTransform::advancePartitionEnd()
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// If we got to the end of the block already, just stop.
|
||||||
|
if (partition_end == end)
|
||||||
|
{
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// We process one block at a time, but we can process each block many times,
|
||||||
|
// if it contains multiple partitions. The `partition_end` is a
|
||||||
|
// past-the-end pointer, so it must be already in the "next" block we haven't
|
||||||
|
// processed yet. This is also the last block we have.
|
||||||
|
// The exception to this rule is end of data, for which we checked above.
|
||||||
|
assert(end.block == partition_end.block + 1);
|
||||||
|
|
||||||
// Try to advance the partition end pointer.
|
// Try to advance the partition end pointer.
|
||||||
const size_t n = partition_by_indices.size();
|
const size_t n = partition_by_indices.size();
|
||||||
if (n == 0)
|
if (n == 0)
|
||||||
{
|
{
|
||||||
// fmt::print(stderr, "no partition by\n");
|
|
||||||
// No PARTITION BY. All input is one partition, which will end when the
|
// No PARTITION BY. All input is one partition, which will end when the
|
||||||
// input ends.
|
// input ends.
|
||||||
partition_end = end;
|
partition_end = end;
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
// The partition ends when the PARTITION BY columns change. We need an array
|
// Check for partition end.
|
||||||
// of reference columns for comparison. We might have already dropped the
|
// The partition ends when the PARTITION BY columns change. We need
|
||||||
// blocks where the partition starts, but any row in the partition will do.
|
// some reference columns for comparison. We might have already
|
||||||
// Use group_start -- it's always in the valid region, because it points to
|
// dropped the blocks where the partition starts, but any row in the
|
||||||
// the start of the current group, which we haven't fully processed yet, and
|
// partition will do. Use group_start -- it's always in the valid
|
||||||
// hence cannot drop.
|
// region, because it points to the start of the current group,
|
||||||
auto reference_row = group_start;
|
// which we haven't fully processed yet, and therefore cannot drop.
|
||||||
if (reference_row == partition_end)
|
// It might be the same as the partition_end if it's the first group of the
|
||||||
|
// first partition, so we compare it to itself, but it still works correctly.
|
||||||
|
const auto block_rows = blockRowsNumber(partition_end);
|
||||||
|
for (; partition_end.row < block_rows; ++partition_end.row)
|
||||||
{
|
{
|
||||||
// This is for the very first partition and its first row. Try to get
|
|
||||||
// rid of this logic.
|
|
||||||
advanceRowNumber(partition_end);
|
|
||||||
}
|
|
||||||
assert(reference_row < blocksEnd());
|
|
||||||
assert(reference_row.block >= first_block_number);
|
|
||||||
Columns reference_partition_by;
|
|
||||||
for (const auto i : partition_by_indices)
|
|
||||||
{
|
|
||||||
reference_partition_by.push_back(inputAt(reference_row)[i]);
|
|
||||||
}
|
|
||||||
|
|
||||||
// fmt::print(stderr, "{} cols to compare, reference at {}\n", n, group_start);
|
|
||||||
|
|
||||||
for (; partition_end < end; advanceRowNumber(partition_end))
|
|
||||||
{
|
|
||||||
// Check for partition end.
|
|
||||||
size_t i = 0;
|
size_t i = 0;
|
||||||
for (; i < n; i++)
|
for (; i < n; i++)
|
||||||
{
|
{
|
||||||
|
const auto * ref = inputAt(group_start)[partition_by_indices[i]].get();
|
||||||
const auto * c = inputAt(partition_end)[partition_by_indices[i]].get();
|
const auto * c = inputAt(partition_end)[partition_by_indices[i]].get();
|
||||||
if (c->compareAt(partition_end.row,
|
if (c->compareAt(partition_end.row,
|
||||||
group_start.row, *reference_partition_by[i],
|
group_start.row, *ref,
|
||||||
1 /* nan_direction_hint */) != 0)
|
1 /* nan_direction_hint */) != 0)
|
||||||
{
|
{
|
||||||
break;
|
break;
|
||||||
@ -144,13 +145,17 @@ void WindowTransform::advancePartitionEnd()
|
|||||||
|
|
||||||
if (i < n)
|
if (i < n)
|
||||||
{
|
{
|
||||||
// fmt::print(stderr, "col {} doesn't match at {}: ref {}, val {}\n",
|
|
||||||
// i, partition_end, inputAt(partition_end)[i]);
|
|
||||||
partition_ended = true;
|
partition_ended = true;
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (partition_end.row == block_rows)
|
||||||
|
{
|
||||||
|
++partition_end.block;
|
||||||
|
partition_end.row = 0;
|
||||||
|
}
|
||||||
|
|
||||||
// Went until the end of data and didn't find the new partition.
|
// Went until the end of data and didn't find the new partition.
|
||||||
assert(!partition_ended && partition_end == blocksEnd());
|
assert(!partition_ended && partition_end == blocksEnd());
|
||||||
}
|
}
|
||||||
@ -198,12 +203,6 @@ void WindowTransform::advanceGroupEndGroups()
|
|||||||
group_ended = partition_ended;
|
group_ended = partition_ended;
|
||||||
}
|
}
|
||||||
|
|
||||||
Columns reference_order_by;
|
|
||||||
for (const auto i : order_by_indices)
|
|
||||||
{
|
|
||||||
reference_order_by.push_back(inputAt(group_start)[i]);
|
|
||||||
}
|
|
||||||
|
|
||||||
// `partition_end` is either end of partition or end of data.
|
// `partition_end` is either end of partition or end of data.
|
||||||
for (; group_end < partition_end; advanceRowNumber(group_end))
|
for (; group_end < partition_end; advanceRowNumber(group_end))
|
||||||
{
|
{
|
||||||
@ -211,9 +210,9 @@ void WindowTransform::advanceGroupEndGroups()
|
|||||||
size_t i = 0;
|
size_t i = 0;
|
||||||
for (; i < n; i++)
|
for (; i < n; i++)
|
||||||
{
|
{
|
||||||
const auto * c = inputAt(partition_end)[partition_by_indices[i]].get();
|
const auto * ref = inputAt(group_start)[order_by_indices[i]].get();
|
||||||
if (c->compareAt(group_end.row,
|
const auto * c = inputAt(group_end)[order_by_indices[i]].get();
|
||||||
group_start.row, *reference_order_by[i],
|
if (c->compareAt(group_end.row, group_start.row, *ref,
|
||||||
1 /* nan_direction_hint */) != 0)
|
1 /* nan_direction_hint */) != 0)
|
||||||
{
|
{
|
||||||
break;
|
break;
|
||||||
@ -381,6 +380,10 @@ void WindowTransform::writeOutGroup()
|
|||||||
first_not_ready_row = group_end;
|
first_not_ready_row = group_end;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void WindowTransform::initPerBlockCaches()
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
void WindowTransform::appendChunk(Chunk & chunk)
|
void WindowTransform::appendChunk(Chunk & chunk)
|
||||||
{
|
{
|
||||||
// fmt::print(stderr, "new chunk, {} rows, finished={}\n", chunk.getNumRows(),
|
// fmt::print(stderr, "new chunk, {} rows, finished={}\n", chunk.getNumRows(),
|
||||||
@ -410,6 +413,8 @@ void WindowTransform::appendChunk(Chunk & chunk)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
initPerBlockCaches();
|
||||||
|
|
||||||
// Start the calculations. First, advance the partition end.
|
// Start the calculations. First, advance the partition end.
|
||||||
for (;;)
|
for (;;)
|
||||||
{
|
{
|
||||||
|
@ -110,6 +110,7 @@ private:
|
|||||||
void advanceFrameStart();
|
void advanceFrameStart();
|
||||||
void advanceFrameEnd();
|
void advanceFrameEnd();
|
||||||
void writeOutGroup();
|
void writeOutGroup();
|
||||||
|
void initPerBlockCaches();
|
||||||
|
|
||||||
Columns & inputAt(const RowNumber & x)
|
Columns & inputAt(const RowNumber & x)
|
||||||
{
|
{
|
||||||
@ -121,6 +122,11 @@ private:
|
|||||||
const Columns & inputAt(const RowNumber & x) const
|
const Columns & inputAt(const RowNumber & x) const
|
||||||
{ return const_cast<WindowTransform *>(this)->inputAt(x); }
|
{ return const_cast<WindowTransform *>(this)->inputAt(x); }
|
||||||
|
|
||||||
|
size_t blockRowsNumber(const RowNumber & x) const
|
||||||
|
{
|
||||||
|
return inputAt(x)[0]->size();
|
||||||
|
}
|
||||||
|
|
||||||
MutableColumns & outputAt(const RowNumber & x)
|
MutableColumns & outputAt(const RowNumber & x)
|
||||||
{
|
{
|
||||||
assert(x.block >= first_block_number);
|
assert(x.block >= first_block_number);
|
||||||
|
Loading…
Reference in New Issue
Block a user