diff --git a/dbms/include/DB/Storages/StorageLog.h b/dbms/include/DB/Storages/StorageLog.h index 11883901d76..3263b6e5c94 100644 --- a/dbms/include/DB/Storages/StorageLog.h +++ b/dbms/include/DB/Storages/StorageLog.h @@ -88,6 +88,7 @@ class LogBlockOutputStream : public IBlockOutputStream public: LogBlockOutputStream(StoragePtr owned_storage); void write(const Block & block); + void writeSuffix(); private: StorageLog & storage; Poco::ScopedWriteRWLock lock; @@ -105,6 +106,12 @@ private: CompressedWriteBuffer compressed; size_t plain_offset; /// Сколько байт было в файле на момент создания LogBlockOutputStream. + + void sync() + { + compressed.next(); + plain.sync(); + } }; typedef std::vector > MarksForColumns; diff --git a/dbms/include/DB/Storages/StorageTinyLog.h b/dbms/include/DB/Storages/StorageTinyLog.h index 376162f79c4..46a2c65686a 100644 --- a/dbms/include/DB/Storages/StorageTinyLog.h +++ b/dbms/include/DB/Storages/StorageTinyLog.h @@ -70,6 +70,7 @@ class TinyLogBlockOutputStream : public IBlockOutputStream public: TinyLogBlockOutputStream(StoragePtr owned_storage); void write(const Block & block); + void writeSuffix(); private: StorageTinyLog & storage; @@ -83,6 +84,12 @@ private: WriteBufferFromFile plain; CompressedWriteBuffer compressed; + + void sync() + { + compressed.next(); + plain.sync(); + } }; typedef std::map > FileStreams; diff --git a/dbms/src/Storages/StorageChunks.cpp b/dbms/src/Storages/StorageChunks.cpp index 49bdb9c4728..2c9360738f7 100644 --- a/dbms/src/Storages/StorageChunks.cpp +++ b/dbms/src/Storages/StorageChunks.cpp @@ -144,6 +144,7 @@ void StorageChunks::appendChunkToIndex(const std::string & chunk_name, size_t ma WriteBufferFromFile index(index_path, 4096, O_APPEND | O_CREAT | O_WRONLY); writeStringBinary(chunk_name, index); writeIntBinary(mark, index); + index.sync(); } void StorageChunks::dropThis() diff --git a/dbms/src/Storages/StorageLog.cpp b/dbms/src/Storages/StorageLog.cpp index edca39d3853..a9c6c5dfaa1 100644 --- a/dbms/src/Storages/StorageLog.cpp +++ b/dbms/src/Storages/StorageLog.cpp @@ -216,6 +216,18 @@ void LogBlockOutputStream::write(const Block & block) } +void LogBlockOutputStream::writeSuffix() +{ + /// Заканчиваем запись. + marks_stream.sync(); + + for (FileStreams::iterator it = streams.begin(); it != streams.end(); ++it) + it->second->sync(); + + streams.clear(); +} + + void LogBlockOutputStream::addStream(const String & name, const IDataType & type, size_t level) { /// Для массивов используются отдельные потоки для размеров. diff --git a/dbms/src/Storages/StorageTinyLog.cpp b/dbms/src/Storages/StorageTinyLog.cpp index b8ca636ef08..82a7b1eb085 100644 --- a/dbms/src/Storages/StorageTinyLog.cpp +++ b/dbms/src/Storages/StorageTinyLog.cpp @@ -245,6 +245,16 @@ void TinyLogBlockOutputStream::writeData(const String & name, const IDataType & } +void TinyLogBlockOutputStream::writeSuffix() +{ + /// Заканчиваем запись. + for (FileStreams::iterator it = streams.begin(); it != streams.end(); ++it) + it->second->sync(); + + streams.clear(); +} + + void TinyLogBlockOutputStream::write(const Block & block) { storage.check(block, true);