StorageTrivialBuffer: added support for the replication [#CLICKHOUSE-2929]

This commit is contained in:
Alexander Makarov 2017-04-26 21:57:12 +03:00 committed by alexey-milovidov
parent fdd636a785
commit 55bfaca662
3 changed files with 138 additions and 48 deletions

View File

@ -559,25 +559,26 @@ StoragePtr StorageFactory::get(
} }
else if (name == "TrivialBuffer") else if (name == "TrivialBuffer")
{ {
/** TrivialBuffer(db, table, num_blocks_to_deduplicate, min_time, max_time, min_rows, max_rows, min_bytes, max_bytes) /** TrivialBuffer(db, table, num_blocks_to_deduplicate, min_time, max_time, min_rows, max_rows, min_bytes, max_bytes, path_in_zookeeper)
* *
* db, table - in which table to put data from buffer. * db, table - in which table to put data from buffer.
* min_time, max_time, min_rows, max_rows, min_bytes, max_bytes - conditions for pushing out from the buffer. * min_time, max_time, min_rows, max_rows, min_bytes, max_bytes - conditions for pushing out from the buffer.
* num_blocks_to_deduplicate - level of parallelism. * num_blocks_to_deduplicate - level of parallelism.
*/ */
const std::string error_message_argument_number_mismatch = "Storage TrivialBuffer requires 10 parameters: "
" destination database, destination table, num_blocks_to_deduplicate, min_time, max_time, min_rows,"
" max_rows, min_bytes, max_bytes, path_in_zookeeper.";
ASTs & args_func = typeid_cast<ASTFunction &>(*typeid_cast<ASTCreateQuery &>(*query).storage).children; ASTs & args_func = typeid_cast<ASTFunction &>(*typeid_cast<ASTCreateQuery &>(*query).storage).children;
if (args_func.size() != 1) if (args_func.size() != 1)
throw Exception("Storage Buffer requires 9 parameters: " throw Exception(error_message_argument_number_mismatch,
" destination database, destination table, num_buckets, min_time, max_time, min_rows, max_rows, min_bytes, max_bytes.",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
ASTs & args = typeid_cast<ASTExpressionList &>(*args_func.at(0)).children; ASTs & args = typeid_cast<ASTExpressionList &>(*args_func.at(0)).children;
if (args.size() != 9) if (args.size() != 10)
throw Exception("Storage Buffer requires 9 parameters: " throw Exception(error_message_argument_number_mismatch,
" destination_database, destination_table, num_buckets, min_time, max_time, min_rows, max_rows, min_bytes, max_bytes.",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
args[0] = evaluateConstantExpressionOrIdentidierAsLiteral(args[0], local_context); args[0] = evaluateConstantExpressionOrIdentidierAsLiteral(args[0], local_context);
@ -595,12 +596,13 @@ StoragePtr StorageFactory::get(
size_t min_bytes = applyVisitor(FieldVisitorConvertToNumber<size_t>(), typeid_cast<ASTLiteral &>(*args[7]).value); size_t min_bytes = applyVisitor(FieldVisitorConvertToNumber<size_t>(), typeid_cast<ASTLiteral &>(*args[7]).value);
size_t max_bytes = applyVisitor(FieldVisitorConvertToNumber<size_t>(), typeid_cast<ASTLiteral &>(*args[8]).value); size_t max_bytes = applyVisitor(FieldVisitorConvertToNumber<size_t>(), typeid_cast<ASTLiteral &>(*args[8]).value);
String path_in_zk_for_deduplication = static_cast<const ASTLiteral &>(*args[9]).value.safeGet<String>();
return StorageTrivialBuffer::create( return StorageTrivialBuffer::create(
table_name, columns, table_name, columns,
materialized_columns, alias_columns, column_defaults, materialized_columns, alias_columns, column_defaults,
context, context, num_blocks_to_deduplicate, path_in_zk_for_deduplication,
num_blocks_to_deduplicate, {min_time, min_rows, min_bytes}, {max_time, max_rows, max_bytes}, {min_time, min_rows, min_bytes}, {max_time, max_rows, max_bytes},
destination_database, destination_table); destination_database, destination_table);
} }
else if (endsWith(name, "MergeTree")) else if (endsWith(name, "MergeTree"))

View File

@ -1,16 +1,16 @@
#include <Interpreters/InterpreterSelectQuery.h>
#include <Interpreters/InterpreterInsertQuery.h>
#include <Interpreters/InterpreterAlterQuery.h>
#include <DataStreams/IProfilingBlockInputStream.h>
#include <Databases/IDatabase.h>
#include <Storages/StorageTrivialBuffer.h> #include <Storages/StorageTrivialBuffer.h>
#include <Parsers/ASTInsertQuery.h>
#include <Parsers/ASTIdentifier.h> #include <Databases/IDatabase.h>
#include <Parsers/ASTExpressionList.h> #include <DataStreams/IProfilingBlockInputStream.h>
#include <Common/setThreadName.h>
#include <Common/CurrentMetrics.h> #include <Common/CurrentMetrics.h>
#include <Common/SipHash.h>
#include <common/logger_useful.h> #include <common/logger_useful.h>
#include <Common/setThreadName.h>
#include <Interpreters/InterpreterAlterQuery.h>
#include <Interpreters/InterpreterInsertQuery.h>
#include <Interpreters/InterpreterSelectQuery.h>
#include <Parsers/ASTExpressionList.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTInsertQuery.h>
#include <Poco/Ext/ThreadNumber.h> #include <Poco/Ext/ThreadNumber.h>
#include <ext/range.hpp> #include <ext/range.hpp>
@ -48,12 +48,14 @@ StoragePtr StorageTrivialBuffer::create(const std::string & name_, NamesAndTypes
const NamesAndTypesList & alias_columns_, const NamesAndTypesList & alias_columns_,
const ColumnDefaults & column_defaults_, const ColumnDefaults & column_defaults_,
Context & context_, const size_t num_blocks_to_deduplicate_, Context & context_, const size_t num_blocks_to_deduplicate_,
const String & path_in_zk_for_deduplication_,
const Thresholds & min_thresholds_, const Thresholds & max_thresholds_, const Thresholds & min_thresholds_, const Thresholds & max_thresholds_,
const String & destination_database_, const String & destination_table_) const String & destination_database_, const String & destination_table_)
{ {
return make_shared( return make_shared(
name_, columns_, materialized_columns_, alias_columns_, column_defaults_, name_, columns_, materialized_columns_, alias_columns_, column_defaults_,
context_, num_blocks_to_deduplicate_, min_thresholds_, max_thresholds_, context_, num_blocks_to_deduplicate_, path_in_zk_for_deduplication_,
min_thresholds_, max_thresholds_,
destination_database_, destination_table_); destination_database_, destination_table_);
} }
@ -63,19 +65,23 @@ StorageTrivialBuffer::StorageTrivialBuffer(const std::string & name_, NamesAndTy
const NamesAndTypesList & alias_columns_, const NamesAndTypesList & alias_columns_,
const ColumnDefaults & column_defaults_, const ColumnDefaults & column_defaults_,
Context & context_, const size_t num_blocks_to_deduplicate_, Context & context_, const size_t num_blocks_to_deduplicate_,
const String & path_in_zk_for_deduplication_,
const Thresholds & min_thresholds_, const Thresholds & max_thresholds_, const Thresholds & min_thresholds_, const Thresholds & max_thresholds_,
const String & destination_database_, const String & destination_table_) const String & destination_database_, const String & destination_table_)
: IStorage{materialized_columns_, alias_columns_, column_defaults_}, : IStorage{materialized_columns_, alias_columns_, column_defaults_},
name(name_), columns(columns_), context(context_), name(name_), columns(columns_), context(context_),
num_blocks_to_deduplicate(num_blocks_to_deduplicate_), num_blocks_to_deduplicate(num_blocks_to_deduplicate_),
current_hashes(std::make_unique<DeduplicationBuffer>()), path_in_zk_for_deduplication(path_in_zk_for_deduplication_),
previous_hashes(std::make_unique<DeduplicationBuffer>()), zookeeper(context.getZooKeeper()),
deduplication_controller(num_blocks_to_deduplicate, zookeeper, path_in_zk_for_deduplication),
min_thresholds(min_thresholds_), max_thresholds(max_thresholds_), min_thresholds(min_thresholds_), max_thresholds(max_thresholds_),
destination_database(destination_database_), destination_table(destination_table_), destination_database(destination_database_), destination_table(destination_table_),
no_destination(destination_database.empty() && destination_table.empty()), no_destination(destination_database.empty() && destination_table.empty()),
log(&Logger::get("TrivialBuffer (" + name + ")")), log(&Logger::get("TrivialBuffer (" + name + ")")),
flush_thread(&StorageTrivialBuffer::flushThread, this) flush_thread(&StorageTrivialBuffer::flushThread, this)
{ {
zookeeper->createAncestors(path_in_zk_for_deduplication);
zookeeper->createOrUpdate(path_in_zk_for_deduplication, {}, zkutil::CreateMode::Persistent);
} }
class TrivialBufferBlockInputStream : public IProfilingBlockInputStream class TrivialBufferBlockInputStream : public IProfilingBlockInputStream
@ -181,22 +187,17 @@ BlockInputStreams StorageTrivialBuffer::read(
return streams; return streams;
} }
void StorageTrivialBuffer::addBlock(const Block & block) template <typename DeduplicatioController>
void StorageTrivialBuffer::addBlock(const Block & block, DeduplicatioController & deduplication_controller)
{ {
SipHash hash; SipHash hash;
block.updateHash(hash); block.updateHash(hash);
HashType block_hash = hash.get64(); typename DeduplicatioController::HashType block_hash = DeduplicatioController::getHashFrom(hash);
std::lock_guard<std::mutex> lock(mutex); std::lock_guard<std::mutex> lock(mutex);
if (current_hashes->find(block_hash) == current_hashes->end() if (!deduplication_controller.contains(block_hash))
&& previous_hashes->find(block_hash) == previous_hashes->end())
{ {
if (current_hashes->size() >= num_blocks_to_deduplicate / 2) deduplication_controller.insert(block_hash);
{
previous_hashes = std::move(current_hashes);
current_hashes = std::make_unique<DeduplicationBuffer>();
}
current_hashes->insert(block_hash);
current_rows += block.rows(); current_rows += block.rows();
current_bytes += block.bytes(); current_bytes += block.bytes();
data.push_back(block); data.push_back(block);
@ -206,12 +207,7 @@ void StorageTrivialBuffer::addBlock(const Block & block)
} }
else else
{ {
auto it = previous_hashes->find(block_hash); deduplication_controller.updateOnDeduplication(block_hash);
if (it != previous_hashes->end())
{
current_hashes->insert(*it);
previous_hashes->erase(it);
}
} }
} }
@ -357,7 +353,7 @@ public:
if (!buffer.first_write_time) if (!buffer.first_write_time)
buffer.first_write_time = current_time; buffer.first_write_time = current_time;
buffer.addBlock(block); buffer.addBlock/*<StorageTrivialBuffer::ZookeeperDeduplicationController>*/(block, buffer.deduplication_controller);
} }
private: private:
StorageTrivialBuffer & buffer; StorageTrivialBuffer & buffer;

View File

@ -2,11 +2,15 @@
#include <mutex> #include <mutex>
#include <thread> #include <thread>
#include <ext/shared_ptr_helper.hpp>
#include <Common/SipHash.h>
#include <Core/NamesAndTypes.h> #include <Core/NamesAndTypes.h>
#include <Storages/IStorage.h>
#include <DataStreams/IBlockOutputStream.h> #include <DataStreams/IBlockOutputStream.h>
#include <ext/shared_ptr_helper.hpp>
#include <Poco/Event.h> #include <Poco/Event.h>
#include <Storages/IStorage.h>
#include <zkutil/ZooKeeper.h>
namespace Poco { class Logger; } namespace Poco { class Logger; }
@ -46,6 +50,7 @@ public:
const NamesAndTypesList & alias_columns_, const NamesAndTypesList & alias_columns_,
const ColumnDefaults & column_defaults_, const ColumnDefaults & column_defaults_,
Context & context_, size_t num_blocks_to_deduplicate_, Context & context_, size_t num_blocks_to_deduplicate_,
const String & path_in_zk_for_deduplication_,
const Thresholds & min_thresholds_, const Thresholds & max_thresholds_, const Thresholds & min_thresholds_, const Thresholds & max_thresholds_,
const String & destination_database_, const String & destination_table_); const String & destination_database_, const String & destination_table_);
@ -87,6 +92,93 @@ public:
void alter(const AlterCommands & params, const String & database_name, void alter(const AlterCommands & params, const String & database_name,
const String & table_name, const Context & context) override; const String & table_name, const Context & context) override;
class ZookeeperDeduplicationController
{
public:
using HashType = String;
static HashType getHashFrom(SipHash & hash) { return std::to_string(hash.get64()); }
bool contains(HashType block_hash)
{
std::string res;
return zookeeper->tryGet(path_in_zk_for_deduplication + "/" + block_hash, res);
}
void insert(HashType block_hash)
{
std::vector<String> current_hashes;
if (zookeeper->tryGetChildren(path_in_zk_for_deduplication, current_hashes) == ZNONODE)
{
throw DB::Exception("No node \'" + path_in_zk_for_deduplication + "\' to control deduplication.");
}
// Cleanup zookeeper if needed.
if (current_hashes.size() >= 2*num_blocks_to_deduplicate)
{
using HashWithTimestamp = std::pair<String, time_t>;
std::vector<HashWithTimestamp> hashes_with_timestamps;
for (auto & hash : current_hashes)
{
zkutil::Stat stat;
String res;
String path_in_zk = path_in_zk_for_deduplication + "/" + hash;
if (!zookeeper->tryGet(path_in_zk, res, &stat))
{
throw DB::Exception("Seems like a race conditions between replics was found, path: " + path_in_zk);
}
hashes_with_timestamps.emplace_back(path_in_zk, stat.ctime);
}
// We do not need to sort all the hashes, only 'num_blocks_to_deduplicate' hashes
// with minimum creation time.
auto hashes_with_timestamps_end = hashes_with_timestamps.end();
if (hashes_with_timestamps.size() > num_blocks_to_deduplicate)
hashes_with_timestamps_end = hashes_with_timestamps.begin() + num_blocks_to_deduplicate;
std::partial_sort(hashes_with_timestamps.begin(), hashes_with_timestamps_end, hashes_with_timestamps.end(),
[] (const HashWithTimestamp & a, const HashWithTimestamp & b) -> bool
{
return a.second > b.second;
}
);
zkutil::Ops nodes_to_remove;
for (auto it = hashes_with_timestamps.begin(); it != hashes_with_timestamps_end; ++it)
{
nodes_to_remove.emplace_back(std::make_unique<zkutil::Op::Remove>(it->first, -1));
}
zookeeper->tryMulti(nodes_to_remove);
}
// Finally, inserting new node.
std::string path_for_insert = path_in_zk_for_deduplication + "/" + block_hash;
if (zookeeper->tryCreate(path_for_insert, {},
zkutil::CreateMode::Persistent) != ZOK)
{
throw DB::Exception("Cannot create node at path: " + path_for_insert);
}
}
void updateOnDeduplication(HashType block_hash)
{
zookeeper->createOrUpdate(path_in_zk_for_deduplication + "/" + block_hash,
{}, zkutil::CreateMode::Persistent);
}
ZookeeperDeduplicationController(size_t num_blocks_to_deduplicate_, zkutil::ZooKeeperPtr zookeeper_,
const std::string & path_in_zk_for_deduplication_)
: num_blocks_to_deduplicate(num_blocks_to_deduplicate_),
zookeeper(zookeeper_), path_in_zk_for_deduplication(path_in_zk_for_deduplication_)
{ }
private:
using DeduplicationBuffer = std::unordered_set<HashType>;
size_t num_blocks_to_deduplicate;
zkutil::ZooKeeperPtr zookeeper;
const std::string path_in_zk_for_deduplication;
};
private: private:
String name; String name;
NamesAndTypesListPtr columns; NamesAndTypesListPtr columns;
@ -101,12 +193,10 @@ private:
size_t current_bytes = 0; size_t current_bytes = 0;
time_t first_write_time = 0; time_t first_write_time = 0;
const size_t num_blocks_to_deduplicate; const size_t num_blocks_to_deduplicate;
using HashType = UInt64; const String path_in_zk_for_deduplication;
using DeduplicationBuffer = std::unordered_set<HashType>; zkutil::ZooKeeperPtr zookeeper;
/// We insert new blocks' hashes into 'current_hashes' and perform lookup ZookeeperDeduplicationController deduplication_controller;
/// into both sets. If 'current_hashes' is overflowed, it flushes into
/// into 'previous_hashes', and new set is created for 'current'.
std::unique_ptr<DeduplicationBuffer> current_hashes, previous_hashes;
const Thresholds min_thresholds; const Thresholds min_thresholds;
const Thresholds max_thresholds; const Thresholds max_thresholds;
@ -126,10 +216,12 @@ private:
const NamesAndTypesList & alias_columns_, const NamesAndTypesList & alias_columns_,
const ColumnDefaults & column_defaults_, const ColumnDefaults & column_defaults_,
Context & context_, size_t num_blocks_to_deduplicate_, Context & context_, size_t num_blocks_to_deduplicate_,
const String & path_in_zk_for_deduplication_,
const Thresholds & min_thresholds_, const Thresholds & max_thresholds_, const Thresholds & min_thresholds_, const Thresholds & max_thresholds_,
const String & destination_database_, const String & destination_table_); const String & destination_database_, const String & destination_table_);
void addBlock(const Block & block); template <typename DeduplicatioController>
void addBlock(const Block & block, DeduplicatioController & deduplication_controller);
/// Parameter 'table' is passed because it's sometimes pre-computed. It should /// Parameter 'table' is passed because it's sometimes pre-computed. It should
/// conform the 'destination_table'. /// conform the 'destination_table'.
void writeBlockToDestination(const Block & block, StoragePtr table); void writeBlockToDestination(const Block & block, StoragePtr table);