unify rewriteQuery, rewrite insert query. [#METR-12221]

This commit is contained in:
Andrey Mironov 2014-08-13 16:52:30 +04:00
parent 7eff8548c3
commit 8b6adcb047
3 changed files with 77 additions and 41 deletions

View File

@ -1,6 +1,13 @@
#pragma once
#include <DB/Storages/StorageDistributed.h>
#include <DB/IO/WriteBufferFromFile.h>
#include <DB/IO/CompressedWriteBuffer.h>
#include <DB/DataStreams/NativeBlockOutputStream.h>
#include <statdaemons/Increment.h>
#include <iostream>
namespace DB
@ -9,8 +16,8 @@ namespace DB
class DistributedBlockOutputStream : public IBlockOutputStream
{
public:
DistributedBlockOutputStream(StorageDistributed & storage, Cluster & cluster, const ASTPtr & query)
: storage(storage), cluster(cluster), query(query)
DistributedBlockOutputStream(StorageDistributed & storage, Cluster & cluster, const std::string & query_string)
: storage(storage), cluster(cluster), query_string(query_string)
{
}
@ -62,17 +69,28 @@ private:
void writeImpl(const Block & block, const size_t shard_id = 0)
{
const auto & dir_name = cluster.shard_info_vec[shard_id].dir_name;
const auto & path = storage.getPath() + dir_name + '/';
/// ensure shard subdirectory creation and notify storage if necessary
if (Poco::File(storage.getPath() + dir_name).createDirectory())
if (Poco::File(path).createDirectory())
storage.createDirectoryMonitor(dir_name);
std::cout << "dummy write block of " << block.bytes() << " bytes to shard " << shard_id << std::endl;
const auto number = Increment(path + "increment").get(true);
const auto block_file_path = path + std::to_string(number);
DB::WriteBufferFromFile out{block_file_path};
DB::CompressedWriteBuffer compress{out};
DB::NativeBlockOutputStream stream{compress};
DB::writeStringBinary(query_string, out);
}
StorageDistributed & storage;
Cluster & cluster;
ASTPtr query;
std::string query_string;
};
}

View File

@ -87,9 +87,6 @@ private:
const ASTPtr & sharding_key_ = nullptr,
const String & data_path_ = String{});
/// Создает копию запроса, меняет имена базы данных и таблицы.
ASTPtr rewriteQuery(ASTPtr query);
void createDirectoryMonitors();
void directoryMonitorFunc(const std::string & path);

View File

@ -20,6 +20,46 @@
namespace DB
{
namespace {
template <typename ASTType>
inline std::string queryToString(const ASTPtr & query)
{
const auto & select = typeid_cast<const ASTType &>(*query);
std::ostringstream s;
formatAST(select, s, 0, false, true);
return s.str();
}
inline ASTPtr rewrite(const std::string & name, const ASTSelectQuery &, const ASTIdentifier::Kind kind)
{
return new ASTIdentifier{{}, name, kind};
}
inline const std::string & rewrite(const std::string & name, const ASTInsertQuery &, ASTIdentifier::Kind)
{
return name;
}
/// Создает копию запроса, меняет имена базы данных и таблицы.
template <typename ASTType>
inline ASTPtr rewriteQuery(const ASTPtr & query, const std::string & database, const std::string & table)
{
/// Создаем копию запроса.
auto modified_query_ast = query->clone();
/// Меняем имена таблицы и базы данных
auto & modified_query = typeid_cast<ASTType &>(*modified_query_ast);
modified_query.database = rewrite(database, modified_query, ASTIdentifier::Database);
modified_query.table = rewrite(table, modified_query, ASTIdentifier::Table);
/// copy elision and RVO will work as intended, but let's be more explicit
return std::move(modified_query_ast);
}
}
StorageDistributed::StorageDistributed(
const std::string & name_,
NamesAndTypesListPtr columns_,
@ -87,27 +127,6 @@ StoragePtr StorageDistributed::create(
return res->thisPtr();
}
ASTPtr StorageDistributed::rewriteQuery(ASTPtr query)
{
/// Создаем копию запроса.
ASTPtr modified_query_ast = query->clone();
/// Меняем имена таблицы и базы данных
ASTSelectQuery & select = typeid_cast<ASTSelectQuery &>(*modified_query_ast);
select.database = new ASTIdentifier(StringRange(), remote_database, ASTIdentifier::Database);
select.table = new ASTIdentifier(StringRange(), remote_table, ASTIdentifier::Table);
return modified_query_ast;
}
static String selectToString(ASTPtr query)
{
ASTSelectQuery & select = typeid_cast<ASTSelectQuery &>(*query);
std::stringstream s;
formatAST(select, s, 0, false, true);
return s.str();
}
BlockInputStreams StorageDistributed::read(
const Names & column_names,
ASTPtr query,
@ -126,20 +145,17 @@ BlockInputStreams StorageDistributed::read(
: QueryProcessingStage::WithMergeableState;
BlockInputStreams res;
ASTPtr modified_query_ast = rewriteQuery(query);
const auto & modified_query_ast = rewriteQuery<ASTSelectQuery>(
query, remote_database, remote_table
);
const auto & modified_query = queryToString<ASTSelectQuery>(modified_query_ast);
/// Цикл по шардам.
for (auto & conn_pool : cluster.pools)
{
String modified_query = selectToString(modified_query_ast);
res.push_back(new RemoteBlockInputStream(
conn_pool,
modified_query,
&new_settings,
external_tables,
processed_stage));
}
res.emplace_back(new RemoteBlockInputStream{
conn_pool, modified_query, &new_settings,
external_tables, processed_stage
});
/// Добавляем запросы к локальному ClickHouse.
if (cluster.getLocalNodesNum() > 0)
@ -169,7 +185,12 @@ BlockOutputStreamPtr StorageDistributed::write(ASTPtr query)
ErrorCodes::NOT_IMPLEMENTED
};
return new DistributedBlockOutputStream{*this, this->cluster, query};
return new DistributedBlockOutputStream{
*this, this->cluster,
queryToString<ASTInsertQuery>(rewriteQuery<ASTInsertQuery>(
query, remote_database, remote_table
))
};
}
void StorageDistributed::alter(const AlterCommands & params, const String & database_name, const String & table_name, Context & context)
@ -219,7 +240,7 @@ void StorageDistributed::createDirectoryMonitor(const std::string & name)
directory_monitor_threads.emplace(
name,
std::thread{
&StorageDistributed::directoryMonitorFunc, this, path + name
&StorageDistributed::directoryMonitorFunc, this, path + name + '/'
}
);
}