dbms: fixed error with StorageBuffer [#METR-17889].

This commit is contained in:
Alexey Milovidov 2015-09-02 00:48:38 +03:00
parent 82595cb39e
commit dab47ec36d

View File

@ -149,6 +149,9 @@ static void appendBlock(const Block & from, Block & to)
throw Exception("Cannot append block to another: different type of columns at index " + toString(column_no)
+ ". Block 1: " + from.dumpStructure() + ". Block 2: " + to.dumpStructure(), ErrorCodes::BLOCKS_HAS_DIFFERENT_STRUCTURE);
if (col_to.empty())
to.getByPosition(column_no).column = col_from.clone();
else
for (size_t row_no = 0; row_no < rows; ++row_no)
col_to.insertFrom(col_from, row_no);
}
@ -243,23 +246,17 @@ private:
buffer.data = sorted_block.cloneEmpty();
}
/// Если после вставки в буфер, ограничения будут превышены, то будем сбрасывать буфер.
/** Если после вставки в буфер, ограничения будут превышены, то будем сбрасывать буфер.
* Это также защищает от неограниченного потребления оперативки, так как в случае невозможности записать в таблицу,
* будет выкинуто исключение, а новые данные не будут добавлены в буфер.
*/
if (storage.checkThresholds(buffer, time(0), sorted_block.rowsInFirstColumn(), sorted_block.bytes()))
{
/// Вытащим из буфера блок, заменим буфер на пустой. После этого можно разблокировать mutex.
Block block_to_write;
buffer.data.swap(block_to_write);
buffer.first_write_time = 0;
lock.unlock();
storage.flushBuffer(buffer, false);
lock.lock();
}
if (!storage.no_destination)
{
auto destination = storage.context.tryGetTable(storage.destination_database, storage.destination_table);
appendBlock(sorted_block, block_to_write);
storage.writeBlockToDestination(block_to_write, destination);
}
}
else
appendBlock(sorted_block, buffer.data);
}
};
@ -331,8 +328,16 @@ void StorageBuffer::flushBuffer(Buffer & buffer, bool check_thresholds)
{
std::lock_guard<std::mutex> lock(buffer.mutex);
if (check_thresholds && !checkThresholds(buffer, current_time))
if (check_thresholds)
{
if (!checkThresholds(buffer, current_time))
return;
}
else
{
if (buffer.data.rowsInFirstColumn() == 0)
return;
}
buffer.data.swap(block_to_write);
buffer.first_write_time = 0;
@ -357,10 +362,11 @@ void StorageBuffer::flushBuffer(Buffer & buffer, bool check_thresholds)
* Замечание: остаётся проблема - из-за того, что в разных попытках вставляются разные блоки,
* теряется идемпотентность вставки в ReplicatedMergeTree.
*/
appendBlock(block_to_write, buffer.data);
buffer.data.swap(block_to_write);
appendBlock(buffer.data, block_to_write);
}
buffer.data.swap(block_to_write);
if (!buffer.first_write_time)
buffer.first_write_time = current_time;