This commit is contained in:
Alexey Milovidov 2013-09-15 01:10:16 +00:00
parent 9a515d7082
commit 1acad2acb1
4 changed files with 55 additions and 4 deletions

View File

@ -208,6 +208,8 @@ namespace ErrorCodes
QUOTA_DOESNT_ALLOW_KEYS,
QUOTA_EXPIRED,
TOO_MUCH_SIMULTANEOUS_QUERIES,
NO_FREE_CONNECTION,
CANNOT_FSYNC,
POCO_EXCEPTION = 1000,
STD_EXCEPTION,

View File

@ -82,6 +82,17 @@ public:
if (-1 == res)
throwFromErrno("Cannot truncate file " + getFileName(), ErrorCodes::CANNOT_TRUNCATE_FILE);
}
void sync()
{
/// Если в буфере ещё остались данные - запишем их.
next();
/// Попросим ОС сбросить данные на диск.
int res = fsync(fd);
if (-1 == res)
throwFromErrno("Cannot fsync " + getFileName(), ErrorCodes::CANNOT_FSYNC);
}
};
}

View File

@ -1,7 +1,15 @@
#pragma once
#include <DB/IO/WriteBufferFromFile.h>
#include <DB/IO/CompressedWriteBuffer.h>
#include <DB/Columns/ColumnsNumber.h>
#include <DB/Interpreters/sortBlock.h>
#include <DB/Storages/StorageMergeTree.h>
namespace DB
{
@ -135,6 +143,8 @@ private:
for (size_t i = 0; i < rows; i += storage.index_granularity)
for (PrimaryColumns::const_iterator it = primary_columns.begin(); it != primary_columns.end(); ++it)
(*it)->type->serializeBinary((*(*it)->column)[i], index);
index.sync();
}
LOG_TRACE(storage.log, "Writing data.");
@ -208,6 +218,10 @@ private:
type_arr->serializeOffsets(column, compressed, prev_mark, storage.index_granularity);
prev_mark += storage.index_granularity;
}
compressed.next();
plain.sync();
marks.sync();
}
}
if (const DataTypeNested * type_nested = dynamic_cast<const DataTypeNested *>(&type))
@ -228,6 +242,10 @@ private:
type_nested->serializeOffsets(column, compressed, prev_mark, storage.index_granularity);
prev_mark += storage.index_granularity;
}
compressed.next();
plain.sync();
marks.sync();
}
{
@ -244,6 +262,10 @@ private:
type.serializeBinary(column, compressed, prev_mark, storage.index_granularity);
prev_mark += storage.index_granularity;
}
compressed.next();
plain.sync();
marks.sync();
}
}
};

View File

@ -1,13 +1,17 @@
#pragma once
#include <DB/IO/WriteBufferFromFile.h>
#include <DB/IO/CompressedWriteBuffer.h>
#include <DB/Storages/StorageMergeTree.h>
namespace DB
{
/** Для записи куска, полученного слиянием нескольких других.
* Данные уже отсортированы, относятся к одному месяцу, и пишутся в один кускок.
*/
* Данные уже отсортированы, относятся к одному месяцу, и пишутся в один кускок.
*/
class MergedBlockOutputStream : public IBlockOutputStream
{
public:
@ -71,7 +75,12 @@ public:
void writeSuffix()
{
/// Заканчиваем запись.
index_stream->sync();
index_stream = NULL;
for (ColumnStreams::iterator it = column_streams.begin(); it != column_streams.end(); ++it)
it->second->sync();
column_streams.clear();
if (marks_count == 0)
@ -106,12 +115,19 @@ private:
WriteBufferFromFile plain;
CompressedWriteBuffer compressed;
WriteBufferFromFile marks;
void sync()
{
compressed.next();
plain.sync();
marks.sync();
}
};
typedef std::map<String, SharedPtr<ColumnStream> > ColumnStreams;
ColumnStreams column_streams;
SharedPtr<WriteBuffer> index_stream;
SharedPtr<WriteBufferFromFile> index_stream;
/// Смещение до первой строчки блока, для которой надо записать индекс.
size_t index_offset;