This commit is contained in:
Evgeniy Gatov 2015-07-02 15:54:27 +03:00
commit 044860293c
9 changed files with 66 additions and 25 deletions

View File

@ -105,6 +105,7 @@ void readVectorBinary(std::vector<T> & v, ReadBuffer & buf, size_t MAX_VECTOR_SI
void assertString(const char * s, ReadBuffer & buf);
void assertEOF(ReadBuffer & buf);
void assertChar(char symbol, ReadBuffer & buf);
inline void assertString(const String & s, ReadBuffer & buf)
{

View File

@ -252,13 +252,19 @@ void writeAnyEscapedString(const String & s, WriteBuffer & buf)
}
inline void writeEscapedString(const String & s, WriteBuffer & buf)
inline void writeEscapedString(const char * str, size_t size, WriteBuffer & buf)
{
/// strpbrk в libc под Linux на процессорах с SSE 4.2 хорошо оптимизирована (этот if ускоряет код в 1.5 раза)
if (nullptr == strpbrk(s.data(), "\b\f\n\r\t\'\\") && strlen(s.data()) == s.size())
writeString(s, buf);
if (nullptr == strpbrk(str, "\b\f\n\r\t\'\\") && strlen(str) == size)
writeString(str, size, buf);
else
writeAnyEscapedString<'\''>(s, buf);
writeAnyEscapedString<'\''>(str, str + size, buf);
}
inline void writeEscapedString(const String & s, WriteBuffer & buf)
{
writeEscapedString(s.data(), s.size(), buf);
}
@ -470,6 +476,10 @@ inline void writeText(const Float32 & x, WriteBuffer & buf) { writeFloatText(x,
inline void writeText(const Float64 & x, WriteBuffer & buf) { writeFloatText(x, buf); }
inline void writeText(const String & x, WriteBuffer & buf) { writeEscapedString(x, buf); }
inline void writeText(const bool & x, WriteBuffer & buf) { writeBoolText(x, buf); }
/// в отличие от метода для std::string
/// здесь предполагается, что x null-terminated строка.
inline void writeText(const char * x, WriteBuffer & buf) { writeEscapedString(x, strlen(x), buf); }
inline void writeText(const char * x, size_t size, WriteBuffer & buf) { writeEscapedString(x, size, buf); }
inline void writeText(const VisitID_t & x, WriteBuffer & buf) { writeIntText(static_cast<const UInt64 &>(x), buf); }
inline void writeText(const mysqlxx::Date & x, WriteBuffer & buf) { writeDateText(x, buf); }

View File

@ -36,6 +36,16 @@ void assertString(const char * s, ReadBuffer & buf)
}
}
void assertChar(char symbol, ReadBuffer & buf)
{
if (buf.eof() || *buf.position() != symbol)
{
char err[2] = {symbol, '\0'};
throwAtAssertionFailed(err, buf);
}
++buf.position();
}
void assertEOF(ReadBuffer & buf)
{
if (!buf.eof())

View File

@ -545,7 +545,8 @@ bool StorageChunkMerger::MergeTask::mergeChunks(const Storages & chunks)
{
LOG_INFO(log, "Shutdown requested while merging chunks.");
output->writeSuffix();
new_storage.removeReference(); /// После этого временные данные удалятся.
output = nullptr;
executeQuery("DROP TABLE IF EXISTS " + new_table_full_name, context, true);
return false;
}
@ -575,14 +576,16 @@ bool StorageChunkMerger::MergeTask::mergeChunks(const Storages & chunks)
/// Отцепляем исходную таблицу. Ее данные и метаданные остаются на диске.
tables_to_drop.push_back(context.detachTable(chunk_merger.source_database, src_name));
/// Создаем на ее месте ChunkRef. Это возможно только потому что у ChunkRef нет ни, ни метаданных.
/// Создаем на ее месте ChunkRef. Это возможно только потому что у ChunkRef нет ни данных, ни метаданных.
try
{
context.addTable(chunk_merger.source_database, src_name, StorageChunkRef::create(src_name, context, chunk_merger.source_database, new_table_name, false));
context.addTable(chunk_merger.source_database, src_name,
StorageChunkRef::create(src_name, context, chunk_merger.source_database, new_table_name, false));
}
catch (...)
{
LOG_ERROR(log, "Chunk " + src_name + " was removed but not replaced. Its data is stored in table " << new_table_name << ". You may need to resolve this manually.");
LOG_ERROR(log, "Chunk " + src_name + " was removed but not replaced. Its data is stored in table "
<< new_table_name << ". You may need to resolve this manually.");
throw;
}
@ -601,9 +604,6 @@ bool StorageChunkMerger::MergeTask::mergeChunks(const Storages & chunks)
/// что-нибудь может сломаться.
}
/// Сейчас на new_storage ссылаются таблицы типа ChunkRef. Удалим лишнюю ссылку, которая была при создании.
new_storage.removeReference();
LOG_TRACE(log, "Merged chunks.");
return true;
@ -613,6 +613,7 @@ bool StorageChunkMerger::MergeTask::mergeChunks(const Storages & chunks)
Poco::ScopedLock<Poco::Mutex> lock(context.getMutex());
currently_written_groups.erase(new_table_full_name);
executeQuery("DROP TABLE IF EXISTS " + new_table_full_name, context, true);
throw;
}

View File

@ -161,9 +161,6 @@ StorageChunks::StorageChunks(
context(context_),
log(&Logger::get("StorageChunks"))
{
if (!attach)
reference_counter.add(1, true);
_table_column_name = "_table" + VirtualColumnUtils::chooseSuffix(getColumnsList(), "_table");
try

View File

@ -233,7 +233,15 @@ Block LogBlockInputStream::readImpl()
else
column.column = column.type->createColumn();
readData(*it, *column.type, *column.column, max_rows_to_read, 0, read_offsets);
try
{
readData(*it, *column.type, *column.column, max_rows_to_read, 0, read_offsets);
}
catch (Exception & e)
{
e.addMessage("while reading column " + *it + " at " + storage.path + escapeForFileName(storage.name));
throw;
}
if (column.column->size())
res.insert(column);

View File

@ -198,7 +198,15 @@ Block TinyLogBlockInputStream::readImpl()
else
column.column = column.type->createColumn();
readData(*it, *column.type, *column.column, block_size, 0, read_offsets);
try
{
readData(*it, *column.type, *column.column, block_size, 0, read_offsets);
}
catch (Exception & e)
{
e.addMessage("while reading column " + *it + " at " + storage.full_path());
throw;
}
if (column.column->size())
res.insert(column);

View File

@ -47,7 +47,6 @@ public:
</zookeeper>
*/
ZooKeeper(const Poco::Util::AbstractConfiguration & config, const std::string & config_name);
ZooKeeper(const Poco::Util::AbstractConfiguration & config, const std::string & config_name, int32_t session_timeout_ms);
~ZooKeeper();
@ -126,6 +125,9 @@ public:
void set(const std::string & path, const std::string & data,
int32_t version = -1, Stat * stat = nullptr);
/** Создает ноду, если ее не существует. Иначе обновляет */
void createOrUpdate(const std::string & path, const std::string & data, int32_t mode);
/** Не бросает исключение при следующих ошибках:
* - Такой ноды нет.
* - У ноды другая версия.

View File

@ -128,13 +128,6 @@ ZooKeeper::ZooKeeper(const Poco::Util::AbstractConfiguration & config, const std
init(args.hosts, args.session_timeout_ms);
}
ZooKeeper::ZooKeeper(const Poco::Util::AbstractConfiguration& config, const std::string& config_name, int32_t session_timeout_ms_)
{
ZooKeeperArgs args(config, config_name);
init(args.hosts, session_timeout_ms_);
}
void * ZooKeeper::watchForEvent(EventPtr event)
{
if (event)
@ -390,7 +383,7 @@ bool ZooKeeper::tryGet(const std::string & path, std::string & res, Stat * stat_
{
int32_t code = retry(std::bind(&ZooKeeper::getImpl, this, std::ref(path), std::ref(res), stat_, watch));
if (!( code == ZOK ||
if (!(code == ZOK ||
code == ZNONODE))
throw KeeperException(code, path);
@ -418,6 +411,17 @@ void ZooKeeper::set(const std::string & path, const std::string & data, int32_t
check(trySet(path, data, version, stat), path);
}
void ZooKeeper::createOrUpdate(const std::string & path, const std::string & data, int32_t mode)
{
int code = trySet(path, data, -1);
if (code == ZNONODE)
{
create(path, data, mode);
}
else if (code != ZOK)
throw zkutil::KeeperException(code, path);
}
int32_t ZooKeeper::trySet(const std::string & path, const std::string & data,
int32_t version, Stat * stat_)
{