This commit is contained in:
Alexey Arno 2015-07-03 16:04:03 +03:00
commit 1d9066bb8a
7 changed files with 28 additions and 23 deletions

View File

@ -11,9 +11,8 @@
* (~ 700 МБ/сек., 15 млн. строк в секунду)
*/
#include <stdint.h>
#include <stddef.h>
#include <cstdint>
#include <cstddef>
#define ROTL(x,b) (u64)( ((x) << (b)) | ( (x) >> (64 - (b))) )

View File

@ -545,7 +545,8 @@ bool StorageChunkMerger::MergeTask::mergeChunks(const Storages & chunks)
{
LOG_INFO(log, "Shutdown requested while merging chunks.");
output->writeSuffix();
new_storage.removeReference(); /// После этого временные данные удалятся.
output = nullptr;
executeQuery("DROP TABLE IF EXISTS " + new_table_full_name, context, true);
return false;
}
@ -575,14 +576,16 @@ bool StorageChunkMerger::MergeTask::mergeChunks(const Storages & chunks)
/// Отцепляем исходную таблицу. Ее данные и метаданные остаются на диске.
tables_to_drop.push_back(context.detachTable(chunk_merger.source_database, src_name));
/// Создаем на ее месте ChunkRef. Это возможно только потому что у ChunkRef нет ни, ни метаданных.
/// Создаем на ее месте ChunkRef. Это возможно только потому что у ChunkRef нет ни данных, ни метаданных.
try
{
context.addTable(chunk_merger.source_database, src_name, StorageChunkRef::create(src_name, context, chunk_merger.source_database, new_table_name, false));
context.addTable(chunk_merger.source_database, src_name,
StorageChunkRef::create(src_name, context, chunk_merger.source_database, new_table_name, false));
}
catch (...)
{
LOG_ERROR(log, "Chunk " + src_name + " was removed but not replaced. Its data is stored in table " << new_table_name << ". You may need to resolve this manually.");
LOG_ERROR(log, "Chunk " + src_name + " was removed but not replaced. Its data is stored in table "
<< new_table_name << ". You may need to resolve this manually.");
throw;
}
@ -601,9 +604,6 @@ bool StorageChunkMerger::MergeTask::mergeChunks(const Storages & chunks)
/// что-нибудь может сломаться.
}
/// Сейчас на new_storage ссылаются таблицы типа ChunkRef. Удалим лишнюю ссылку, которая была при создании.
new_storage.removeReference();
LOG_TRACE(log, "Merged chunks.");
return true;
@ -613,6 +613,7 @@ bool StorageChunkMerger::MergeTask::mergeChunks(const Storages & chunks)
Poco::ScopedLock<Poco::Mutex> lock(context.getMutex());
currently_written_groups.erase(new_table_full_name);
executeQuery("DROP TABLE IF EXISTS " + new_table_full_name, context, true);
throw;
}

View File

@ -161,9 +161,6 @@ StorageChunks::StorageChunks(
context(context_),
log(&Logger::get("StorageChunks"))
{
if (!attach)
reference_counter.add(1, true);
_table_column_name = "_table" + VirtualColumnUtils::chooseSuffix(getColumnsList(), "_table");
try

View File

@ -233,7 +233,15 @@ Block LogBlockInputStream::readImpl()
else
column.column = column.type->createColumn();
readData(*it, *column.type, *column.column, max_rows_to_read, 0, read_offsets);
try
{
readData(*it, *column.type, *column.column, max_rows_to_read, 0, read_offsets);
}
catch (Exception & e)
{
e.addMessage("while reading column " + *it + " at " + storage.path + escapeForFileName(storage.name));
throw;
}
if (column.column->size())
res.insert(column);

View File

@ -198,7 +198,15 @@ Block TinyLogBlockInputStream::readImpl()
else
column.column = column.type->createColumn();
readData(*it, *column.type, *column.column, block_size, 0, read_offsets);
try
{
readData(*it, *column.type, *column.column, block_size, 0, read_offsets);
}
catch (Exception & e)
{
e.addMessage("while reading column " + *it + " at " + storage.full_path());
throw;
}
if (column.column->size())
res.insert(column);

View File

@ -47,7 +47,6 @@ public:
</zookeeper>
*/
ZooKeeper(const Poco::Util::AbstractConfiguration & config, const std::string & config_name);
ZooKeeper(const Poco::Util::AbstractConfiguration & config, const std::string & config_name, int32_t session_timeout_ms);
~ZooKeeper();

View File

@ -128,13 +128,6 @@ ZooKeeper::ZooKeeper(const Poco::Util::AbstractConfiguration & config, const std
init(args.hosts, args.session_timeout_ms);
}
ZooKeeper::ZooKeeper(const Poco::Util::AbstractConfiguration& config, const std::string& config_name, int32_t session_timeout_ms_)
{
ZooKeeperArgs args(config, config_name);
init(args.hosts, session_timeout_ms_);
}
void * ZooKeeper::watchForEvent(EventPtr event)
{
if (event)