mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-09-27 20:20:49 +00:00
cleanup
This commit is contained in:
parent
f8f79d5788
commit
574454c270
@ -97,7 +97,7 @@ void WindowTransform::advancePartitionEnd()
|
||||
const size_t n = partition_by_indices.size();
|
||||
if (n == 0)
|
||||
{
|
||||
fmt::print(stderr, "no partition by\n");
|
||||
// fmt::print(stderr, "no partition by\n");
|
||||
// No PARTITION BY. All input is one partition, which will end when the
|
||||
// input ends.
|
||||
partition_end = end;
|
||||
@ -107,17 +107,14 @@ void WindowTransform::advancePartitionEnd()
|
||||
// The partition ends when the PARTITION BY columns change. We need an array
|
||||
// of reference columns for comparison. We might have already dropped the
|
||||
// blocks where the partition starts, but any row in the partition will do.
|
||||
// We can't use group_start or frame_start, because we might have advanced
|
||||
// them to be equal to the partition_end.
|
||||
// Use the row previous to partition_end -- it should be valid.
|
||||
// FIXME group_start is now valid;
|
||||
//auto reference_row = partition_end;
|
||||
//retreatRowNumber(partition_end);
|
||||
// Use group_start -- it's always in the valid region, because it points to
|
||||
// the start of the current group, which we haven't fully processed yet, and
|
||||
// hence cannot drop.
|
||||
auto reference_row = group_start;
|
||||
// assert(reference_row < partition_end);
|
||||
if (reference_row == partition_end)
|
||||
{
|
||||
// This is for the very first partition. Try to get rid of it.
|
||||
// This is for the very first partition and its first row. Try to get
|
||||
// rid of this logic.
|
||||
advanceRowNumber(partition_end);
|
||||
}
|
||||
assert(reference_row < blocksEnd());
|
||||
@ -128,13 +125,13 @@ void WindowTransform::advancePartitionEnd()
|
||||
reference_partition_by.push_back(inputAt(reference_row)[i]);
|
||||
}
|
||||
|
||||
fmt::print(stderr, "{} cols to compare, reference at {}\n", n, group_start);
|
||||
// fmt::print(stderr, "{} cols to compare, reference at {}\n", n, group_start);
|
||||
|
||||
for ( ; partition_end < end; advanceRowNumber(partition_end))
|
||||
for (; partition_end < end; advanceRowNumber(partition_end))
|
||||
{
|
||||
// Check for partition end.
|
||||
size_t i = 0;
|
||||
for ( ; i < n; i++)
|
||||
for (; i < n; i++)
|
||||
{
|
||||
const auto * c = inputAt(partition_end)[partition_by_indices[i]].get();
|
||||
if (c->compareAt(partition_end.row,
|
||||
@ -174,7 +171,7 @@ void WindowTransform::advanceGroupEnd()
|
||||
advanceGroupEndRows();
|
||||
break;
|
||||
case WindowFrame::FrameType::Range:
|
||||
advanceGroupEndRange();
|
||||
assert(false);
|
||||
break;
|
||||
}
|
||||
}
|
||||
@ -182,45 +179,14 @@ void WindowTransform::advanceGroupEnd()
|
||||
void WindowTransform::advanceGroupEndRows()
|
||||
{
|
||||
// ROWS mode, peer groups always contains only the current row.
|
||||
// if (group_end == partition_end)
|
||||
// {
|
||||
// // We might be already at the partition_end, if we got to it at the
|
||||
// // previous work() call, but didn't know the partition ended there (it
|
||||
// // was non-final end of data), and in the next work() call (now) we
|
||||
// // discovered that either:
|
||||
// // 1) we won't get more input, or
|
||||
// // 2) we got new data and the new partition really began at this point,
|
||||
// // which is the beginning of the block.
|
||||
// // Assert these conditions and do nothing.
|
||||
// assert(input_is_finished || partition_end.row == 0);
|
||||
// }
|
||||
// else
|
||||
// {
|
||||
// assert(group_end < partition_end);
|
||||
// advanceRowNumber(group_end);
|
||||
// group_ended = true;
|
||||
// }
|
||||
|
||||
assert(group_ended == false);
|
||||
// We cannot advance the groups if the group start is already beyond the
|
||||
// end of partition.
|
||||
if (group_start == partition_end)
|
||||
{
|
||||
// should it be an assertion?
|
||||
return;
|
||||
}
|
||||
|
||||
assert(group_start < partition_end);
|
||||
group_end = group_start;
|
||||
advanceRowNumber(group_end);
|
||||
group_ended = true;
|
||||
}
|
||||
|
||||
void WindowTransform::advanceGroupEndRange()
|
||||
{
|
||||
assert(false);
|
||||
}
|
||||
|
||||
void WindowTransform::advanceGroupEndGroups()
|
||||
{
|
||||
const size_t n = order_by_indices.size();
|
||||
@ -239,11 +205,11 @@ void WindowTransform::advanceGroupEndGroups()
|
||||
}
|
||||
|
||||
// `partition_end` is either end of partition or end of data.
|
||||
for ( ; group_end < partition_end; advanceRowNumber(group_end))
|
||||
for (; group_end < partition_end; advanceRowNumber(group_end))
|
||||
{
|
||||
// Check for group end.
|
||||
size_t i = 0;
|
||||
for ( ; i < n; i++)
|
||||
for (; i < n; i++)
|
||||
{
|
||||
const auto * c = inputAt(partition_end)[partition_by_indices[i]].get();
|
||||
if (c->compareAt(group_end.row,
|
||||
@ -342,7 +308,7 @@ void WindowTransform::advanceFrameEnd()
|
||||
const auto end = ((r.block + 1) == past_the_end_block)
|
||||
? past_the_end_row
|
||||
: block.numRows();
|
||||
for ( ; r.row < end; ++r.row)
|
||||
for (; r.row < end; ++r.row)
|
||||
{
|
||||
a->add(buf,
|
||||
argument_columns.data(),
|
||||
@ -355,8 +321,8 @@ void WindowTransform::advanceFrameEnd()
|
||||
|
||||
void WindowTransform::writeOutGroup()
|
||||
{
|
||||
fmt::print(stderr, "write out group [{}..{})\n",
|
||||
group_start, group_end);
|
||||
// fmt::print(stderr, "write out group [{}..{})\n",
|
||||
// group_start, group_end);
|
||||
|
||||
// Empty groups don't make sense.
|
||||
assert(group_start < group_end);
|
||||
@ -401,7 +367,7 @@ void WindowTransform::writeOutGroup()
|
||||
const auto end = ((r.block + 1) == past_the_end_block)
|
||||
? past_the_end_row
|
||||
: block.numRows();
|
||||
for ( ; r.row < end; ++r.row)
|
||||
for (; r.row < end; ++r.row)
|
||||
{
|
||||
// FIXME does it also allocate the result on the arena?
|
||||
// We'll have to pass it out with blocks then...
|
||||
@ -417,8 +383,8 @@ void WindowTransform::writeOutGroup()
|
||||
|
||||
void WindowTransform::appendChunk(Chunk & chunk)
|
||||
{
|
||||
fmt::print(stderr, "new chunk, {} rows, finished={}\n", chunk.getNumRows(),
|
||||
input_is_finished);
|
||||
// fmt::print(stderr, "new chunk, {} rows, finished={}\n", chunk.getNumRows(),
|
||||
// input_is_finished);
|
||||
|
||||
// First, prepare the new input block and add it to the queue. We might not
|
||||
// have it if it's end of data, though.
|
||||
@ -457,8 +423,8 @@ void WindowTransform::appendChunk(Chunk & chunk)
|
||||
assert(input_is_finished);
|
||||
}
|
||||
|
||||
fmt::print(stderr, "partition end '{}', {}\n", partition_end,
|
||||
partition_ended);
|
||||
// fmt::print(stderr, "partition end '{}', {}\n", partition_end,
|
||||
// partition_ended);
|
||||
|
||||
// After that, advance the peer groups. We can advance peer groups until
|
||||
// the end of partition or current end of data, which is precisely the
|
||||
@ -468,7 +434,7 @@ void WindowTransform::appendChunk(Chunk & chunk)
|
||||
group_start = group_end;
|
||||
advanceGroupEnd();
|
||||
|
||||
fmt::print(stderr, "group end '{}'\n", group_end);
|
||||
// fmt::print(stderr, "group end '{}'\n", group_end);
|
||||
|
||||
// If the group didn't end yet, wait.
|
||||
if (!group_ended)
|
||||
@ -535,8 +501,8 @@ void WindowTransform::appendChunk(Chunk & chunk)
|
||||
// The group pointers are already reset to the partition start, see the
|
||||
// above loop.
|
||||
|
||||
fmt::print(stderr, "reinitialize agg data at start of {}\n",
|
||||
new_partition_start);
|
||||
// fmt::print(stderr, "reinitialize agg data at start of {}\n",
|
||||
// new_partition_start);
|
||||
// Reinitialize the aggregate function states because the new partition
|
||||
// has started.
|
||||
for (auto & ws : workspaces)
|
||||
@ -569,9 +535,9 @@ void WindowTransform::appendChunk(Chunk & chunk)
|
||||
|
||||
IProcessor::Status WindowTransform::prepare()
|
||||
{
|
||||
fmt::print(stderr, "prepare, next output {}, not ready row {}, first block {}, hold {} blocks\n",
|
||||
next_output_block_number, first_not_ready_row, first_block_number,
|
||||
blocks.size());
|
||||
// fmt::print(stderr, "prepare, next output {}, not ready row {}, first block {}, hold {} blocks\n",
|
||||
// next_output_block_number, first_not_ready_row, first_block_number,
|
||||
// blocks.size());
|
||||
|
||||
if (output.isFinished())
|
||||
{
|
||||
@ -599,7 +565,7 @@ IProcessor::Status WindowTransform::prepare()
|
||||
if (output.canPush())
|
||||
{
|
||||
// Output the ready block.
|
||||
fmt::print(stderr, "output block {}\n", next_output_block_number);
|
||||
// fmt::print(stderr, "output block {}\n", next_output_block_number);
|
||||
const auto i = next_output_block_number - first_block_number;
|
||||
++next_output_block_number;
|
||||
auto & block = blocks[i];
|
||||
@ -694,18 +660,12 @@ void WindowTransform::work()
|
||||
// We can drop the old blocks if we already returned them as output, and the
|
||||
// frame and group are already past them. Note that the frame start can be
|
||||
// further than group start for some frame specs, so we have to check both.
|
||||
// Both pointers can also be at the end of partition, but we need at least
|
||||
// one row before that, so that we can use it as an etalon for finding the
|
||||
// partition boundaries, hence the "-1", and the weird std::max(1, ...)
|
||||
// wrapper is to avoid unsigned overflow.
|
||||
// FIXME the above "-1" is not needed anymore, I changed how we advance the
|
||||
// group_start
|
||||
const auto first_used_block = std::min(next_output_block_number,
|
||||
std::max(1ul, std::min(frame_start.block, group_start.block)) - 1);
|
||||
std::min(frame_start.block, group_start.block));
|
||||
if (first_block_number < first_used_block)
|
||||
{
|
||||
fmt::print(stderr, "will drop blocks from {} to {}\n", first_block_number,
|
||||
first_used_block);
|
||||
// fmt::print(stderr, "will drop blocks from {} to {}\n", first_block_number,
|
||||
// first_used_block);
|
||||
|
||||
blocks.erase(blocks.begin(),
|
||||
blocks.begin() + first_used_block - first_block_number);
|
||||
|
@ -39,32 +39,10 @@ struct WindowTransformBlock
|
||||
size_t numRows() const { return input_columns[0]->size(); }
|
||||
};
|
||||
|
||||
/*
|
||||
// Use half the range of the unsigned int data type, to allow wraparound and
|
||||
// comparison. I.e. even when the counter overflows we can still tell that it is
|
||||
// greater than another counter, unless they are more than half the range apart.
|
||||
template <typename T>
|
||||
struct Wraparound
|
||||
{
|
||||
T value;
|
||||
|
||||
// exclusive?
|
||||
constexpr auto max_value = T(1) << (sizeof(T) * 8 - 1);
|
||||
|
||||
operator T() const { return value; }
|
||||
operator T&() { return value; }
|
||||
bool operator == (const T & other) { return other.value = value; }
|
||||
Wraparound & operator ++ () { value++; return *this; }
|
||||
bool operator < (const T & other) { return value % max_value < other.value % max_value; }
|
||||
Wraparound & operator + (const T & other) { value = value + other.value; return *this; }
|
||||
};
|
||||
*/
|
||||
|
||||
|
||||
struct RowNumber
|
||||
{
|
||||
uint64_t block = 0;
|
||||
uint16_t row = 0;
|
||||
uint64_t row = 0;
|
||||
|
||||
bool operator < (const RowNumber & other) const
|
||||
{
|
||||
@ -155,7 +133,7 @@ private:
|
||||
assert(x.block >= first_block_number);
|
||||
assert(x.block - first_block_number < blocks.size());
|
||||
|
||||
const int block_rows = inputAt(x)[0]->size();
|
||||
const auto block_rows = inputAt(x)[0]->size();
|
||||
assert(x.row < block_rows);
|
||||
|
||||
x.row++;
|
||||
|
Loading…
Reference in New Issue
Block a user