diff --git a/dbms/src/Storages/StorageBuffer.cpp b/dbms/src/Storages/StorageBuffer.cpp index a327df0e6d6..269ab8a3a35 100644 --- a/dbms/src/Storages/StorageBuffer.cpp +++ b/dbms/src/Storages/StorageBuffer.cpp @@ -149,8 +149,11 @@ static void appendBlock(const Block & from, Block & to) throw Exception("Cannot append block to another: different type of columns at index " + toString(column_no) + ". Block 1: " + from.dumpStructure() + ". Block 2: " + to.dumpStructure(), ErrorCodes::BLOCKS_HAS_DIFFERENT_STRUCTURE); - for (size_t row_no = 0; row_no < rows; ++row_no) - col_to.insertFrom(col_from, row_no); + if (col_to.empty()) + to.getByPosition(column_no).column = col_from.clone(); + else + for (size_t row_no = 0; row_no < rows; ++row_no) + col_to.insertFrom(col_from, row_no); } } @@ -243,24 +246,18 @@ private: buffer.data = sorted_block.cloneEmpty(); } - /// Если после вставки в буфер, ограничения будут превышены, то будем сбрасывать буфер. + /** Если после вставки в буфер, ограничения будут превышены, то будем сбрасывать буфер. + * Это также защищает от неограниченного потребления оперативки, так как в случае невозможности записать в таблицу, + * будет выкинуто исключение, а новые данные не будут добавлены в буфер. + */ if (storage.checkThresholds(buffer, time(0), sorted_block.rowsInFirstColumn(), sorted_block.bytes())) { - /// Вытащим из буфера блок, заменим буфер на пустой. После этого можно разблокировать mutex. - Block block_to_write; - buffer.data.swap(block_to_write); - buffer.first_write_time = 0; lock.unlock(); - - if (!storage.no_destination) - { - auto destination = storage.context.tryGetTable(storage.destination_database, storage.destination_table); - appendBlock(sorted_block, block_to_write); - storage.writeBlockToDestination(block_to_write, destination); - } + storage.flushBuffer(buffer, false); + lock.lock(); } - else - appendBlock(sorted_block, buffer.data); + + appendBlock(sorted_block, buffer.data); } }; @@ -331,8 +328,16 @@ void StorageBuffer::flushBuffer(Buffer & buffer, bool check_thresholds) { std::lock_guard lock(buffer.mutex); - if (check_thresholds && !checkThresholds(buffer, current_time)) - return; + if (check_thresholds) + { + if (!checkThresholds(buffer, current_time)) + return; + } + else + { + if (buffer.data.rowsInFirstColumn() == 0) + return; + } buffer.data.swap(block_to_write); buffer.first_write_time = 0; @@ -357,10 +362,11 @@ void StorageBuffer::flushBuffer(Buffer & buffer, bool check_thresholds) * Замечание: остаётся проблема - из-за того, что в разных попытках вставляются разные блоки, * теряется идемпотентность вставки в ReplicatedMergeTree. */ - appendBlock(block_to_write, buffer.data); - buffer.data.swap(block_to_write); + appendBlock(buffer.data, block_to_write); } + buffer.data.swap(block_to_write); + if (!buffer.first_write_time) buffer.first_write_time = current_time;