dbms: added checData method in TinyLog storage [#METR-11709]

This commit is contained in:
Pavel Kartavyy 2014-07-31 17:39:23 +04:00
parent d1e995c7e0
commit 1a39b3bf5c
3 changed files with 43 additions and 1 deletions

View File

@ -248,6 +248,9 @@ public:
/// Поддерживается ли индекс в секции IN
virtual bool supportsIndexForIn() const { return false; };
/// проверяет валидность данных
virtual bool checkData() const { throw DB::Exception("Check query is not supported for " + getName() + " storage"); }
protected:
IStorage() : is_dropped(false) {}

View File

@ -12,6 +12,7 @@
#include <DB/Storages/IStorage.h>
#include <DB/DataStreams/IProfilingBlockInputStream.h>
#include <DB/DataStreams/IBlockOutputStream.h>
#include <Poco/Util/XMLConfiguration.h>
namespace DB
@ -129,6 +130,7 @@ public:
void rename(const String & new_path_to_db, const String & new_database_name, const String & new_table_name);
bool checkData() const override;
private:
String path;
String name;
@ -144,9 +146,17 @@ private:
typedef std::map<String, ColumnData> Files_t;
Files_t files;
/// хранит размеры всех столбцов, чтобы проверять не побились ли они
using SizeFile = Poco::AutoPtr<Poco::Util::XMLConfiguration>;
SizeFile size_file;
Logger * log;
StorageTinyLog(const std::string & path_, const std::string & name_, NamesAndTypesListPtr columns_, bool attach, size_t max_compress_block_size_);
void addFile(const String & column_name, const IDataType & type, size_t level = 0);
void updateSize(const std::string & column_name);
};
}

View File

@ -264,7 +264,10 @@ void TinyLogBlockOutputStream::writeSuffix()
{
/// Заканчиваем запись.
for (FileStreams::iterator it = streams.begin(); it != streams.end(); ++it)
{
it->second->finalize();
storage.updateSize(it->first);
}
streams.clear();
}
@ -286,7 +289,9 @@ void TinyLogBlockOutputStream::write(const Block & block)
StorageTinyLog::StorageTinyLog(const std::string & path_, const std::string & name_, NamesAndTypesListPtr columns_, bool attach, size_t max_compress_block_size_)
: path(path_), name(name_), columns(columns_), max_compress_block_size(max_compress_block_size_)
: path(path_), name(name_), columns(columns_),
max_compress_block_size(max_compress_block_size_), size_file(new Poco::Util::XMLConfiguration(path + "sizes.txt")),
log(&Logger::get("StorageTinyLog"))
{
if (columns->empty())
throw Exception("Empty list of columns passed to StorageTinyLog constructor", ErrorCodes::EMPTY_LIST_OF_COLUMNS_PASSED);
@ -394,4 +399,28 @@ void StorageTinyLog::drop()
it->second.data_file.remove();
}
bool StorageTinyLog::checkData() const
{
bool size_is_wrong = false;
for (auto & pair : files)
{
auto & file = pair.second.data_file;
size_t expected_size = std::stoull(size_file->getString(pair.first + ".size"));
size_t real_size = file.getSize();
if (real_size != expected_size)
{
LOG_ERROR(log, "Size of " << file.path() << "is wrong. Size is " << real_size << " but should be " << expected_size);
size_is_wrong = true;
}
}
return size_is_wrong;
}
void StorageTinyLog::updateSize(const std::string & column_name)
{
auto & file = files[column_name].data_file;
size_file->setString(column_name + ".size", std::to_string(file.getSize()));
}
}