Simplify different block sturcture (i.e. after ALTER) support for Buffer

v2: fix empty block in case of flush

Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com>
This commit is contained in:
Azat Khuzhin 2021-12-30 15:24:50 +03:00
parent 1fc29d704d
commit 9948525816
2 changed files with 19 additions and 40 deletions

View File

@ -452,14 +452,12 @@ static void appendBlock(const Block & from, Block & to)
if (!to) if (!to)
throw Exception("Cannot append to empty block", ErrorCodes::LOGICAL_ERROR); throw Exception("Cannot append to empty block", ErrorCodes::LOGICAL_ERROR);
if (to.rows()) assertBlocksHaveEqualStructure(from, to, "Buffer");
assertBlocksHaveEqualStructure(from, to, "Buffer");
from.checkNumberOfRows(); from.checkNumberOfRows();
to.checkNumberOfRows(); to.checkNumberOfRows();
size_t rows = from.rows(); size_t rows = from.rows();
size_t bytes = from.bytes();
size_t old_rows = to.rows(); size_t old_rows = to.rows();
size_t old_bytes = to.bytes(); size_t old_bytes = to.bytes();
@ -469,26 +467,17 @@ static void appendBlock(const Block & from, Block & to)
{ {
MemoryTrackerBlockerInThread temporarily_disable_memory_tracker; MemoryTrackerBlockerInThread temporarily_disable_memory_tracker;
if (to.rows() == 0) for (size_t column_no = 0, columns = to.columns(); column_no < columns; ++column_no)
{ {
to = from; const IColumn & col_from = *from.getByPosition(column_no).column.get();
CurrentMetrics::add(CurrentMetrics::StorageBufferRows, rows); last_col = IColumn::mutate(std::move(to.getByPosition(column_no).column));
CurrentMetrics::add(CurrentMetrics::StorageBufferBytes, bytes);
}
else
{
for (size_t column_no = 0, columns = to.columns(); column_no < columns; ++column_no)
{
const IColumn & col_from = *from.getByPosition(column_no).column.get();
last_col = IColumn::mutate(std::move(to.getByPosition(column_no).column));
last_col->insertRangeFrom(col_from, 0, rows); last_col->insertRangeFrom(col_from, 0, rows);
to.getByPosition(column_no).column = std::move(last_col); to.getByPosition(column_no).column = std::move(last_col);
}
CurrentMetrics::add(CurrentMetrics::StorageBufferRows, rows);
CurrentMetrics::add(CurrentMetrics::StorageBufferBytes, to.bytes() - old_bytes);
} }
CurrentMetrics::add(CurrentMetrics::StorageBufferRows, rows);
CurrentMetrics::add(CurrentMetrics::StorageBufferBytes, to.bytes() - old_bytes);
} }
catch (...) catch (...)
{ {
@ -640,7 +629,8 @@ private:
* an exception will be thrown, and new data will not be added to the buffer. * an exception will be thrown, and new data will not be added to the buffer.
*/ */
storage.flushBuffer(buffer, false /* check_thresholds */, true /* locked */); if (storage.flushBuffer(buffer, false /* check_thresholds */, true /* locked */))
buffer.data = sorted_block.cloneEmpty();
} }
if (!buffer.first_write_time) if (!buffer.first_write_time)
@ -735,7 +725,7 @@ bool StorageBuffer::optimize(
if (deduplicate) if (deduplicate)
throw Exception("DEDUPLICATE cannot be specified when optimizing table of type Buffer", ErrorCodes::NOT_IMPLEMENTED); throw Exception("DEDUPLICATE cannot be specified when optimizing table of type Buffer", ErrorCodes::NOT_IMPLEMENTED);
flushAllBuffers(false, true); flushAllBuffers(false);
return true; return true;
} }
@ -813,14 +803,14 @@ bool StorageBuffer::checkThresholdsImpl(bool direct, size_t rows, size_t bytes,
} }
void StorageBuffer::flushAllBuffers(bool check_thresholds, bool reset_blocks_structure) void StorageBuffer::flushAllBuffers(bool check_thresholds)
{ {
for (auto & buf : buffers) for (auto & buf : buffers)
flushBuffer(buf, check_thresholds, false, reset_blocks_structure); flushBuffer(buf, check_thresholds, false);
} }
void StorageBuffer::flushBuffer(Buffer & buffer, bool check_thresholds, bool locked, bool reset_block_structure) bool StorageBuffer::flushBuffer(Buffer & buffer, bool check_thresholds, bool locked)
{ {
Block block_to_write; Block block_to_write;
time_t current_time = time(nullptr); time_t current_time = time(nullptr);
@ -833,8 +823,6 @@ void StorageBuffer::flushBuffer(Buffer & buffer, bool check_thresholds, bool loc
if (!locked) if (!locked)
lock.emplace(buffer.lockForReading()); lock.emplace(buffer.lockForReading());
block_to_write = buffer.data.cloneEmpty();
rows = buffer.data.rows(); rows = buffer.data.rows();
bytes = buffer.data.bytes(); bytes = buffer.data.bytes();
if (buffer.first_write_time) if (buffer.first_write_time)
@ -843,12 +831,7 @@ void StorageBuffer::flushBuffer(Buffer & buffer, bool check_thresholds, bool loc
if (check_thresholds) if (check_thresholds)
{ {
if (!checkThresholdsImpl(/* direct= */false, rows, bytes, time_passed)) if (!checkThresholdsImpl(/* direct= */false, rows, bytes, time_passed))
return; return false;
}
else
{
if (rows == 0)
return;
} }
buffer.data.swap(block_to_write); buffer.data.swap(block_to_write);
@ -869,7 +852,7 @@ void StorageBuffer::flushBuffer(Buffer & buffer, bool check_thresholds, bool loc
total_writes.bytes -= block_allocated_bytes_delta; total_writes.bytes -= block_allocated_bytes_delta;
LOG_DEBUG(log, "Flushing buffer with {} rows (discarded), {} bytes, age {} seconds {}.", rows, bytes, time_passed, (check_thresholds ? "(bg)" : "(direct)")); LOG_DEBUG(log, "Flushing buffer with {} rows (discarded), {} bytes, age {} seconds {}.", rows, bytes, time_passed, (check_thresholds ? "(bg)" : "(direct)"));
return; return true;
} }
/** For simplicity, buffer is locked during write. /** For simplicity, buffer is locked during write.
@ -883,8 +866,6 @@ void StorageBuffer::flushBuffer(Buffer & buffer, bool check_thresholds, bool loc
try try
{ {
writeBlockToDestination(block_to_write, DatabaseCatalog::instance().tryGetTable(destination_id, getContext())); writeBlockToDestination(block_to_write, DatabaseCatalog::instance().tryGetTable(destination_id, getContext()));
if (reset_block_structure)
buffer.data.clear();
} }
catch (...) catch (...)
{ {
@ -909,6 +890,7 @@ void StorageBuffer::flushBuffer(Buffer & buffer, bool check_thresholds, bool loc
UInt64 milliseconds = watch.elapsedMilliseconds(); UInt64 milliseconds = watch.elapsedMilliseconds();
LOG_DEBUG(log, "Flushing buffer with {} rows, {} bytes, age {} seconds, took {} ms {}.", rows, bytes, time_passed, milliseconds, (check_thresholds ? "(bg)" : "(direct)")); LOG_DEBUG(log, "Flushing buffer with {} rows, {} bytes, age {} seconds, took {} ms {}.", rows, bytes, time_passed, milliseconds, (check_thresholds ? "(bg)" : "(direct)"));
return true;
} }

View File

@ -153,11 +153,8 @@ private:
Poco::Logger * log; Poco::Logger * log;
void flushAllBuffers(bool check_thresholds = true, bool reset_blocks_structure = false); void flushAllBuffers(bool check_thresholds = true);
/// Reset the buffer. If check_thresholds is set - resets only if thresholds bool flushBuffer(Buffer & buffer, bool check_thresholds, bool locked = false);
/// are exceeded. If reset_block_structure is set - clears inner block
/// structure inside buffer (useful in OPTIMIZE and ALTER).
void flushBuffer(Buffer & buffer, bool check_thresholds, bool locked = false, bool reset_block_structure = false);
bool checkThresholds(const Buffer & buffer, bool direct, time_t current_time, size_t additional_rows = 0, size_t additional_bytes = 0) const; bool checkThresholds(const Buffer & buffer, bool direct, time_t current_time, size_t additional_rows = 0, size_t additional_bytes = 0) const;
bool checkThresholdsImpl(bool direct, size_t rows, size_t bytes, time_t time_passed) const; bool checkThresholdsImpl(bool direct, size_t rows, size_t bytes, time_t time_passed) const;