diff --git a/src/Processors/Transforms/WindowTransform.cpp b/src/Processors/Transforms/WindowTransform.cpp index 1bbbfc3d021..c893af42ec9 100644 --- a/src/Processors/Transforms/WindowTransform.cpp +++ b/src/Processors/Transforms/WindowTransform.cpp @@ -97,7 +97,7 @@ void WindowTransform::advancePartitionEnd() const size_t n = partition_by_indices.size(); if (n == 0) { - fmt::print(stderr, "no partition by\n"); +// fmt::print(stderr, "no partition by\n"); // No PARTITION BY. All input is one partition, which will end when the // input ends. partition_end = end; @@ -107,17 +107,14 @@ void WindowTransform::advancePartitionEnd() // The partition ends when the PARTITION BY columns change. We need an array // of reference columns for comparison. We might have already dropped the // blocks where the partition starts, but any row in the partition will do. - // We can't use group_start or frame_start, because we might have advanced - // them to be equal to the partition_end. - // Use the row previous to partition_end -- it should be valid. - // FIXME group_start is now valid; - //auto reference_row = partition_end; - //retreatRowNumber(partition_end); + // Use group_start -- it's always in the valid region, because it points to + // the start of the current group, which we haven't fully processed yet, and + // hence cannot drop. auto reference_row = group_start; - // assert(reference_row < partition_end); if (reference_row == partition_end) { - // This is for the very first partition. Try to get rid of it. + // This is for the very first partition and its first row. Try to get + // rid of this logic. advanceRowNumber(partition_end); } assert(reference_row < blocksEnd()); @@ -128,13 +125,13 @@ void WindowTransform::advancePartitionEnd() reference_partition_by.push_back(inputAt(reference_row)[i]); } - fmt::print(stderr, "{} cols to compare, reference at {}\n", n, group_start); +// fmt::print(stderr, "{} cols to compare, reference at {}\n", n, group_start); - for ( ; partition_end < end; advanceRowNumber(partition_end)) + for (; partition_end < end; advanceRowNumber(partition_end)) { // Check for partition end. size_t i = 0; - for ( ; i < n; i++) + for (; i < n; i++) { const auto * c = inputAt(partition_end)[partition_by_indices[i]].get(); if (c->compareAt(partition_end.row, @@ -174,7 +171,7 @@ void WindowTransform::advanceGroupEnd() advanceGroupEndRows(); break; case WindowFrame::FrameType::Range: - advanceGroupEndRange(); + assert(false); break; } } @@ -182,45 +179,14 @@ void WindowTransform::advanceGroupEnd() void WindowTransform::advanceGroupEndRows() { // ROWS mode, peer groups always contains only the current row. -// if (group_end == partition_end) -// { -// // We might be already at the partition_end, if we got to it at the -// // previous work() call, but didn't know the partition ended there (it -// // was non-final end of data), and in the next work() call (now) we -// // discovered that either: -// // 1) we won't get more input, or -// // 2) we got new data and the new partition really began at this point, -// // which is the beginning of the block. -// // Assert these conditions and do nothing. -// assert(input_is_finished || partition_end.row == 0); -// } -// else -// { -// assert(group_end < partition_end); -// advanceRowNumber(group_end); -// group_ended = true; -// } - - assert(group_ended == false); // We cannot advance the groups if the group start is already beyond the // end of partition. - if (group_start == partition_end) - { - // should it be an assertion? - return; - } - assert(group_start < partition_end); group_end = group_start; advanceRowNumber(group_end); group_ended = true; } -void WindowTransform::advanceGroupEndRange() -{ - assert(false); -} - void WindowTransform::advanceGroupEndGroups() { const size_t n = order_by_indices.size(); @@ -239,11 +205,11 @@ void WindowTransform::advanceGroupEndGroups() } // `partition_end` is either end of partition or end of data. - for ( ; group_end < partition_end; advanceRowNumber(group_end)) + for (; group_end < partition_end; advanceRowNumber(group_end)) { // Check for group end. size_t i = 0; - for ( ; i < n; i++) + for (; i < n; i++) { const auto * c = inputAt(partition_end)[partition_by_indices[i]].get(); if (c->compareAt(group_end.row, @@ -342,7 +308,7 @@ void WindowTransform::advanceFrameEnd() const auto end = ((r.block + 1) == past_the_end_block) ? past_the_end_row : block.numRows(); - for ( ; r.row < end; ++r.row) + for (; r.row < end; ++r.row) { a->add(buf, argument_columns.data(), @@ -355,8 +321,8 @@ void WindowTransform::advanceFrameEnd() void WindowTransform::writeOutGroup() { - fmt::print(stderr, "write out group [{}..{})\n", - group_start, group_end); +// fmt::print(stderr, "write out group [{}..{})\n", +// group_start, group_end); // Empty groups don't make sense. assert(group_start < group_end); @@ -401,7 +367,7 @@ void WindowTransform::writeOutGroup() const auto end = ((r.block + 1) == past_the_end_block) ? past_the_end_row : block.numRows(); - for ( ; r.row < end; ++r.row) + for (; r.row < end; ++r.row) { // FIXME does it also allocate the result on the arena? // We'll have to pass it out with blocks then... @@ -417,8 +383,8 @@ void WindowTransform::writeOutGroup() void WindowTransform::appendChunk(Chunk & chunk) { - fmt::print(stderr, "new chunk, {} rows, finished={}\n", chunk.getNumRows(), - input_is_finished); +// fmt::print(stderr, "new chunk, {} rows, finished={}\n", chunk.getNumRows(), +// input_is_finished); // First, prepare the new input block and add it to the queue. We might not // have it if it's end of data, though. @@ -457,8 +423,8 @@ void WindowTransform::appendChunk(Chunk & chunk) assert(input_is_finished); } - fmt::print(stderr, "partition end '{}', {}\n", partition_end, - partition_ended); +// fmt::print(stderr, "partition end '{}', {}\n", partition_end, +// partition_ended); // After that, advance the peer groups. We can advance peer groups until // the end of partition or current end of data, which is precisely the @@ -468,7 +434,7 @@ void WindowTransform::appendChunk(Chunk & chunk) group_start = group_end; advanceGroupEnd(); - fmt::print(stderr, "group end '{}'\n", group_end); +// fmt::print(stderr, "group end '{}'\n", group_end); // If the group didn't end yet, wait. if (!group_ended) @@ -535,8 +501,8 @@ void WindowTransform::appendChunk(Chunk & chunk) // The group pointers are already reset to the partition start, see the // above loop. - fmt::print(stderr, "reinitialize agg data at start of {}\n", - new_partition_start); +// fmt::print(stderr, "reinitialize agg data at start of {}\n", +// new_partition_start); // Reinitialize the aggregate function states because the new partition // has started. for (auto & ws : workspaces) @@ -569,9 +535,9 @@ void WindowTransform::appendChunk(Chunk & chunk) IProcessor::Status WindowTransform::prepare() { - fmt::print(stderr, "prepare, next output {}, not ready row {}, first block {}, hold {} blocks\n", - next_output_block_number, first_not_ready_row, first_block_number, - blocks.size()); +// fmt::print(stderr, "prepare, next output {}, not ready row {}, first block {}, hold {} blocks\n", +// next_output_block_number, first_not_ready_row, first_block_number, +// blocks.size()); if (output.isFinished()) { @@ -599,7 +565,7 @@ IProcessor::Status WindowTransform::prepare() if (output.canPush()) { // Output the ready block. - fmt::print(stderr, "output block {}\n", next_output_block_number); +// fmt::print(stderr, "output block {}\n", next_output_block_number); const auto i = next_output_block_number - first_block_number; ++next_output_block_number; auto & block = blocks[i]; @@ -694,18 +660,12 @@ void WindowTransform::work() // We can drop the old blocks if we already returned them as output, and the // frame and group are already past them. Note that the frame start can be // further than group start for some frame specs, so we have to check both. - // Both pointers can also be at the end of partition, but we need at least - // one row before that, so that we can use it as an etalon for finding the - // partition boundaries, hence the "-1", and the weird std::max(1, ...) - // wrapper is to avoid unsigned overflow. - // FIXME the above "-1" is not needed anymore, I changed how we advance the - // group_start const auto first_used_block = std::min(next_output_block_number, - std::max(1ul, std::min(frame_start.block, group_start.block)) - 1); + std::min(frame_start.block, group_start.block)); if (first_block_number < first_used_block) { - fmt::print(stderr, "will drop blocks from {} to {}\n", first_block_number, - first_used_block); +// fmt::print(stderr, "will drop blocks from {} to {}\n", first_block_number, +// first_used_block); blocks.erase(blocks.begin(), blocks.begin() + first_used_block - first_block_number); diff --git a/src/Processors/Transforms/WindowTransform.h b/src/Processors/Transforms/WindowTransform.h index 8acece9fd17..d81914fe6f8 100644 --- a/src/Processors/Transforms/WindowTransform.h +++ b/src/Processors/Transforms/WindowTransform.h @@ -39,32 +39,10 @@ struct WindowTransformBlock size_t numRows() const { return input_columns[0]->size(); } }; -/* -// Use half the range of the unsigned int data type, to allow wraparound and -// comparison. I.e. even when the counter overflows we can still tell that it is -// greater than another counter, unless they are more than half the range apart. -template -struct Wraparound -{ - T value; - - // exclusive? - constexpr auto max_value = T(1) << (sizeof(T) * 8 - 1); - - operator T() const { return value; } - operator T&() { return value; } - bool operator == (const T & other) { return other.value = value; } - Wraparound & operator ++ () { value++; return *this; } - bool operator < (const T & other) { return value % max_value < other.value % max_value; } - Wraparound & operator + (const T & other) { value = value + other.value; return *this; } -}; -*/ - - struct RowNumber { uint64_t block = 0; - uint16_t row = 0; + uint64_t row = 0; bool operator < (const RowNumber & other) const { @@ -155,7 +133,7 @@ private: assert(x.block >= first_block_number); assert(x.block - first_block_number < blocks.size()); - const int block_rows = inputAt(x)[0]->size(); + const auto block_rows = inputAt(x)[0]->size(); assert(x.row < block_rows); x.row++;