rewrite insert using sequential nodes

This commit is contained in:
Alexey Zatelepin 2017-12-27 22:53:21 +03:00
parent 78edbaf009
commit 9ba0afbf2e
4 changed files with 21 additions and 72 deletions

View File

@ -120,21 +120,14 @@ BlocksWithPartition MergeTreeDataWriter::splitBlockIntoParts(const Block & block
return result;
}
MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeTempPart(
BlockWithPartition & block_with_partition, std::optional<Int64> block_number)
MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeTempPart(BlockWithPartition & block_with_partition)
{
Block & block = block_with_partition.block;
static const String TMP_PREFIX = "tmp_insert_";
Int64 temp_index;
if (block_number)
temp_index = block_number.value();
else
{
/// This will generate unique name in scope of current server process.
temp_index = data.insert_increment.get();
}
/// This will generate unique name in scope of current server process.
Int64 temp_index = data.insert_increment.get();
MergeTreeDataPart::MinMaxIndex minmax_idx;
minmax_idx.update(block, data.minmax_idx_columns);

View File

@ -13,7 +13,6 @@
#include <Storages/MergeTree/MergeTreeData.h>
#include <optional>
namespace DB
{
@ -47,7 +46,7 @@ public:
/** All rows must correspond to the same partition.
* Returns a part with a unique name starting with 'tmp_' that has not yet been added to MergeTreeData.
*/
MergeTreeData::MutableDataPartPtr writeTempPart(BlockWithPartition & block, std::optional<Int64> block_number = std::nullopt);
MergeTreeData::MutableDataPartPtr writeTempPart(BlockWithPartition & block);
private:
MergeTreeData & data;

View File

@ -32,76 +32,35 @@ void NextGenReplicatedBlockOutputStream::write(const Block & block)
{
Stopwatch watch;
/// TODO: generate block number beforehand?
MergeTreeData::MutableDataPartPtr part = storage.writer.writeTempPart(current_block);
String seqnum_path = storage.zookeeper_path + "/seq_numbers/" + part->info.partition_id;
String parts_path = storage.zookeeper_path + "/parts/";
String parts_path = storage.zookeeper_path + "/parts";
/// Generate a unique seqnum and ensure that the ephemeral part node is created before any other
/// part node with a higher seqnum.
String ephemeral_node_path;
String ephemeral_node_prefix = parts_path + "/tmp_insert_";
String ephemeral_node_path = zookeeper->create(
ephemeral_node_prefix, String(), zkutil::CreateMode::EphemeralSequential);
SCOPE_EXIT(
{
if (!ephemeral_node_path.empty())
zookeeper->tryRemoveEphemeralNodeWithRetries(ephemeral_node_path);
});
while (true)
{
int32_t block_number;
UInt64 block_number = parse<UInt64>(
ephemeral_node_path.data() + ephemeral_node_prefix.size(),
ephemeral_node_path.size() - ephemeral_node_prefix.size());
Stat stat;
int32_t rc = zookeeper->trySet(seqnum_path, String(), -1, &stat);
if (rc == ZOK)
block_number = stat.version;
else if (rc == ZNONODE)
{
int32_t rc = zookeeper->tryCreate(seqnum_path, String(), zkutil::CreateMode::Persistent);
if (rc == ZOK)
{
stat.version = 0;
block_number = 0;
}
else if (rc == ZNODEEXISTS)
continue;
else
throw zkutil::KeeperException(rc, seqnum_path);
}
else
throw zkutil::KeeperException(rc, seqnum_path);
part->info.min_block = block_number;
part->info.max_block = block_number;
part->info.level = 0;
if (storage.data.format_version < MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING)
part->name = part->info.getPartNameV0(part->getMinDate(), part->getMaxDate());
else
part->name = part->info.getPartName();
part->info.min_block = block_number;
part->info.max_block = block_number;
part->info.level = 0;
if (storage.data.format_version < MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING)
part->name = part->info.getPartNameV0(part->getMinDate(), part->getMaxDate());
else
part->name = part->info.getPartName();
ephemeral_node_path = parts_path + part->name;
zkutil::Ops ops;
auto acl = zookeeper->getDefaultACL();
ops.emplace_back(std::make_unique<zkutil::Op::Create>(
ephemeral_node_path, String(), acl, zkutil::CreateMode::Ephemeral));
ops.emplace_back(std::make_unique<zkutil::Op::Check>(
seqnum_path, stat.version));
rc = zookeeper->tryMulti(ops);
if (rc == ZOK)
break;
else if (rc == ZBADVERSION)
{
/// TODO: ProfileEvents?
/// TODO: intelligent backoff.
continue;
}
else
throw zkutil::KeeperException(rc);
}
String part_path = parts_path + "/" + part->name;
String hash_string;
{
@ -119,9 +78,9 @@ void NextGenReplicatedBlockOutputStream::write(const Block & block)
ops.emplace_back(std::make_unique<zkutil::Op::Remove>(ephemeral_node_path, -1));
ops.emplace_back(std::make_unique<zkutil::Op::Create>(
ephemeral_node_path, storage.replica_name, acl, zkutil::CreateMode::Persistent));
part_path, storage.replica_name, acl, zkutil::CreateMode::Persistent));
ops.emplace_back(std::make_unique<zkutil::Op::Create>(
ephemeral_node_path + "/checksum", hash_string, acl, zkutil::CreateMode::Persistent));
part_path + "/checksum", hash_string, acl, zkutil::CreateMode::Persistent));
MergeTreeData::Transaction transaction;
storage.data.renameTempPartAndAdd(part, nullptr, &transaction);

View File

@ -129,8 +129,6 @@ void StorageNextGenReplicatedMergeTree::createTableOrReplica(zkutil::ZooKeeper &
zkutil::Ops ops;
ops.emplace_back(std::make_unique<zkutil::Op::Create>(
zookeeper_path, String(), acl, zkutil::CreateMode::Persistent));
ops.emplace_back(std::make_unique<zkutil::Op::Create>(
zookeeper_path + "/seq_numbers", String(), acl, zkutil::CreateMode::Persistent));
ops.emplace_back(std::make_unique<zkutil::Op::Create>(
zookeeper_path + "/parts", String(), acl, zkutil::CreateMode::Persistent));
ops.emplace_back(std::make_unique<zkutil::Op::Create>(