fixup #2: comments on English, proper classname, StorageFactory update and merging in 'flush' method without lock [#CLICKHOUSE-2929]

This commit is contained in:
Alexander Makarov 2017-04-17 20:47:17 +03:00 committed by alexey-milovidov
parent f8e19de0e3
commit fdd636a785
3 changed files with 148 additions and 97 deletions

View File

@ -19,6 +19,7 @@
#include <Storages/StorageStripeLog.h>
#include <Storages/StorageMemory.h>
#include <Storages/StorageBuffer.h>
#include <Storages/StorageTrivialBuffer.h>
#include <Storages/StorageNull.h>
#include <Storages/StorageMerge.h>
#include <Storages/StorageMergeTree.h>
@ -556,6 +557,52 @@ StoragePtr StorageFactory::get(
num_buckets, {min_time, min_rows, min_bytes}, {max_time, max_rows, max_bytes},
destination_database, destination_table);
}
else if (name == "TrivialBuffer")
{
/** TrivialBuffer(db, table, num_blocks_to_deduplicate, min_time, max_time, min_rows, max_rows, min_bytes, max_bytes)
*
* db, table - in which table to put data from buffer.
* min_time, max_time, min_rows, max_rows, min_bytes, max_bytes - conditions for pushing out from the buffer.
* num_blocks_to_deduplicate - level of parallelism.
*/
ASTs & args_func = typeid_cast<ASTFunction &>(*typeid_cast<ASTCreateQuery &>(*query).storage).children;
if (args_func.size() != 1)
throw Exception("Storage Buffer requires 9 parameters: "
" destination database, destination table, num_buckets, min_time, max_time, min_rows, max_rows, min_bytes, max_bytes.",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
ASTs & args = typeid_cast<ASTExpressionList &>(*args_func.at(0)).children;
if (args.size() != 9)
throw Exception("Storage Buffer requires 9 parameters: "
" destination_database, destination_table, num_buckets, min_time, max_time, min_rows, max_rows, min_bytes, max_bytes.",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
args[0] = evaluateConstantExpressionOrIdentidierAsLiteral(args[0], local_context);
args[1] = evaluateConstantExpressionOrIdentidierAsLiteral(args[1], local_context);
String destination_database = static_cast<const ASTLiteral &>(*args[0]).value.safeGet<String>();
String destination_table = static_cast<const ASTLiteral &>(*args[1]).value.safeGet<String>();
size_t num_blocks_to_deduplicate = applyVisitor(FieldVisitorConvertToNumber<size_t>(), typeid_cast<ASTLiteral &>(*args[2]).value);
time_t min_time = applyVisitor(FieldVisitorConvertToNumber<size_t>(), typeid_cast<ASTLiteral &>(*args[3]).value);
time_t max_time = applyVisitor(FieldVisitorConvertToNumber<size_t>(), typeid_cast<ASTLiteral &>(*args[4]).value);
size_t min_rows = applyVisitor(FieldVisitorConvertToNumber<size_t>(), typeid_cast<ASTLiteral &>(*args[5]).value);
size_t max_rows = applyVisitor(FieldVisitorConvertToNumber<size_t>(), typeid_cast<ASTLiteral &>(*args[6]).value);
size_t min_bytes = applyVisitor(FieldVisitorConvertToNumber<size_t>(), typeid_cast<ASTLiteral &>(*args[7]).value);
size_t max_bytes = applyVisitor(FieldVisitorConvertToNumber<size_t>(), typeid_cast<ASTLiteral &>(*args[8]).value);
return StorageTrivialBuffer::create(
table_name, columns,
materialized_columns, alias_columns, column_defaults,
context,
num_blocks_to_deduplicate, {min_time, min_rows, min_bytes}, {max_time, max_rows, max_bytes},
destination_database, destination_table);
}
else if (endsWith(name, "MergeTree"))
{
/** [Replicated][|Summing|Collapsing|Aggregating|Unsorted|Replacing|Graphite]MergeTree (2 * 7 combinations) engines

View File

@ -3,7 +3,7 @@
#include <Interpreters/InterpreterAlterQuery.h>
#include <DataStreams/IProfilingBlockInputStream.h>
#include <Databases/IDatabase.h>
#include <Storages/TrivialStorageBuffer.h>
#include <Storages/StorageTrivialBuffer.h>
#include <Parsers/ASTInsertQuery.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTExpressionList.h>
@ -43,7 +43,7 @@ namespace ErrorCodes
}
StoragePtr TrivialStorageBuffer::create(const std::string & name_, NamesAndTypesListPtr columns_,
StoragePtr StorageTrivialBuffer::create(const std::string & name_, NamesAndTypesListPtr columns_,
const NamesAndTypesList & materialized_columns_,
const NamesAndTypesList & alias_columns_,
const ColumnDefaults & column_defaults_,
@ -58,7 +58,7 @@ StoragePtr TrivialStorageBuffer::create(const std::string & name_, NamesAndTypes
}
TrivialStorageBuffer::TrivialBuffer(const std::string & name_, NamesAndTypesListPtr columns_,
StorageTrivialBuffer::StorageTrivialBuffer(const std::string & name_, NamesAndTypesListPtr columns_,
const NamesAndTypesList & materialized_columns_,
const NamesAndTypesList & alias_columns_,
const ColumnDefaults & column_defaults_,
@ -74,7 +74,7 @@ TrivialStorageBuffer::TrivialBuffer(const std::string & name_, NamesAndTypesList
destination_database(destination_database_), destination_table(destination_table_),
no_destination(destination_database.empty() && destination_table.empty()),
log(&Logger::get("TrivialBuffer (" + name + ")")),
flush_thread(&TrivialStorageBuffer::flushThread, this)
flush_thread(&StorageTrivialBuffer::flushThread, this)
{
}
@ -82,9 +82,9 @@ class TrivialBufferBlockInputStream : public IProfilingBlockInputStream
{
public:
TrivialBufferBlockInputStream(const Names & column_names_, BlocksList::iterator begin_,
BlocksList::iterator end_, TrivialStorageBuffer & buffer_)
: column_names(column_names_), begin(begin_), end(end_),
it(begin_), buffer(buffer_) {}
BlocksList::iterator end_, StorageTrivialBuffer & buffer_)
: column_names(column_names_), buffer(buffer_),
begin(begin_), end(end_), it(begin_) {}
String getName() const { return "TrivialStorageBuffer"; }
@ -117,11 +117,11 @@ protected:
private:
Names column_names;
TrivialStorageBuffer & buffer;
StorageTrivialBuffer & buffer;
BlocksList::iterator begin, end, it;
};
BlockInputStreams TrivialStorageBuffer::read(
BlockInputStreams StorageTrivialBuffer::read(
const Names & column_names,
ASTPtr query,
const Context & context,
@ -154,7 +154,7 @@ BlockInputStreams TrivialStorageBuffer::read(
}
BlockInputStreams streams_from_buffers;
std::lock_guard<std::mutex> lock(buffer.mutex);
std::lock_guard<std::mutex> lock(mutex);
size_t size = data.size();
if (threads > size)
threads = size;
@ -181,7 +181,7 @@ BlockInputStreams TrivialStorageBuffer::read(
return streams;
}
void TrivialStorageBuffer::addBlock(const Block & block)
void StorageTrivialBuffer::addBlock(const Block & block)
{
SipHash hash;
block.updateHash(hash);
@ -209,13 +209,13 @@ void TrivialStorageBuffer::addBlock(const Block & block)
auto it = previous_hashes->find(block_hash);
if (it != previous_hashes->end())
{
current_hashes->insert(it);
current_hashes->insert(*it);
previous_hashes->erase(it);
}
}
}
void TrivialStorageBuffer::flush(bool check_thresholds)
void StorageTrivialBuffer::flush(bool check_thresholds, bool is_called_from_background)
{
Block block_to_write;
time_t current_time = time(0);
@ -225,8 +225,17 @@ void TrivialStorageBuffer::flush(bool check_thresholds)
if (data.empty())
return;
BlocksList::iterator flush_begin, flush_end;
{
std::lock_guard<std::mutex> lock(mutex);
std::unique_lock<std::mutex> lock(mutex, std::try_to_lock_t());
if (!lock.owns_lock())
{
// NOTE: is this the behavior we expect from 'flush' concurrency?
if (!is_called_from_background)
LOG_ERROR(log, "Method \'StorageTrivialBuffer::flush\' was called simultaneously from different threads");
return;
}
if (first_write_time)
time_passed = current_time - first_write_time;
@ -242,49 +251,53 @@ void TrivialStorageBuffer::flush(bool check_thresholds)
return;
}
/// Collecting BlockList into single block.
block_to_write = data.front().cloneEmpty();
block_to_write.checkNumberOfRows();
for (auto & block : data)
flush_begin = data.begin();
flush_end = std::prev(data.end());
block_to_write = flush_begin->cloneEmpty();
}
/// Collecting BlockList into single block.
block_to_write.checkNumberOfRows();
flush_end = std::next(flush_end);
for (auto block = flush_begin; block != flush_end; ++block)
{
block->checkNumberOfRows();
for (size_t column_no = 0, columns = block->columns(); column_no < columns; ++column_no)
{
block.checkNumberOfRows();
for (size_t column_no = 0, columns = block.columns(); column_no < columns; ++column_no)
{
IColumn & col_to = *block_to_write.safeGetByPosition(column_no).column.get();
const IColumn & col_from = *block.getByName(col_to.getName()).column.get();
col_to.insertRangeFrom(col_from, 0, block.rows());
}
IColumn & col_to = *block_to_write.safeGetByPosition(column_no).column.get();
const IColumn & col_from = *block->getByName(col_to.getName()).column.get();
col_to.insertRangeFrom(col_from, 0, block->rows());
}
first_write_time = 0;
ProfileEvents::increment(ProfileEvents::StorageBufferFlush);
}
first_write_time = 0;
LOG_TRACE(log, "Flushing buffer with " << block_to_write.rows() << " rows, " << block_to_write.bytes() << " bytes, age " << time_passed << " seconds.");
ProfileEvents::increment(ProfileEvents::StorageBufferFlush);
if (no_destination)
return;
LOG_TRACE(log, "Flushing buffer with " << block_to_write.rows() << " rows, " << block_to_write.bytes() << " bytes, age " << time_passed << " seconds.");
try
{
writeBlockToDestination(block_to_write, context.tryGetTable(destination_database, destination_table));
data.clear();
if (no_destination)
return;
CurrentMetrics::sub(CurrentMetrics::StorageBufferRows, block_to_write.rows());
CurrentMetrics::sub(CurrentMetrics::StorageBufferBytes, block_to_write.bytes());
try
{
writeBlockToDestination(block_to_write, context.tryGetTable(destination_database, destination_table));
data.erase(flush_begin, flush_end);
}
catch (...)
{
ProfileEvents::increment(ProfileEvents::StorageBufferErrorOnFlush);
CurrentMetrics::sub(CurrentMetrics::StorageBufferRows, block_to_write.rows());
CurrentMetrics::sub(CurrentMetrics::StorageBufferBytes, block_to_write.bytes());
if (!first_write_time)
first_write_time = current_time;
}
catch (...)
{
ProfileEvents::increment(ProfileEvents::StorageBufferErrorOnFlush);
/// Через некоторое время будет следующая попытка записать.
throw;
}
if (!first_write_time)
first_write_time = current_time;
/// We'll retry to write in a moment.
throw;
}
}
@ -292,7 +305,7 @@ void TrivialStorageBuffer::flush(bool check_thresholds)
class TrivialBufferBlockOutputStream : public IBlockOutputStream
{
public:
TrivialBufferBlockOutputStream(TrivialStorageBuffer & buffer_) : buffer(buffer_) {}
TrivialBufferBlockOutputStream(StorageTrivialBuffer & buffer_) : buffer(buffer_) {}
void write(const Block & block) override
{
if (!block)
@ -315,7 +328,6 @@ public:
throw Exception("Destination table is myself. Write will "
"cause infinite loop.", ErrorCodes::INFINITE_LOOP);
/// Проверяем структуру таблицы.
try
{
destination->check(block, true);
@ -330,16 +342,13 @@ public:
}
}
/// Вставляем блок в список блоков.
time_t current_time = time(0);
if (buffer.checkThresholds(current_time, rows, bytes))
{
/** Если после вставки в буфер, ограничения будут превышены,
* то будем сбрасывать буфер.
* Это также защищает от неограниченного потребления оперативки,
* так как в случае невозможности записать в таблицу,
* будет выкинуто исключение, а новые данные не будут добавлены в буфер.
/** We'll try to flush the buffer if thresholds are overdrafted.
* It avoids unlimited memory consuming, bcause if we failed to write
* data down to the destination table, we'll throw an exception and
* the new block will not be appended to the buffer.
*/
buffer.flush(true);
@ -351,15 +360,15 @@ public:
buffer.addBlock(block);
}
private:
TrivialStorageBuffer & buffer;
StorageTrivialBuffer & buffer;
};
BlockOutputStreamPtr TrivialStorageBuffer::write(ASTPtr query, const Settings & settings)
BlockOutputStreamPtr StorageTrivialBuffer::write(ASTPtr query, const Settings & settings)
{
return std::make_shared<TrivialBufferBlockOutputStream>(*this);
}
void TrivialStorageBuffer::shutdown()
void StorageTrivialBuffer::shutdown()
{
shutdown_event.set();
@ -388,7 +397,7 @@ void TrivialStorageBuffer::shutdown()
*
* This kind of race condition make very hard to implement proper tests.
*/
bool TrivialStorageBuffer::optimize(const String & partition, bool final, const Settings & settings)
bool StorageTrivialBuffer::optimize(const String & partition, bool final, const Settings & settings)
{
if (!partition.empty())
throw Exception("Partition cannot be specified when optimizing table of type Buffer",
@ -404,7 +413,7 @@ bool TrivialStorageBuffer::optimize(const String & partition, bool final, const
bool TrivialStorageBuffer::checkThresholds(const time_t current_time, const size_t additional_rows,
bool StorageTrivialBuffer::checkThresholds(const time_t current_time, const size_t additional_rows,
const size_t additional_bytes) const
{
time_t time_passed = 0;
@ -418,7 +427,7 @@ bool TrivialStorageBuffer::checkThresholds(const time_t current_time, const size
}
bool TrivialStorageBuffer::checkThresholdsImpl(const size_t rows, const size_t bytes,
bool StorageTrivialBuffer::checkThresholdsImpl(const size_t rows, const size_t bytes,
const time_t time_passed) const
{
if (time_passed > min_thresholds.time && rows > min_thresholds.rows && bytes > min_thresholds.bytes)
@ -448,7 +457,7 @@ bool TrivialStorageBuffer::checkThresholdsImpl(const size_t rows, const size_t b
return false;
}
void TrivialStorageBuffer::flushThread()
void StorageTrivialBuffer::flushThread()
{
setThreadName("BufferFlush");
@ -465,7 +474,7 @@ void TrivialStorageBuffer::flushThread()
} while (!shutdown_event.tryWait(1000));
}
void TrivialStorageBuffer::writeBlockToDestination(const Block & block, StoragePtr table)
void StorageTrivialBuffer::writeBlockToDestination(const Block & block, StoragePtr table)
{
if (no_destination || !block)
return;
@ -481,8 +490,8 @@ void TrivialStorageBuffer::writeBlockToDestination(const Block & block, StorageP
insert->database = destination_database;
insert->table = destination_table;
/** Будем вставлять столбцы, являющиеся пересечением множества столбцов таблицы-буфера и подчинённой таблицы.
* Это позволит поддержать часть случаев (но не все), когда структура таблицы не совпадает.
/** Inserting the set columns which is the intersection of buffer columns and destination table ones.
* It will help us to support some cases with different tables' structures.
*/
Block structure_of_destination_table = table->getSampleBlock();
Names columns_intersection;
@ -527,7 +536,7 @@ void TrivialStorageBuffer::writeBlockToDestination(const Block & block, StorageP
block_io.out->writeSuffix();
}
void TrivialStorageBuffer::alter(const AlterCommands & params, const String & database_name,
void StorageTrivialBuffer::alter(const AlterCommands & params, const String & database_name,
const String & table_name, const Context & context)
{
for (const auto & param : params)
@ -537,7 +546,7 @@ void TrivialStorageBuffer::alter(const AlterCommands & params, const String & da
auto lock = lockStructureForAlter();
/// Чтобы не осталось блоков старой структуры.
/// To avoid presence of blocks of different structure in the buffer.
flush(false);
params.apply(*columns, materialized_columns, alias_columns, column_defaults);

View File

@ -15,35 +15,30 @@ namespace DB
class Context;
/** При вставке буферизует входящие блоки, пока не превышены некоторые пороги.
* Когда пороги превышены - отправляет блоки в другую таблицу в том же порядке,
* в котором они пришли в данную таблицу.
/** Stores incoming blocks until some thresholds are exceeded, then sends
* them to the table it looks into in the same order they came to the buffer.
*
* Пороги проверяются при вставке, а также, периодически, в фоновом потоке
* (чтобы реализовать пороги по времени).
* Если в таблицу вставляется блок, который сам по себе превышает max-пороги, то он
* записывается сразу в подчинённую таблицу без буферизации.
* Пороги могут быть превышены. Например, если max_rows = 1 000 000, в буфере уже было
* 500 000 строк, и добавляется кусок из 800 000 строк, то в буфере окажется 1 300 000 строк,
* и затем такой блок будет записан в подчинённую таблицу
* Thresolds are checked during insert and in background thread (to control
* time thresholds).
* If inserted block exceedes max limits, buffer is flushed and then the incoming
* block is appended to buffer.
*
* При уничтожении таблицы типа TrivialBuffer и при завершении работы, все данные сбрасываются.
* Данные в буфере не реплицируются, не логгируются на диск, не индексируются. При грубом
* перезапуске сервера, данные пропадают.
* Destroying TrivialBuffer or shutting down lead to the buffer flushing.
* The data in the buffer is not replicated, logged or stored. After hard reset of the
* server, the data is lost.
*/
class TrivialStorageBuffer : private ext::shared_ptr_helper<TrivialStorageBuffer>, public IStorage
class StorageTrivialBuffer : private ext::shared_ptr_helper<StorageTrivialBuffer>, public IStorage
{
friend class ext::shared_ptr_helper<TrivialStorageBuffer>;
friend class ext::shared_ptr_helper<StorageTrivialBuffer>;
friend class TrivialBufferBlockInputStream;
friend class TrivialBufferBlockOutputStream;
public:
/// Пороги.
struct Thresholds
{
time_t time; /// Количество секунд от момента вставки первой строчки в блок.
size_t rows; /// Количество строк в блоке.
size_t bytes; /// Количество (несжатых) байт в блоке.
time_t time; /// Seconds after insertion of first block.
size_t rows; /// Number of rows in buffer.
size_t bytes; /// Number of bytes (incompressed) in buffer.
};
static StoragePtr create(const std::string & name_, NamesAndTypesListPtr columns_,
@ -75,7 +70,7 @@ public:
bool checkThresholdsImpl(const size_t rows, const size_t bytes,
const time_t time_passed) const;
/// Сбрасывает все буферы в подчинённую таблицу.
/// Writes all the blocks in buffer into the destination table.
void shutdown() override;
bool optimize(const String & partition, bool final, const Settings & settings) override;
@ -88,7 +83,7 @@ public:
bool supportsIndexForIn() const override { return true; }
bool supportsParallelReplicas() const override { return true; }
/// Структура подчинённой таблицы не проверяется и не изменяется.
/// Does not check or alter the structure of dependent table.
void alter(const AlterCommands & params, const String & database_name,
const String & table_name, const Context & context) override;
@ -108,25 +103,25 @@ private:
const size_t num_blocks_to_deduplicate;
using HashType = UInt64;
using DeduplicationBuffer = std::unordered_set<HashType>;
/// Вставка хэшей новый блоков идет в current_hashes, lookup - в
/// обоих set'ах. Когда current_hashes переполняется, current сбрасывается
/// в previous, а в current создается новый set.
/// We insert new blocks' hashes into 'current_hashes' and perform lookup
/// into both sets. If 'current_hashes' is overflowed, it flushes into
/// into 'previous_hashes', and new set is created for 'current'.
std::unique_ptr<DeduplicationBuffer> current_hashes, previous_hashes;
const Thresholds min_thresholds;
const Thresholds max_thresholds;
const String destination_database;
const String destination_table;
/// Если задано - не записывать данные из буфера, а просто опустошать буфер.
/// If set, forces to clean out buffer, not write to destination table.
bool no_destination;
Poco::Logger * log;
Poco::Event shutdown_event;
/// Выполняет сброс данных по таймауту.
/// Executes flushing by the time thresholds.
std::thread flush_thread;
TrivialStorageBuffer(const std::string & name_, NamesAndTypesListPtr columns_,
StorageTrivialBuffer(const std::string & name_, NamesAndTypesListPtr columns_,
const NamesAndTypesList & materialized_columns_,
const NamesAndTypesList & alias_columns_,
const ColumnDefaults & column_defaults_,
@ -135,12 +130,12 @@ private:
const String & destination_database_, const String & destination_table_);
void addBlock(const Block & block);
/// Аргумент table передаётся, так как иногда вычисляется заранее. Он должен
/// соответствовать destination-у.
/// Parameter 'table' is passed because it's sometimes pre-computed. It should
/// conform the 'destination_table'.
void writeBlockToDestination(const Block & block, StoragePtr table);
void flush(bool check_thresholds = true);
void flush(bool check_thresholds = true, bool is_called_from_background = false);
void flushThread();
};