/// ClickHouse/dbms/src/Storages/Distributed/DistributedBlockOutputStream.cpp
///
/// (Source-viewer metadata: 181 lines, 5.9 KiB, C++; "Raw Normal View History".)

#include <Storages/Distributed/DistributedBlockOutputStream.h>
#include <Storages/StorageDistributed.h>
// git blame: 2016-01-28 01:00:42 +00:00
#include <Parsers/formatAST.h>
#include <Parsers/queryToString.h>
// git blame: 2016-01-28 01:00:42 +00:00
#include <IO/WriteBufferFromFile.h>
#include <IO/CompressedWriteBuffer.h>
#include <DataStreams/NativeBlockOutputStream.h>
#include <Interpreters/InterpreterInsertQuery.h>
#include <Interpreters/Cluster.h>
#include <Interpreters/createBlockSelector.h>
// git blame: 2016-01-28 01:00:42 +00:00
#include <DataTypes/DataTypesNumber.h>
#include <Common/ClickHouseRevision.h>
// git blame: 2017-07-13 20:58:19 +00:00
#include <Common/typeid_cast.h>
// git blame: 2016-01-28 01:00:42 +00:00
// git blame: 2017-05-10 04:29:36 +00:00
#include <Poco/DirectoryIterator.h>
#include <memory>
// git blame: 2016-01-28 01:00:42 +00:00
#include <iostream>
namespace DB
// git blame: 2016-01-28 01:00:42 +00:00
{
DistributedBlockOutputStream::DistributedBlockOutputStream(StorageDistributed & storage, const ASTPtr & query_ast, const ClusterPtr & cluster_)
: storage(storage), query_ast(query_ast), cluster(cluster_)
2016-01-28 01:00:42 +00:00
{
}
// git blame: 2016-01-28 01:00:42 +00:00
void DistributedBlockOutputStream::write(const Block & block)
{
if (storage.getShardingKeyExpr() && (cluster->getShardsInfo().size() > 1))
return writeSplit(block);
2016-01-28 01:00:42 +00:00
writeImpl(block);
2016-01-28 01:00:42 +00:00
}
/** Computes, for every row of `block`, the index of the shard it belongs to.
  * `block` is taken by value because evaluating the sharding key expression adds the
  * key column to it; the caller's block stays untouched.
  * Only integer key types (UInt8..UInt64, Int8..Int64) are accepted.
  * @throws Exception (TYPE_MISMATCH) if the key expression has any other result type.
  */
IColumn::Selector DistributedBlockOutputStream::createSelector(Block block)
{
    storage.getShardingKeyExpr()->execute(block);

    const auto & key = block.getByName(storage.getShardingKeyColumnName());
    const auto * key_type = key.type.get();
    const size_t shard_count = cluster->getShardsInfo().size();
    const auto & slots = cluster->getSlotToShard();

    /// Dispatch on the concrete integer data type of the key column.
#define TRY_INTEGER_KEY_TYPE(TYPE) \
    if (typeid_cast<const DataType ## TYPE *>(key_type)) \
        return createBlockSelector<TYPE>(*key.column, shard_count, slots);

    TRY_INTEGER_KEY_TYPE(UInt8)
    TRY_INTEGER_KEY_TYPE(UInt16)
    TRY_INTEGER_KEY_TYPE(UInt32)
    TRY_INTEGER_KEY_TYPE(UInt64)
    TRY_INTEGER_KEY_TYPE(Int8)
    TRY_INTEGER_KEY_TYPE(Int16)
    TRY_INTEGER_KEY_TYPE(Int32)
    TRY_INTEGER_KEY_TYPE(Int64)

#undef TRY_INTEGER_KEY_TYPE

    throw Exception{"Sharding key expression does not evaluate to an integer type", ErrorCodes::TYPE_MISMATCH};
}
// git blame: 2016-01-28 01:00:42 +00:00
void DistributedBlockOutputStream::writeSplit(const Block & block)
{
const auto num_cols = block.columns();
/// cache column pointers for later reuse
std::vector<const IColumn *> columns(num_cols);
for (size_t i = 0; i < columns.size(); ++i)
columns[i] = block.safeGetByPosition(i).column.get();
2016-01-28 01:00:42 +00:00
auto selector = createSelector(block);
2016-01-28 01:00:42 +00:00
/// Split block to num_shard smaller block, using 'selector'.
2016-01-28 01:00:42 +00:00
const size_t num_shards = cluster->getShardsInfo().size();
Blocks splitted_blocks(num_shards);
2016-01-28 01:00:42 +00:00
for (size_t shard_idx = 0; shard_idx < num_shards; ++shard_idx)
splitted_blocks[shard_idx] = block.cloneEmpty();
2016-01-28 01:00:42 +00:00
size_t columns_in_block = block.columns();
for (size_t col_idx_in_block = 0; col_idx_in_block < columns_in_block; ++col_idx_in_block)
{
Columns splitted_columns = block.getByPosition(col_idx_in_block).column->scatter(num_shards, selector);
for (size_t shard_idx = 0; shard_idx < num_shards; ++shard_idx)
splitted_blocks[shard_idx].getByPosition(col_idx_in_block).column = std::move(splitted_columns[shard_idx]);
}
for (size_t shard_idx = 0; shard_idx < num_shards; ++shard_idx)
if (splitted_blocks[shard_idx].rows())
writeImpl(splitted_blocks[shard_idx], shard_idx);
2016-01-28 01:00:42 +00:00
}
// git blame: 2016-01-28 01:00:42 +00:00
void DistributedBlockOutputStream::writeImpl(const Block & block, const size_t shard_id)
{
const auto & shard_info = cluster->getShardsInfo()[shard_id];
if (shard_info.getLocalNodeCount() > 0)
writeToLocal(block, shard_info.getLocalNodeCount());
2016-01-28 01:00:42 +00:00
/// dir_names is empty if shard has only local addresses
if (!shard_info.dir_names.empty())
writeToShard(block, shard_info.dir_names);
2016-01-28 01:00:42 +00:00
}
// git blame: 2016-01-28 01:00:42 +00:00
void DistributedBlockOutputStream::writeToLocal(const Block & block, const size_t repeats)
{
InterpreterInsertQuery interp{query_ast, storage.context};
2016-01-28 01:00:42 +00:00
auto block_io = interp.execute();
block_io.out->writePrefix();
2016-01-28 01:00:42 +00:00
for (size_t i = 0; i < repeats; ++i)
block_io.out->write(block);
2016-01-28 01:00:42 +00:00
block_io.out->writeSuffix();
2016-01-28 01:00:42 +00:00
}
// git blame: 2016-01-28 01:00:42 +00:00
void DistributedBlockOutputStream::writeToShard(const Block & block, const std::vector<std::string> & dir_names)
{
/** tmp directory is used to ensure atomicity of transactions
2017-05-10 04:29:36 +00:00
* and keep monitor thread out from reading incomplete data
*/
std::string first_file_tmp_path{};
2016-01-28 01:00:42 +00:00
auto first = true;
const auto & query_string = queryToString(query_ast);
2016-01-28 01:00:42 +00:00
/// write first file, hardlink the others
for (const auto & dir_name : dir_names)
{
const auto & path = storage.getPath() + dir_name + '/';
2016-01-28 01:00:42 +00:00
/// ensure shard subdirectory creation and notify storage
if (Poco::File(path).createDirectory())
storage.requireDirectoryMonitor(dir_name);
2016-01-28 01:00:42 +00:00
const auto & file_name = toString(storage.file_names_increment.get()) + ".bin";
const auto & block_file_path = path + file_name;
2016-01-28 01:00:42 +00:00
/** on first iteration write block to a temporary directory for subsequent hardlinking to ensure
* the inode is not freed until we're done */
if (first)
{
first = false;
2016-01-28 01:00:42 +00:00
const auto & tmp_path = path + "tmp/";
Poco::File(tmp_path).createDirectory();
const auto & block_file_tmp_path = tmp_path + file_name;
2016-01-28 01:00:42 +00:00
first_file_tmp_path = block_file_tmp_path;
2016-01-28 01:00:42 +00:00
WriteBufferFromFile out{block_file_tmp_path};
CompressedWriteBuffer compress{out};
NativeBlockOutputStream stream{compress, ClickHouseRevision::get()};
2016-01-28 01:00:42 +00:00
writeStringBinary(query_string, out);
2016-01-28 01:00:42 +00:00
stream.writePrefix();
stream.write(block);
stream.writeSuffix();
}
2016-01-28 01:00:42 +00:00
if (link(first_file_tmp_path.data(), block_file_path.data()))
throwFromErrno("Could not link " + block_file_path + " to " + first_file_tmp_path);
}
2016-01-28 01:00:42 +00:00
/** remove the temporary file, enabling the OS to reclaim inode after all threads
* have removed their corresponding files */
Poco::File(first_file_tmp_path).remove();
2016-01-28 01:00:42 +00:00
}
}