dbms: development [#CONV-2944].

This commit is contained in:
Alexey Milovidov 2012-06-21 16:33:00 +00:00
parent 3482ab7686
commit 2961d42e9d
2 changed files with 25 additions and 5 deletions

View File

@ -137,6 +137,12 @@ private:
};
typedef std::map<String, ColumnData> Files_t;
Files_t files;
/** Прочитать файлы с засечками, если они ещё не прочитаны.
* Делается лениво, чтобы при большом количестве таблиц, сервер быстро стартовал.
*/
bool loaded_marks;
void loadMarks();
};
}

View File

@ -97,7 +97,7 @@ void LogBlockOutputStream::write(const Block & block)
StorageLog::StorageLog(const std::string & path_, const std::string & name_, NamesAndTypesListPtr columns_)
: path(path_), name(name_), columns(columns_)
: path(path_), name(name_), columns(columns_), loaded_marks(false)
{
if (columns->empty())
throw Exception("Empty list of columns passed to StorageLog constructor", ErrorCodes::EMPTY_LIST_OF_COLUMNS_PASSED);
@ -105,7 +105,6 @@ StorageLog::StorageLog(const std::string & path_, const std::string & name_, Nam
/// создаём файлы, если их нет
Poco::File(path + escapeForFileName(name) + '/').createDirectories();
ssize_t size_of_marks_file = -1;
for (NamesAndTypesList::const_iterator it = columns->begin(); it != columns->end(); ++it)
{
if (files.end() != files.find(it->first))
@ -116,7 +115,18 @@ StorageLog::StorageLog(const std::string & path_, const std::string & name_, Nam
files.insert(std::make_pair(it->first, column_data));
files[it->first].data_file = Poco::File(path + escapeForFileName(name) + '/' + escapeForFileName(it->first) + DBMS_STORAGE_LOG_DATA_FILE_EXTENSION);
files[it->first].marks_file = Poco::File(path + escapeForFileName(name) + '/' + escapeForFileName(it->first) + DBMS_STORAGE_LOG_MARKS_FILE_EXTENSION);
}
}
void StorageLog::loadMarks()
{
if (loaded_marks)
return;
ssize_t size_of_marks_file = -1;
for (NamesAndTypesList::const_iterator it = columns->begin(); it != columns->end(); ++it)
{
/// Считаем засечки
if (files[it->first].marks_file.exists())
{
@ -124,14 +134,14 @@ StorageLog::StorageLog(const std::string & path_, const std::string & name_, Nam
if (size_of_current_marks_file % sizeof(Mark) != 0)
throw Exception("Sizes of marks files are inconsistent", ErrorCodes::SIZES_OF_MARKS_FILES_ARE_INCONSISTENT);
if (-1 == size_of_marks_file)
size_of_marks_file = size_of_current_marks_file;
else if (size_of_marks_file != size_of_current_marks_file)
throw Exception("Sizes of marks files are inconsistent", ErrorCodes::SIZES_OF_MARKS_FILES_ARE_INCONSISTENT);
files[it->first].marks.reserve(files[it->first].marks_file.getSize() / sizeof(Mark));
ReadBufferFromFile marks_rb(files[it->first].marks_file.path());
ReadBufferFromFile marks_rb(files[it->first].marks_file.path(), 32768);
while (!marks_rb.eof())
{
Mark mark;
@ -141,6 +151,8 @@ StorageLog::StorageLog(const std::string & path_, const std::string & name_, Nam
}
}
}
loaded_marks = true;
}
@ -167,6 +179,7 @@ BlockInputStreams StorageLog::read(
size_t max_block_size,
unsigned threads)
{
loadMarks();
check(column_names);
processed_stage = QueryProcessingStage::FetchColumns;
@ -202,6 +215,7 @@ BlockInputStreams StorageLog::read(
BlockOutputStreamPtr StorageLog::write(
ASTPtr query)
{
loadMarks();
return new LogBlockOutputStream(*this);
}