From 1acad2acb11c75acae5a602ae38832d9c8cdb36f Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Sun, 15 Sep 2013 01:10:16 +0000 Subject: [PATCH] Merge --- dbms/include/DB/Core/ErrorCodes.h | 2 ++ .../DB/IO/WriteBufferFromFileDescriptor.h | 11 +++++++++ .../MergeTree/MergeTreeBlockOutputStream.h | 22 +++++++++++++++++ .../MergeTree/MergedBlockOutputStream.h | 24 +++++++++++++++---- 4 files changed, 55 insertions(+), 4 deletions(-) diff --git a/dbms/include/DB/Core/ErrorCodes.h b/dbms/include/DB/Core/ErrorCodes.h index 4c6138f5ad1..3807e8232d7 100644 --- a/dbms/include/DB/Core/ErrorCodes.h +++ b/dbms/include/DB/Core/ErrorCodes.h @@ -208,6 +208,8 @@ namespace ErrorCodes QUOTA_DOESNT_ALLOW_KEYS, QUOTA_EXPIRED, TOO_MUCH_SIMULTANEOUS_QUERIES, + NO_FREE_CONNECTION, + CANNOT_FSYNC, POCO_EXCEPTION = 1000, STD_EXCEPTION, diff --git a/dbms/include/DB/IO/WriteBufferFromFileDescriptor.h b/dbms/include/DB/IO/WriteBufferFromFileDescriptor.h index 995b990ea8c..616540108f0 100644 --- a/dbms/include/DB/IO/WriteBufferFromFileDescriptor.h +++ b/dbms/include/DB/IO/WriteBufferFromFileDescriptor.h @@ -82,6 +82,17 @@ public: if (-1 == res) throwFromErrno("Cannot truncate file " + getFileName(), ErrorCodes::CANNOT_TRUNCATE_FILE); } + + void sync() + { + /// Если в буфере ещё остались данные - запишем их. + next(); + + /// Попросим ОС сбросить данные на диск. + int res = fsync(fd); + if (-1 == res) + throwFromErrno("Cannot fsync " + getFileName(), ErrorCodes::CANNOT_FSYNC); + } }; } diff --git a/dbms/include/DB/Storages/MergeTree/MergeTreeBlockOutputStream.h b/dbms/include/DB/Storages/MergeTree/MergeTreeBlockOutputStream.h index 91f1305bed8..c3693cb6811 100644 --- a/dbms/include/DB/Storages/MergeTree/MergeTreeBlockOutputStream.h +++ b/dbms/include/DB/Storages/MergeTree/MergeTreeBlockOutputStream.h @@ -1,7 +1,15 @@ #pragma once +#include +#include + +#include + +#include + #include + namespace DB { @@ -135,6 +143,8 @@ private: for (size_t i = 0; i < rows; i += storage.index_granularity) for (PrimaryColumns::const_iterator it = primary_columns.begin(); it != primary_columns.end(); ++it) (*it)->type->serializeBinary((*(*it)->column)[i], index); + + index.sync(); } LOG_TRACE(storage.log, "Writing data."); @@ -208,6 +218,10 @@ private: type_arr->serializeOffsets(column, compressed, prev_mark, storage.index_granularity); prev_mark += storage.index_granularity; } + + compressed.next(); + plain.sync(); + marks.sync(); } } if (const DataTypeNested * type_nested = dynamic_cast(&type)) @@ -228,6 +242,10 @@ private: type_nested->serializeOffsets(column, compressed, prev_mark, storage.index_granularity); prev_mark += storage.index_granularity; } + + compressed.next(); + plain.sync(); + marks.sync(); } { @@ -244,6 +262,10 @@ private: type.serializeBinary(column, compressed, prev_mark, storage.index_granularity); prev_mark += storage.index_granularity; } + + compressed.next(); + plain.sync(); + marks.sync(); } } }; diff --git a/dbms/include/DB/Storages/MergeTree/MergedBlockOutputStream.h b/dbms/include/DB/Storages/MergeTree/MergedBlockOutputStream.h index f905bd6c8b6..a4adad907b7 100644 --- a/dbms/include/DB/Storages/MergeTree/MergedBlockOutputStream.h +++ b/dbms/include/DB/Storages/MergeTree/MergedBlockOutputStream.h @@ -1,13 +1,17 @@ #pragma once +#include +#include + #include + namespace DB { - + /** Для записи куска, полученного слиянием нескольких других. - * Данные уже отсортированы, относятся к одному месяцу, и пишутся в один кускок. - */ + * Данные уже отсортированы, относятся к одному месяцу, и пишутся в один кускок. + */ class MergedBlockOutputStream : public IBlockOutputStream { public: @@ -71,7 +75,12 @@ public: void writeSuffix() { /// Заканчиваем запись. + index_stream->sync(); index_stream = NULL; + + for (ColumnStreams::iterator it = column_streams.begin(); it != column_streams.end(); ++it) + it->second->sync(); + column_streams.clear(); if (marks_count == 0) @@ -106,12 +115,19 @@ private: WriteBufferFromFile plain; CompressedWriteBuffer compressed; WriteBufferFromFile marks; + + void sync() + { + compressed.next(); + plain.sync(); + marks.sync(); + } }; typedef std::map > ColumnStreams; ColumnStreams column_streams; - SharedPtr index_stream; + SharedPtr index_stream; /// Смещение до первой строчки блока, для которой надо записать индекс. size_t index_offset;