Merge branch 'master' into session-cleaner-better-build-time

Alexey Milovidov 2020-02-20 21:43:40 +03:00
commit 5894a750d5
87 changed files with 3899 additions and 2850 deletions

View File

@ -0,0 +1,26 @@
#pragma once
#include <Interpreters/Cluster.h>
namespace DB
{
using ConfigurationPtr = Poco::AutoPtr<Poco::Util::AbstractConfiguration>;
using DatabaseAndTableName = std::pair<String, String>;
/// Hierarchical description of the tasks
struct ShardPartition;
struct TaskShard;
struct TaskTable;
struct TaskCluster;
struct ClusterPartition;
using TasksPartition = std::map<String, ShardPartition, std::greater<>>;
using ShardInfo = Cluster::ShardInfo;
using TaskShardPtr = std::shared_ptr<TaskShard>;
using TasksShard = std::vector<TaskShardPtr>;
using TasksTable = std::list<TaskTable>;
using ClusterPartitions = std::map<String, ClusterPartition, std::greater<>>;
}

View File

@ -1,5 +1,17 @@
set(CLICKHOUSE_COPIER_SOURCES ${CMAKE_CURRENT_SOURCE_DIR}/ClusterCopier.cpp)
set(CLICKHOUSE_COPIER_LINK PRIVATE clickhouse_common_zookeeper clickhouse_parsers clickhouse_functions clickhouse_table_functions clickhouse_aggregate_functions clickhouse_dictionaries string_utils ${Poco_XML_LIBRARY} PUBLIC daemon)
set(CLICKHOUSE_COPIER_SOURCES
${CMAKE_CURRENT_SOURCE_DIR}/ClusterCopierApp.cpp
${CMAKE_CURRENT_SOURCE_DIR}/ClusterCopier.cpp
${CMAKE_CURRENT_SOURCE_DIR}/Internals.cpp)
set(CLICKHOUSE_COPIER_LINK PRIVATE
clickhouse_common_zookeeper
clickhouse_parsers
clickhouse_functions
clickhouse_table_functions
clickhouse_aggregate_functions
clickhouse_dictionaries
string_utils ${Poco_XML_LIBRARY} PUBLIC daemon)
set(CLICKHOUSE_COPIER_INCLUDE SYSTEM PRIVATE ${PCG_RANDOM_INCLUDE_DIR})
clickhouse_program_add(copier)

File diff suppressed because it is too large.

View File

@ -1,89 +1,175 @@
#pragma once
#include <Poco/Util/ServerApplication.h>
#include <daemon/BaseDaemon.h>
/* clickhouse cluster copier util
 * Copies table data from one cluster to new tables of another (possibly the same) cluster in a distributed, fault-tolerant manner.
 *
 * See overview in the docs: docs/en/utils/clickhouse-copier.md
 *
 * Implementation details:
 *
 * cluster-copier workers pull each partition of each shard of the source cluster and push it to the destination cluster through
 * a Distributed table (to perform data resharding). So, a worker's job is a partition of a source shard.
 * A job has three states: Active, Finished and Abandoned. Abandoned means that the worker died and did not finish the job.
 *
 * If an error occurred during the copying (a worker failed or a worker did not finish the INSERT), then the whole partition (on
 * all destination servers) should be dropped and refilled. So, the copying entity is a partition of all destination shards.
 * If a failure is detected, a special /is_dirty node is created in ZooKeeper, signalling that other workers copying the same partition
 * should stop, after which a refilling procedure should start.
 *
 * The ZooKeeper task node has the following structure:
 *  /task/path_root                       - path passed in the --task-path parameter
 *      /description                      - contains the user-defined XML config of the task
 *      /task_active_workers              - contains ephemeral nodes of all currently active workers, used to implement the max_workers limitation
 *          /server_fqdn#PID_timestamp    - cluster-copier worker ID
 *          ...
 *      /tables                           - directory with table tasks
 *          /cluster.db.table1            - directory of the table_hits task
 *              /partition1               - directory for partition1
 *                  /shards               - directory for source cluster shards
 *                      /1                - worker job for the first shard of partition1 of table test.hits.
 *                                          Contains info about the current status (Active or Finished) and the worker ID.
 *                      /2
 *                      ...
 *                  /partition_active_workers
 *                      /1                - for each job in /shards a corresponding ephemeral node is created in /partition_active_workers.
 *                                          It is used to detect Abandoned jobs (there is an Active node in /shards but no node in
 *                                          /partition_active_workers).
 *                                          It is also used to track active workers in the partition (when the partition needs to be refilled,
 *                                          DROP PARTITION is not executed while there are active workers).
 *                      /2
 *                      ...
 *                  /is_dirty             - this node is set if some worker detected that an error occurred (the INSERT failed or an Abandoned
 *                                          node was detected). If this node appears, workers in this partition should stop and start the
 *                                          cleaning and refilling procedure.
 *                                          During this procedure a single 'cleaner' worker is selected. The cleaner waits for all partition
 *                                          workers to stop, removes the /shards node, executes DROP PARTITION on each destination node and
 *                                          removes the /is_dirty node.
 *                      /cleaner          - an ephemeral node used to select the 'cleaner' worker. Contains the ID of the worker.
 *          /cluster.db.table2
 *          ...
 */
#include "Aliases.h"
#include "Internals.h"
#include "TaskCluster.h"
#include "TaskTableAndShard.h"
#include "ShardPartition.h"
#include "ZooKeeperStaff.h"
namespace DB
{
class ClusterCopierApp : public BaseDaemon
class ClusterCopier
{
public:
void initialize(Poco::Util::Application & self) override;
ClusterCopier(const String & task_path_,
const String & host_id_,
const String & proxy_database_name_,
Context & context_)
:
task_zookeeper_path(task_path_),
host_id(host_id_),
working_database_name(proxy_database_name_),
context(context_),
log(&Poco::Logger::get("ClusterCopier")) {}
void handleHelp(const std::string &, const std::string &);
void init();
void defineOptions(Poco::Util::OptionSet & options) override;
template<typename T>
decltype(auto) retry(T && func, UInt64 max_tries = 100);
int main(const std::vector<std::string> &) override;
void discoverShardPartitions(const ConnectionTimeouts & timeouts, const TaskShardPtr & task_shard);
/// Compute the set of partitions; assume the set of partitions is not changed during the processing
void discoverTablePartitions(const ConnectionTimeouts & timeouts, TaskTable & task_table, UInt64 num_threads = 0);
void uploadTaskDescription(const std::string & task_path, const std::string & task_file, const bool force);
void reloadTaskDescription();
void updateConfigIfNeeded();
void process(const ConnectionTimeouts & timeouts);
/// Disables the DROP PARTITION commands that are used to clear data after errors
void setSafeMode(bool is_safe_mode_ = true)
{
is_safe_mode = is_safe_mode_;
}
void setCopyFaultProbability(double copy_fault_probability_)
{
copy_fault_probability = copy_fault_probability_;
}
protected:
String getWorkersPath() const
{
return task_cluster->task_zookeeper_path + "/task_active_workers";
}
String getWorkersPathVersion() const
{
return getWorkersPath() + "_version";
}
String getCurrentWorkerNodePath() const
{
return getWorkersPath() + "/" + host_id;
}
zkutil::EphemeralNodeHolder::Ptr createTaskWorkerNodeAndWaitIfNeed(
const zkutil::ZooKeeperPtr &zookeeper,
const String &description,
bool unprioritized);
/** Checks that the whole partition of a table was copied. We should do it carefully due to dirty lock.
* State of some task could change during the processing.
* We have to ensure that all shards have the finished state and there is no dirty flag.
* Moreover, we have to check status twice and check zxid, because state can change during the checking.
*/
bool checkPartitionIsDone(const TaskTable & task_table, const String & partition_name,
const TasksShard & shards_with_partition);
/// Removes MATERIALIZED and ALIAS columns from create table query
static ASTPtr removeAliasColumnsFromCreateQuery(const ASTPtr &query_ast);
/// Replaces ENGINE and table name in a create query
std::shared_ptr<ASTCreateQuery>
rewriteCreateQueryStorage(const ASTPtr & create_query_ast, const DatabaseAndTableName & new_table,
const ASTPtr & new_storage_ast);
bool tryDropPartition(ShardPartition & task_partition,
const zkutil::ZooKeeperPtr & zookeeper,
const CleanStateClock & clean_state_clock);
static constexpr UInt64 max_table_tries = 1000;
static constexpr UInt64 max_shard_partition_tries = 600;
bool tryProcessTable(const ConnectionTimeouts & timeouts, TaskTable & task_table);
PartitionTaskStatus tryProcessPartitionTask(const ConnectionTimeouts & timeouts,
ShardPartition & task_partition,
bool is_unprioritized_task);
PartitionTaskStatus processPartitionTaskImpl(const ConnectionTimeouts & timeouts,
ShardPartition & task_partition,
bool is_unprioritized_task);
void dropAndCreateLocalTable(const ASTPtr & create_ast);
void dropLocalTableIfExists (const DatabaseAndTableName & table_name) const;
String getRemoteCreateTable(const DatabaseAndTableName & table,
Connection & connection,
const Settings * settings = nullptr);
ASTPtr getCreateTableForPullShard(const ConnectionTimeouts & timeouts,
TaskShard & task_shard);
void createShardInternalTables(const ConnectionTimeouts & timeouts,
TaskShard & task_shard,
bool create_split = true);
std::set<String> getShardPartitions(const ConnectionTimeouts & timeouts,
TaskShard & task_shard);
bool checkShardHasPartition(const ConnectionTimeouts & timeouts,
TaskShard & task_shard,
const String & partition_quoted_name);
/** Executes a simple query (without output streams, for example DDL queries) on each shard of the cluster.
* Returns the number of shards for which at least one replica executed the query successfully.
*/
UInt64 executeQueryOnCluster(
const ClusterPtr & cluster,
const String & query,
const ASTPtr & query_ast_ = nullptr,
const Settings * settings = nullptr,
PoolMode pool_mode = PoolMode::GET_ALL,
UInt64 max_successful_executions_per_shard = 0) const;
private:
String task_zookeeper_path;
String task_description_path;
String host_id;
String working_database_name;
using Base = BaseDaemon;
/// Auto update config stuff
UInt64 task_descprtion_current_version = 1;
std::atomic<UInt64> task_descprtion_version{1};
Coordination::WatchCallback task_description_watch_callback;
/// ZooKeeper session used to set the callback
zkutil::ZooKeeperPtr task_description_watch_zookeeper;
void mainImpl();
ConfigurationPtr task_cluster_initial_config;
ConfigurationPtr task_cluster_current_config;
Coordination::Stat task_descprtion_current_stat{};
void setupLogging();
std::unique_ptr<TaskCluster> task_cluster;
std::string config_xml_path;
std::string task_path;
std::string log_level = "debug";
bool is_safe_mode = false;
double copy_fault_probability = 0;
bool is_help = false;
double copy_fault_probability = 0.0;
std::string base_dir;
std::string process_path;
std::string process_id;
std::string host_id;
Context & context;
Poco::Logger * log;
std::chrono::milliseconds default_sleep_time{1000};
};
}
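The retry() helper declared above is defined in ClusterCopier.cpp, whose diff is suppressed above as too large. A minimal sketch of how such a wrapper could look — an assumption for illustration, not the actual implementation; it assumes <chrono>, <exception> and <thread> are available:

    // Hypothetical sketch: call func() up to max_tries times and rethrow the last error.
    // The real ClusterCopier::retry() may additionally log and use default_sleep_time between attempts.
    template <typename T>
    decltype(auto) retrySketch(T && func, UInt64 max_tries = 100)
    {
        std::exception_ptr exception;
        for (UInt64 try_number = 1; try_number <= max_tries; ++try_number)
        {
            try
            {
                return func();   // success: return the result (or just return for void)
            }
            catch (...)
            {
                exception = std::current_exception();
                std::this_thread::sleep_for(std::chrono::milliseconds(1000));   // back off before the next attempt
            }
        }
        std::rethrow_exception(exception);   // all attempts failed: rethrow the last error
    }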

View File

@ -0,0 +1,172 @@
#include "ClusterCopierApp.h"
namespace DB
{
/// ClusterCopierApp
void ClusterCopierApp::initialize(Poco::Util::Application & self)
{
is_help = config().has("help");
if (is_help)
return;
config_xml_path = config().getString("config-file");
task_path = config().getString("task-path");
log_level = config().getString("log-level", "trace");
is_safe_mode = config().has("safe-mode");
if (config().has("copy-fault-probability"))
copy_fault_probability = std::max(std::min(config().getDouble("copy-fault-probability"), 1.0), 0.0);
base_dir = (config().has("base-dir")) ? config().getString("base-dir") : Poco::Path::current();
// process_id is '<start_timestamp>_<pid>'; host_id is '<hostname>#<process_id>'
time_t timestamp = Poco::Timestamp().epochTime();
auto curr_pid = Poco::Process::id();
process_id = std::to_string(DateLUT::instance().toNumYYYYMMDDhhmmss(timestamp)) + "_" + std::to_string(curr_pid);
host_id = escapeForFileName(getFQDNOrHostName()) + '#' + process_id;
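/// For illustration only (hypothetical values):
/// process_id = "20200220214340_12345", host_id = "server01.example.com#20200220214340_12345"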
process_path = Poco::Path(base_dir + "/clickhouse-copier_" + process_id).absolute().toString();
Poco::File(process_path).createDirectories();
/// Override variables for BaseDaemon
if (config().has("log-level"))
config().setString("logger.level", config().getString("log-level"));
if (config().has("base-dir") || !config().has("logger.log"))
config().setString("logger.log", process_path + "/log.log");
if (config().has("base-dir") || !config().has("logger.errorlog"))
config().setString("logger.errorlog", process_path + "/log.err.log");
Base::initialize(self);
}
void ClusterCopierApp::handleHelp(const std::string &, const std::string &)
{
Poco::Util::HelpFormatter helpFormatter(options());
helpFormatter.setCommand(commandName());
helpFormatter.setHeader("Copies tables from one cluster to another");
helpFormatter.setUsage("--config-file <config-file> --task-path <task-path>");
helpFormatter.format(std::cerr);
stopOptionsProcessing();
}
void ClusterCopierApp::defineOptions(Poco::Util::OptionSet & options)
{
Base::defineOptions(options);
options.addOption(Poco::Util::Option("task-path", "", "path to task in ZooKeeper")
.argument("task-path").binding("task-path"));
options.addOption(Poco::Util::Option("task-file", "", "path to task file for uploading in ZooKeeper to task-path")
.argument("task-file").binding("task-file"));
options.addOption(Poco::Util::Option("task-upload-force", "", "Force upload task-file even node already exists")
.argument("task-upload-force").binding("task-upload-force"));
options.addOption(Poco::Util::Option("safe-mode", "", "disables ALTER DROP PARTITION in case of errors")
.binding("safe-mode"));
options.addOption(Poco::Util::Option("copy-fault-probability", "", "the copying fails with specified probability (used to test partition state recovering)")
.argument("copy-fault-probability").binding("copy-fault-probability"));
options.addOption(Poco::Util::Option("log-level", "", "sets log level")
.argument("log-level").binding("log-level"));
options.addOption(Poco::Util::Option("base-dir", "", "base directory for copiers, consecutive copier launches will populate /base-dir/launch_id/* directories")
.argument("base-dir").binding("base-dir"));
using Me = std::decay_t<decltype(*this)>;
options.addOption(Poco::Util::Option("help", "", "produce this help message").binding("help")
.callback(Poco::Util::OptionCallback<Me>(this, &Me::handleHelp)));
}
void ClusterCopierApp::mainImpl()
{
StatusFile status_file(process_path + "/status");
ThreadStatus thread_status;
auto log = &logger();
LOG_INFO(log, "Starting clickhouse-copier ("
<< "id " << process_id << ", "
<< "host_id " << host_id << ", "
<< "path " << process_path << ", "
<< "revision " << ClickHouseRevision::get() << ")");
auto context = std::make_unique<Context>(Context::createGlobal());
context->makeGlobalContext();
SCOPE_EXIT(context->shutdown());
context->setConfig(loaded_config.configuration);
context->setApplicationType(Context::ApplicationType::LOCAL);
context->setPath(process_path);
registerFunctions();
registerAggregateFunctions();
registerTableFunctions();
registerStorages();
registerDictionaries();
registerDisks();
static const std::string default_database = "_local";
context->addDatabase(default_database, std::make_shared<DatabaseMemory>(default_database));
context->setCurrentDatabase(default_database);
/// Initialize query scope just in case.
CurrentThread::QueryScope query_scope(*context);
auto copier = std::make_unique<ClusterCopier>(task_path, host_id, default_database, *context);
copier->setSafeMode(is_safe_mode);
copier->setCopyFaultProbability(copy_fault_probability);
auto task_file = config().getString("task-file", "");
if (!task_file.empty())
copier->uploadTaskDescription(task_path, task_file, config().getBool("task-upload-force", false));
copier->init();
copier->process(ConnectionTimeouts::getTCPTimeoutsWithoutFailover(context->getSettingsRef()));
/// Reset ZooKeeper before removing ClusterCopier.
/// Otherwise zookeeper watch can call callback which use already removed ClusterCopier object.
context->resetZooKeeper();
}
int ClusterCopierApp::main(const std::vector<std::string> &)
{
if (is_help)
return 0;
try
{
mainImpl();
}
catch (...)
{
tryLogCurrentException(&Poco::Logger::root(), __PRETTY_FUNCTION__);
auto code = getCurrentExceptionCode();
return (code) ? code : -1;
}
return 0;
}
}
#pragma GCC diagnostic ignored "-Wunused-function"
#pragma GCC diagnostic ignored "-Wmissing-declarations"
int mainEntryClickHouseClusterCopier(int argc, char ** argv)
{
try
{
DB::ClusterCopierApp app;
return app.run(argc, argv);
}
catch (...)
{
std::cerr << DB::getCurrentExceptionMessage(true) << "\n";
auto code = DB::getCurrentExceptionCode();
return (code) ? code : -1;
}
}

View File

@ -0,0 +1,90 @@
#pragma once
#include <Poco/Util/ServerApplication.h>
#include <daemon/BaseDaemon.h>
#include "ClusterCopier.h"
/* clickhouse cluster copier util
 * Copies table data from one cluster to new tables of another (possibly the same) cluster in a distributed, fault-tolerant manner.
 *
 * See overview in the docs: docs/en/utils/clickhouse-copier.md
 *
 * Implementation details:
 *
 * cluster-copier workers pull each partition of each shard of the source cluster and push it to the destination cluster through
 * a Distributed table (to perform data resharding). So, a worker's job is a partition of a source shard.
 * A job has three states: Active, Finished and Abandoned. Abandoned means that the worker died and did not finish the job.
 *
 * If an error occurred during the copying (a worker failed or a worker did not finish the INSERT), then the whole partition (on
 * all destination servers) should be dropped and refilled. So, the copying entity is a partition of all destination shards.
 * If a failure is detected, a special /is_dirty node is created in ZooKeeper, signalling that other workers copying the same partition
 * should stop, after which a refilling procedure should start.
 *
 * The ZooKeeper task node has the following structure:
 *  /task/path_root                       - path passed in the --task-path parameter
 *      /description                      - contains the user-defined XML config of the task
 *      /task_active_workers              - contains ephemeral nodes of all currently active workers, used to implement the max_workers limitation
 *          /server_fqdn#PID_timestamp    - cluster-copier worker ID
 *          ...
 *      /tables                           - directory with table tasks
 *          /cluster.db.table1            - directory of the table_hits task
 *              /partition1               - directory for partition1
 *                  /shards               - directory for source cluster shards
 *                      /1                - worker job for the first shard of partition1 of table test.hits.
 *                                          Contains info about the current status (Active or Finished) and the worker ID.
 *                      /2
 *                      ...
 *                  /partition_active_workers
 *                      /1                - for each job in /shards a corresponding ephemeral node is created in /partition_active_workers.
 *                                          It is used to detect Abandoned jobs (there is an Active node in /shards but no node in
 *                                          /partition_active_workers).
 *                                          It is also used to track active workers in the partition (when the partition needs to be refilled,
 *                                          DROP PARTITION is not executed while there are active workers).
 *                      /2
 *                      ...
 *                  /is_dirty             - this node is set if some worker detected that an error occurred (the INSERT failed or an Abandoned
 *                                          node was detected). If this node appears, workers in this partition should stop and start the
 *                                          cleaning and refilling procedure.
 *                                          During this procedure a single 'cleaner' worker is selected. The cleaner waits for all partition
 *                                          workers to stop, removes the /shards node, executes DROP PARTITION on each destination node and
 *                                          removes the /is_dirty node.
 *                      /cleaner          - an ephemeral node used to select the 'cleaner' worker. Contains the ID of the worker.
 *          /cluster.db.table2
 *          ...
 */
namespace DB
{
class ClusterCopierApp : public BaseDaemon
{
public:
void initialize(Poco::Util::Application & self) override;
void handleHelp(const std::string &, const std::string &);
void defineOptions(Poco::Util::OptionSet & options) override;
int main(const std::vector<std::string> &) override;
private:
using Base = BaseDaemon;
void mainImpl();
std::string config_xml_path;
std::string task_path;
std::string log_level = "trace";
bool is_safe_mode = false;
double copy_fault_probability = 0;
bool is_help = false;
std::string base_dir;
std::string process_path;
std::string process_id;
std::string host_id;
};
}

View File

@ -0,0 +1,17 @@
#pragma once
#include "Aliases.h"
namespace DB
{
/// Contains info about all shards that contain a partition
struct ClusterPartition
{
double elapsed_time_seconds = 0;
UInt64 bytes_copied = 0;
UInt64 rows_copied = 0;
UInt64 blocks_copied = 0;
UInt64 total_tries = 0;
};
}

View File

@ -0,0 +1,141 @@
#include "Internals.h"
namespace DB
{
ConfigurationPtr getConfigurationFromXMLString(const std::string & xml_data)
{
std::stringstream ss(xml_data);
Poco::XML::InputSource input_source{ss};
return {new Poco::Util::XMLConfiguration{&input_source}};
}
String getQuotedTable(const String & database, const String & table)
{
if (database.empty())
return backQuoteIfNeed(table);
return backQuoteIfNeed(database) + "." + backQuoteIfNeed(table);
}
String getQuotedTable(const DatabaseAndTableName & db_and_table)
{
return getQuotedTable(db_and_table.first, db_and_table.second);
}
// Creates AST representing 'ENGINE = Distributed(cluster, db, table, [sharding_key])'
std::shared_ptr<ASTStorage> createASTStorageDistributed(
const String & cluster_name, const String & database, const String & table,
const ASTPtr & sharding_key_ast)
{
auto args = std::make_shared<ASTExpressionList>();
args->children.emplace_back(std::make_shared<ASTLiteral>(cluster_name));
args->children.emplace_back(std::make_shared<ASTIdentifier>(database));
args->children.emplace_back(std::make_shared<ASTIdentifier>(table));
if (sharding_key_ast)
args->children.emplace_back(sharding_key_ast);
auto engine = std::make_shared<ASTFunction>();
engine->name = "Distributed";
engine->arguments = args;
auto storage = std::make_shared<ASTStorage>();
storage->set(storage->engine, engine);
return storage;
}
BlockInputStreamPtr squashStreamIntoOneBlock(const BlockInputStreamPtr & stream)
{
return std::make_shared<SquashingBlockInputStream>(
stream,
std::numeric_limits<size_t>::max(),
std::numeric_limits<size_t>::max());
}
Block getBlockWithAllStreamData(const BlockInputStreamPtr & stream)
{
return squashStreamIntoOneBlock(stream)->read();
}
bool isExtendedDefinitionStorage(const ASTPtr & storage_ast)
{
const auto & storage = storage_ast->as<ASTStorage &>();
return storage.partition_by || storage.order_by || storage.sample_by;
}
ASTPtr extractPartitionKey(const ASTPtr & storage_ast)
{
String storage_str = queryToString(storage_ast);
const auto & storage = storage_ast->as<ASTStorage &>();
const auto & engine = storage.engine->as<ASTFunction &>();
if (!endsWith(engine.name, "MergeTree"))
{
throw Exception(
"Unsupported engine was specified in " + storage_str + ", only *MergeTree engines are supported",
ErrorCodes::BAD_ARGUMENTS);
}
if (isExtendedDefinitionStorage(storage_ast))
{
if (storage.partition_by)
return storage.partition_by->clone();
static const char * all = "all";
return std::make_shared<ASTLiteral>(Field(all, strlen(all)));
}
else
{
bool is_replicated = startsWith(engine.name, "Replicated");
size_t min_args = is_replicated ? 3 : 1;
if (!engine.arguments)
throw Exception("Expected arguments in " + storage_str, ErrorCodes::BAD_ARGUMENTS);
ASTPtr arguments_ast = engine.arguments->clone();
ASTs & arguments = arguments_ast->children;
if (arguments.size() < min_args)
throw Exception("Expected at least " + toString(min_args) + " arguments in " + storage_str,
ErrorCodes::BAD_ARGUMENTS);
ASTPtr & month_arg = is_replicated ? arguments[2] : arguments[1];
return makeASTFunction("toYYYYMM", month_arg->clone());
}
}
ShardPriority getReplicasPriority(const Cluster::Addresses & replicas, const std::string & local_hostname, UInt8 random)
{
ShardPriority res;
if (replicas.empty())
return res;
res.is_remote = 1;
for (auto & replica : replicas)
{
if (isLocalAddress(DNSResolver::instance().resolveHost(replica.host_name)))
{
res.is_remote = 0;
break;
}
}
res.hostname_difference = std::numeric_limits<size_t>::max();
for (auto & replica : replicas)
{
size_t difference = getHostNameDifference(local_hostname, replica.host_name);
res.hostname_difference = std::min(difference, res.hostname_difference);
}
res.random = random;
return res;
}
}

View File

@ -0,0 +1,184 @@
#pragma once
#include <chrono>
#include <optional>
#include <Poco/Util/XMLConfiguration.h>
#include <Poco/Logger.h>
#include <Poco/ConsoleChannel.h>
#include <Poco/FormattingChannel.h>
#include <Poco/PatternFormatter.h>
#include <Poco/UUIDGenerator.h>
#include <Poco/File.h>
#include <Poco/Process.h>
#include <Poco/FileChannel.h>
#include <Poco/SplitterChannel.h>
#include <Poco/Util/HelpFormatter.h>
#include <boost/algorithm/string.hpp>
#include <common/logger_useful.h>
#include <Common/ThreadPool.h>
#include <Common/Exception.h>
#include <Common/ZooKeeper/ZooKeeper.h>
#include <Common/ZooKeeper/KeeperException.h>
#include <Common/getFQDNOrHostName.h>
#include <Common/isLocalAddress.h>
#include <Common/typeid_cast.h>
#include <Common/ClickHouseRevision.h>
#include <Common/formatReadable.h>
#include <Common/DNSResolver.h>
#include <Common/CurrentThread.h>
#include <Common/escapeForFileName.h>
#include <Common/getNumberOfPhysicalCPUCores.h>
#include <Common/ThreadStatus.h>
#include <Client/Connection.h>
#include <Interpreters/Context.h>
#include <Interpreters/InterpreterFactory.h>
#include <Interpreters/InterpreterExistsQuery.h>
#include <Interpreters/InterpreterShowCreateQuery.h>
#include <Interpreters/InterpreterDropQuery.h>
#include <Interpreters/InterpreterCreateQuery.h>
#include <Columns/ColumnString.h>
#include <Columns/ColumnsNumber.h>
#include <DataTypes/DataTypeString.h>
#include <Parsers/ParserCreateQuery.h>
#include <Parsers/parseQuery.h>
#include <Parsers/ParserQuery.h>
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/queryToString.h>
#include <Parsers/ASTDropQuery.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/ASTExpressionList.h>
#include <Formats/FormatSettings.h>
#include <DataStreams/RemoteBlockInputStream.h>
#include <DataStreams/SquashingBlockInputStream.h>
#include <DataStreams/AsynchronousBlockInputStream.h>
#include <DataStreams/copyData.h>
#include <DataStreams/NullBlockOutputStream.h>
#include <IO/ConnectionTimeouts.h>
#include <IO/Operators.h>
#include <IO/ReadBufferFromString.h>
#include <IO/ReadBufferFromFile.h>
#include <Functions/registerFunctions.h>
#include <TableFunctions/registerTableFunctions.h>
#include <AggregateFunctions/registerAggregateFunctions.h>
#include <Storages/registerStorages.h>
#include <Storages/StorageDistributed.h>
#include <Dictionaries/registerDictionaries.h>
#include <Disks/registerDisks.h>
#include <Databases/DatabaseMemory.h>
#include <Common/StatusFile.h>
#include "Aliases.h"
namespace DB
{
namespace ErrorCodes
{
extern const int NO_ZOOKEEPER;
extern const int BAD_ARGUMENTS;
extern const int UNKNOWN_TABLE;
extern const int UNFINISHED;
extern const int UNKNOWN_ELEMENT_IN_CONFIG;
}
ConfigurationPtr getConfigurationFromXMLString(const std::string & xml_data);
String getQuotedTable(const String & database, const String & table);
String getQuotedTable(const DatabaseAndTableName & db_and_table);
enum class TaskState
{
Started = 0,
Finished,
Unknown
};
/// Used to mark status of shard partition tasks
struct TaskStateWithOwner
{
TaskStateWithOwner() = default;
TaskStateWithOwner(TaskState state_, const String & owner_) : state(state_), owner(owner_) {}
TaskState state{TaskState::Unknown};
String owner;
static String getData(TaskState state, const String &owner)
{
return TaskStateWithOwner(state, owner).toString();
}
String toString()
{
WriteBufferFromOwnString wb;
wb << static_cast<UInt32>(state) << "\n" << escape << owner;
return wb.str();
}
static TaskStateWithOwner fromString(const String & data)
{
ReadBufferFromString rb(data);
TaskStateWithOwner res;
UInt32 state;
rb >> state >> "\n" >> escape >> res.owner;
if (state >= static_cast<int>(TaskState::Unknown))
throw Exception("Unknown state " + data, ErrorCodes::LOGICAL_ERROR);
res.state = static_cast<TaskState>(state);
return res;
}
};
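A brief usage sketch (illustration only, assuming code that includes Internals.h): TaskStateWithOwner serializes as the numeric state, a newline, and the escaped owner.

    String data = TaskStateWithOwner::getData(TaskState::Finished, "worker_host#20200220_1");
    /// data == "1\nworker_host#20200220_1"  (Started = 0, Finished = 1; this owner needs no escaping)
    TaskStateWithOwner restored = TaskStateWithOwner::fromString(data);
    /// restored.state == TaskState::Finished, restored.owner == "worker_host#20200220_1"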
struct ShardPriority
{
UInt8 is_remote = 1;
size_t hostname_difference = 0;
UInt8 random = 0;
static bool greaterPriority(const ShardPriority & current, const ShardPriority & other)
{
return std::forward_as_tuple(current.is_remote, current.hostname_difference, current.random)
< std::forward_as_tuple(other.is_remote, other.hostname_difference, other.random);
}
};
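To illustrate the ordering above (hypothetical values): shards with a local replica sort first, then shards whose replica hostnames are closer to the local hostname, with the random component breaking ties.

    ShardPriority local_shard;                  /// is_remote == 1 by default
    local_shard.is_remote = 0;
    local_shard.hostname_difference = 5;

    ShardPriority remote_shard;
    remote_shard.hostname_difference = 1;

    /// true: (0, 5, 0) < (1, 1, 0), so the shard with a local replica wins despite the larger hostname difference
    bool local_first = ShardPriority::greaterPriority(local_shard, remote_shard);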
/// Execution status of a task
enum class PartitionTaskStatus
{
Active,
Finished,
Error,
};
struct MultiTransactionInfo
{
int32_t code;
Coordination::Requests requests;
Coordination::Responses responses;
};
// Creates AST representing 'ENGINE = Distributed(cluster, db, table, [sharding_key])'
std::shared_ptr<ASTStorage> createASTStorageDistributed(
const String & cluster_name, const String & database, const String & table,
const ASTPtr & sharding_key_ast = nullptr);
BlockInputStreamPtr squashStreamIntoOneBlock(const BlockInputStreamPtr & stream);
Block getBlockWithAllStreamData(const BlockInputStreamPtr & stream);
bool isExtendedDefinitionStorage(const ASTPtr & storage_ast);
ASTPtr extractPartitionKey(const ASTPtr & storage_ast);
ShardPriority getReplicasPriority(const Cluster::Addresses & replicas, const std::string & local_hostname, UInt8 random);
}

View File

@ -0,0 +1,68 @@
#pragma once
#include "Aliases.h"
namespace DB
{
/// Just destination partition of a shard
struct ShardPartition
{
ShardPartition(TaskShard & parent, const String & name_quoted_) : task_shard(parent), name(name_quoted_) {}
String getPartitionPath() const;
String getPartitionCleanStartPath() const;
String getCommonPartitionIsDirtyPath() const;
String getCommonPartitionIsCleanedPath() const;
String getPartitionActiveWorkersPath() const;
String getActiveWorkerPath() const;
String getPartitionShardsPath() const;
String getShardStatusPath() const;
TaskShard & task_shard;
String name;
};
inline String ShardPartition::getPartitionCleanStartPath() const
{
return getPartitionPath() + "/clean_start";
}
inline String ShardPartition::getPartitionPath() const
{
return task_shard.task_table.getPartitionPath(name);
}
inline String ShardPartition::getShardStatusPath() const
{
// schema: /<root...>/tables/<table>/<partition>/shards/<shard>
// e.g. /root/table_test.hits/201701/shards/1
return getPartitionShardsPath() + "/" + toString(task_shard.numberInCluster());
}
inline String ShardPartition::getPartitionShardsPath() const
{
return getPartitionPath() + "/shards";
}
inline String ShardPartition::getPartitionActiveWorkersPath() const
{
return getPartitionPath() + "/partition_active_workers";
}
inline String ShardPartition::getActiveWorkerPath() const
{
return getPartitionActiveWorkersPath() + "/" + toString(task_shard.numberInCluster());
}
inline String ShardPartition::getCommonPartitionIsDirtyPath() const
{
return getPartitionPath() + "/is_dirty";
}
inline String ShardPartition::getCommonPartitionIsCleanedPath() const
{
return getCommonPartitionIsDirtyPath() + "/cleaned";
}
}

View File

@ -0,0 +1,96 @@
#pragma once
#include "Aliases.h"
namespace DB
{
struct TaskCluster
{
TaskCluster(const String & task_zookeeper_path_, const String & default_local_database_)
: task_zookeeper_path(task_zookeeper_path_), default_local_database(default_local_database_) {}
void loadTasks(const Poco::Util::AbstractConfiguration & config, const String & base_key = "");
/// Set (or update) settings and max_workers param
void reloadSettings(const Poco::Util::AbstractConfiguration & config, const String & base_key = "");
/// Base node for all tasks. Its structure:
/// workers/ - directory with active workers (their number is less than or equal to max_workers)
/// description - node with task configuration
/// table_table1/ - directories with per-partition copying status
String task_zookeeper_path;
/// Database used to create temporary Distributed tables
String default_local_database;
/// Limits number of simultaneous workers
UInt64 max_workers = 0;
/// Base settings for pull and push
Settings settings_common;
/// Settings used to fetch data
Settings settings_pull;
/// Settings used to insert data
Settings settings_push;
String clusters_prefix;
/// Subtasks
TasksTable table_tasks;
std::random_device random_device;
pcg64 random_engine;
};
inline void DB::TaskCluster::loadTasks(const Poco::Util::AbstractConfiguration & config, const String & base_key)
{
String prefix = base_key.empty() ? "" : base_key + ".";
clusters_prefix = prefix + "remote_servers";
if (!config.has(clusters_prefix))
throw Exception("You should specify list of clusters in " + clusters_prefix, ErrorCodes::BAD_ARGUMENTS);
Poco::Util::AbstractConfiguration::Keys tables_keys;
config.keys(prefix + "tables", tables_keys);
for (const auto & table_key : tables_keys)
{
table_tasks.emplace_back(*this, config, prefix + "tables", table_key);
}
}
inline void DB::TaskCluster::reloadSettings(const Poco::Util::AbstractConfiguration & config, const String & base_key)
{
String prefix = base_key.empty() ? "" : base_key + ".";
max_workers = config.getUInt64(prefix + "max_workers");
settings_common = Settings();
if (config.has(prefix + "settings"))
settings_common.loadSettingsFromConfig(prefix + "settings", config);
settings_pull = settings_common;
if (config.has(prefix + "settings_pull"))
settings_pull.loadSettingsFromConfig(prefix + "settings_pull", config);
settings_push = settings_common;
if (config.has(prefix + "settings_push"))
settings_push.loadSettingsFromConfig(prefix + "settings_push", config);
auto set_default_value = [] (auto && setting, auto && default_value)
{
setting = setting.changed ? setting.value : default_value;
};
/// Override important settings
settings_pull.readonly = 1;
settings_push.insert_distributed_sync = 1;
set_default_value(settings_pull.load_balancing, LoadBalancing::NEAREST_HOSTNAME);
set_default_value(settings_pull.max_threads, 1);
set_default_value(settings_pull.max_block_size, 8192UL);
set_default_value(settings_pull.preferred_block_size_bytes, 0);
set_default_value(settings_push.insert_distributed_timeout, 0);
}
}

View File

@ -0,0 +1,274 @@
#pragma once
#include "Aliases.h"
#include "Internals.h"
#include "ClusterPartition.h"
namespace DB
{
struct TaskShard;
struct TaskTable
{
TaskTable(TaskCluster & parent, const Poco::Util::AbstractConfiguration & config, const String & prefix,
const String & table_key);
TaskCluster & task_cluster;
String getPartitionPath(const String & partition_name) const;
String getPartitionIsDirtyPath(const String & partition_name) const;
String getPartitionIsCleanedPath(const String & partition_name) const;
String getPartitionTaskStatusPath(const String & partition_name) const;
String name_in_config;
/// Used as task ID
String table_id;
/// Source cluster and table
String cluster_pull_name;
DatabaseAndTableName table_pull;
/// Destination cluster and table
String cluster_push_name;
DatabaseAndTableName table_push;
/// Storage of destination table
String engine_push_str;
ASTPtr engine_push_ast;
ASTPtr engine_push_partition_key_ast;
/// A Distributed table definition used to split data
String sharding_key_str;
ASTPtr sharding_key_ast;
ASTPtr engine_split_ast;
/// Additional WHERE expression to filter input data
String where_condition_str;
ASTPtr where_condition_ast;
/// Resolved clusters
ClusterPtr cluster_pull;
ClusterPtr cluster_push;
/// Filter partitions that should be copied
bool has_enabled_partitions = false;
Strings enabled_partitions;
NameSet enabled_partitions_set;
/// Prioritized list of shards
TasksShard all_shards;
TasksShard local_shards;
ClusterPartitions cluster_partitions;
NameSet finished_cluster_partitions;
/// Partition names to process in user-specified order
Strings ordered_partition_names;
ClusterPartition & getClusterPartition(const String & partition_name)
{
auto it = cluster_partitions.find(partition_name);
if (it == cluster_partitions.end())
throw Exception("There are no cluster partition " + partition_name + " in " + table_id, ErrorCodes::LOGICAL_ERROR);
return it->second;
}
Stopwatch watch;
UInt64 bytes_copied = 0;
UInt64 rows_copied = 0;
template <typename RandomEngine>
void initShards(RandomEngine && random_engine);
};
struct TaskShard
{
TaskShard(TaskTable &parent, const ShardInfo &info_) : task_table(parent), info(info_) {}
TaskTable & task_table;
ShardInfo info;
UInt32 numberInCluster() const { return info.shard_num; }
UInt32 indexInCluster() const { return info.shard_num - 1; }
String getDescription() const;
String getHostNameExample() const;
/// Used to sort clusters by their proximity
ShardPriority priority;
/// Column with unique destination partitions (computed from engine_push_partition_key expr.) in the shard
ColumnWithTypeAndName partition_key_column;
/// There is a task for each destination partition
TasksPartition partition_tasks;
/// Which partitions have been checked for existence
/// If some partition from this list exists, it is in partition_tasks
std::set<String> checked_partitions;
/// Last CREATE TABLE query of the table of the shard
ASTPtr current_pull_table_create_query;
/// Internal distributed tables
DatabaseAndTableName table_read_shard;
DatabaseAndTableName table_split_shard;
};
inline String TaskTable::getPartitionPath(const String & partition_name) const
{
return task_cluster.task_zookeeper_path // root
+ "/tables/" + table_id // tables/dst_cluster.merge.hits
+ "/" + escapeForFileName(partition_name); // 201701
}
inline String TaskTable::getPartitionIsDirtyPath(const String & partition_name) const
{
return getPartitionPath(partition_name) + "/is_dirty";
}
inline String TaskTable::getPartitionIsCleanedPath(const String & partition_name) const
{
return getPartitionIsDirtyPath(partition_name) + "/cleaned";
}
inline String TaskTable::getPartitionTaskStatusPath(const String & partition_name) const
{
return getPartitionPath(partition_name) + "/shards";
}
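For illustration, with a hypothetical task root "/clickhouse/copier/task1" and table_id "dst_cluster.default.hits", the helpers above compose the following znode paths:

    /// getPartitionPath("201701")           -> "/clickhouse/copier/task1/tables/dst_cluster.default.hits/201701"
    /// getPartitionIsDirtyPath("201701")    -> ".../201701/is_dirty"
    /// getPartitionIsCleanedPath("201701")  -> ".../201701/is_dirty/cleaned"
    /// getPartitionTaskStatusPath("201701") -> ".../201701/shards"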
inline TaskTable::TaskTable(TaskCluster & parent, const Poco::Util::AbstractConfiguration & config, const String & prefix_,
const String & table_key)
: task_cluster(parent)
{
String table_prefix = prefix_ + "." + table_key + ".";
name_in_config = table_key;
cluster_pull_name = config.getString(table_prefix + "cluster_pull");
cluster_push_name = config.getString(table_prefix + "cluster_push");
table_pull.first = config.getString(table_prefix + "database_pull");
table_pull.second = config.getString(table_prefix + "table_pull");
table_push.first = config.getString(table_prefix + "database_push");
table_push.second = config.getString(table_prefix + "table_push");
/// Used as node name in ZooKeeper
table_id = escapeForFileName(cluster_push_name)
+ "." + escapeForFileName(table_push.first)
+ "." + escapeForFileName(table_push.second);
engine_push_str = config.getString(table_prefix + "engine");
{
ParserStorage parser_storage;
engine_push_ast = parseQuery(parser_storage, engine_push_str, 0);
engine_push_partition_key_ast = extractPartitionKey(engine_push_ast);
}
sharding_key_str = config.getString(table_prefix + "sharding_key");
{
ParserExpressionWithOptionalAlias parser_expression(false);
sharding_key_ast = parseQuery(parser_expression, sharding_key_str, 0);
engine_split_ast = createASTStorageDistributed(cluster_push_name, table_push.first, table_push.second, sharding_key_ast);
}
where_condition_str = config.getString(table_prefix + "where_condition", "");
if (!where_condition_str.empty())
{
ParserExpressionWithOptionalAlias parser_expression(false);
where_condition_ast = parseQuery(parser_expression, where_condition_str, 0);
// Will use canonical expression form
where_condition_str = queryToString(where_condition_ast);
}
String enabled_partitions_prefix = table_prefix + "enabled_partitions";
has_enabled_partitions = config.has(enabled_partitions_prefix);
if (has_enabled_partitions)
{
Strings keys;
config.keys(enabled_partitions_prefix, keys);
if (keys.empty())
{
/// Parse the list of partitions from a space-separated string
String partitions_str = config.getString(table_prefix + "enabled_partitions");
boost::trim_if(partitions_str, isWhitespaceASCII);
boost::split(enabled_partitions, partitions_str, isWhitespaceASCII, boost::token_compress_on);
}
else
{
/// Parse sequence of <partition>...</partition>
for (const String & key : keys)
{
if (!startsWith(key, "partition"))
throw Exception("Unknown key " + key + " in " + enabled_partitions_prefix, ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG);
enabled_partitions.emplace_back(config.getString(enabled_partitions_prefix + "." + key));
}
}
std::copy(enabled_partitions.begin(), enabled_partitions.end(), std::inserter(enabled_partitions_set, enabled_partitions_set.begin()));
}
}
template<typename RandomEngine>
inline void TaskTable::initShards(RandomEngine && random_engine)
{
const String & fqdn_name = getFQDNOrHostName();
std::uniform_int_distribution<UInt8> get_urand(0, std::numeric_limits<UInt8>::max());
// Compute the priority
for (auto & shard_info : cluster_pull->getShardsInfo())
{
TaskShardPtr task_shard = std::make_shared<TaskShard>(*this, shard_info);
const auto & replicas = cluster_pull->getShardsAddresses().at(task_shard->indexInCluster());
task_shard->priority = getReplicasPriority(replicas, fqdn_name, get_urand(random_engine));
all_shards.emplace_back(task_shard);
}
// Sort by priority
std::sort(all_shards.begin(), all_shards.end(),
[](const TaskShardPtr & lhs, const TaskShardPtr & rhs)
{
return ShardPriority::greaterPriority(lhs->priority, rhs->priority);
});
// Cut local shards
auto it_first_remote = std::lower_bound(all_shards.begin(), all_shards.end(), 1,
[](const TaskShardPtr & lhs, UInt8 is_remote)
{
return lhs->priority.is_remote < is_remote;
});
local_shards.assign(all_shards.begin(), it_first_remote);
}
inline String DB::TaskShard::getDescription() const
{
std::stringstream ss;
ss << "N" << numberInCluster()
<< " (having a replica " << getHostNameExample()
<< ", pull table " + getQuotedTable(task_table.table_pull)
<< " of cluster " + task_table.cluster_pull_name << ")";
return ss.str();
}
inline String DB::TaskShard::getHostNameExample() const
{
auto &replicas = task_table.cluster_pull->getShardsAddresses().at(indexInCluster());
return replicas.at(0).readableString();
}
}
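For reference, a minimal task-description sketch (hypothetical cluster, database and table names) covering the keys read by TaskCluster::loadTasks()/reloadSettings() and the TaskTable constructor above; the authoritative format is described in docs/en/utils/clickhouse-copier.md:

    <yandex>
        <remote_servers>
            <source_cluster><!-- shard/replica definitions --></source_cluster>
            <destination_cluster><!-- shard/replica definitions --></destination_cluster>
        </remote_servers>

        <max_workers>2</max_workers>

        <settings_pull><max_threads>1</max_threads></settings_pull>
        <settings_push><insert_distributed_timeout>0</insert_distributed_timeout></settings_push>

        <tables>
            <hits>
                <cluster_pull>source_cluster</cluster_pull>
                <database_pull>default</database_pull>
                <table_pull>hits</table_pull>

                <cluster_push>destination_cluster</cluster_push>
                <database_push>default</database_push>
                <table_push>hits_new</table_push>

                <engine>ENGINE = MergeTree() PARTITION BY toYYYYMM(EventDate) ORDER BY (CounterID, EventDate)</engine>
                <sharding_key>intHash64(UserID)</sharding_key>
                <where_condition>CounterID != 0</where_condition>

                <enabled_partitions>
                    <partition>201701</partition>
                    <partition>201702</partition>
                </enabled_partitions>
            </hits>
        </tables>
    </yandex>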

View File

@ -0,0 +1,224 @@
#pragma once
/** Allows comparing two incremental counters of type UInt32 in the presence of possible overflow.
* We assume that we compare values that are not too far away.
* For example, when we increment 0xFFFFFFFF, we get 0. So, 0xFFFFFFFF is less than 0.
*/
class WrappingUInt32
{
public:
UInt32 value;
explicit WrappingUInt32(UInt32 _value)
: value(_value)
{}
bool operator<(const WrappingUInt32 & other) const
{
return value != other.value && *this <= other;
}
bool operator<=(const WrappingUInt32 & other) const
{
const UInt32 HALF = 1 << 31;
return (value <= other.value && other.value - value < HALF)
|| (value > other.value && value - other.value > HALF);
}
bool operator==(const WrappingUInt32 & other) const
{
return value == other.value;
}
};
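A brief illustration of the wrap-around comparison defined above (illustrative values):

    WrappingUInt32 a(0xFFFFFFFF);
    WrappingUInt32 b(0);                 /// 0xFFFFFFFF incremented once, i.e. wrapped around
    bool wrapped_less = a < b;           /// true: the values are "close", so 0xFFFFFFFF is considered smaller than 0
    bool far_apart = WrappingUInt32(0) <= WrappingUInt32(0x80000000);
    /// false: the values are exactly 2^31 apart, outside the assumed "not too far away" range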
/** A Zxid, conforming to the ZooKeeper definition.
* cf. https://github.com/apache/zookeeper/blob/631d1b284f0edb1c4f6b0fb221bf2428aec71aaa/zookeeper-docs/src/main/resources/markdown/zookeeperInternals.md#guarantees-properties-and-definitions
*
* But it is better to read this: https://zookeeper.apache.org/doc/r3.1.2/zookeeperProgrammers.html
*
* Actually here is the definition of Zxid.
* Every change to the ZooKeeper state receives a stamp in the form of a zxid (ZooKeeper Transaction Id).
* This exposes the total ordering of all changes to ZooKeeper. Each change will have a unique zxid
* and if zxid1 is smaller than zxid2 then zxid1 happened before zxid2.
*/
class Zxid
{
public:
WrappingUInt32 epoch;
WrappingUInt32 counter;
explicit Zxid(UInt64 _zxid)
: epoch(_zxid >> 32)
, counter(_zxid)
{}
bool operator<=(const Zxid & other) const
{
return (epoch < other.epoch)
|| (epoch == other.epoch && counter <= other.counter);
}
bool operator==(const Zxid & other) const
{
return epoch == other.epoch && counter == other.counter;
}
};
/* When multiple ClusterCopiers discover that the target partition is not empty,
* they will attempt to clean up this partition before proceeding to copying.
*
* Instead of purging is_dirty, the history of cleaning work is preserved and partition hygiene is established
* based on a happens-before relation between the events.
* This relation is encoded by LogicalClock based on the mzxid of the is_dirty ZNode and is_dirty/cleaned.
* The fact of the partition hygiene is encoded by CleanStateClock.
*
* For reference, here is what mzxid means:
*
* ZooKeeper Stat Structure:
* The Stat structure for each znode in ZooKeeper is made up of the following fields:
*
* -- czxid
* The zxid of the change that caused this znode to be created.
*
* -- mzxid
* The zxid of the change that last modified this znode.
*
* -- ctime
* The time in milliseconds from epoch when this znode was created.
*
* -- mtime
* The time in milliseconds from epoch when this znode was last modified.
*
* -- version
* The number of changes to the data of this znode.
*
* -- cversion
* The number of changes to the children of this znode.
*
* -- aversion
* The number of changes to the ACL of this znode.
*
* -- ephemeralOwner
* The session id of the owner of this znode if the znode is an ephemeral node.
* If it is not an ephemeral node, it will be zero.
*
* -- dataLength
* The length of the data field of this znode.
*
* -- numChildren
* The number of children of this znode.
* */
class LogicalClock
{
public:
std::optional<Zxid> zxid;
LogicalClock() = default;
explicit LogicalClock(UInt64 _zxid)
: zxid(_zxid)
{}
bool hasHappened() const
{
return bool(zxid);
}
/// happens-before relation with a reasonable time bound
bool happensBefore(const LogicalClock & other) const
{
return !zxid
|| (other.zxid && *zxid <= *other.zxid);
}
bool operator<=(const LogicalClock & other) const
{
return happensBefore(other);
}
/// strict equality check
bool operator==(const LogicalClock & other) const
{
return zxid == other.zxid;
}
};
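A short illustration of the happens-before relation above (hypothetical zxids): a default-constructed LogicalClock ("the event never happened") precedes everything, otherwise clocks are ordered by their zxids. CleanStateClock::is_clean() below builds directly on this: a partition is clean if dirt was never discovered, or the discovery happens-before the recorded cleaning.

    LogicalClock never;                        /// no zxid: the event has not happened
    LogicalClock first(0x500000002ULL);        /// epoch 5, counter 2
    LogicalClock later(0x500000007ULL);        /// epoch 5, counter 7

    bool a = never.happensBefore(first);       /// true: "not happened" precedes everything
    bool b = first.happensBefore(later);       /// true: zxid (5, 2) <= (5, 7)
    bool c = later.happensBefore(first);       /// false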
class CleanStateClock
{
public:
LogicalClock discovery_zxid;
std::optional<UInt32> discovery_version;
LogicalClock clean_state_zxid;
std::optional<UInt32> clean_state_version;
std::shared_ptr<std::atomic_bool> stale;
bool is_clean() const
{
return
!is_stale()
&& (
!discovery_zxid.hasHappened()
|| (clean_state_zxid.hasHappened() && discovery_zxid <= clean_state_zxid));
}
bool is_stale() const
{
return stale->load();
}
CleanStateClock(
const zkutil::ZooKeeperPtr & zookeeper,
const String & discovery_path,
const String & clean_state_path)
: stale(std::make_shared<std::atomic_bool>(false))
{
Coordination::Stat stat{};
String _some_data;
auto watch_callback =
[stale = stale] (const Coordination::WatchResponse & rsp)
{
auto logger = &Poco::Logger::get("ClusterCopier");
if (rsp.error == Coordination::ZOK)
{
switch (rsp.type)
{
case Coordination::CREATED:
LOG_DEBUG(logger, "CleanStateClock change: CREATED, at " << rsp.path);
stale->store(true);
break;
case Coordination::CHANGED:
LOG_DEBUG(logger, "CleanStateClock change: CHANGED, at" << rsp.path);
stale->store(true);
}
}
};
if (zookeeper->tryGetWatch(discovery_path, _some_data, &stat, watch_callback))
{
discovery_zxid = LogicalClock(stat.mzxid);
discovery_version = stat.version;
}
if (zookeeper->tryGetWatch(clean_state_path, _some_data, &stat, watch_callback))
{
clean_state_zxid = LogicalClock(stat.mzxid);
clean_state_version = stat.version;
}
}
bool operator==(const CleanStateClock & other) const
{
return !is_stale()
&& !other.is_stale()
&& discovery_zxid == other.discovery_zxid
&& discovery_version == other.discovery_version
&& clean_state_zxid == other.clean_state_zxid
&& clean_state_version == other.clean_state_version;
}
bool operator!=(const CleanStateClock & other) const
{
return !(*this == other);
}
};

View File

@ -403,15 +403,13 @@
</text_log>
-->
<!-- Uncomment to write metric log into table.
Metric log contains rows with current values of ProfileEvents, CurrentMetrics collected with "collect_interval_milliseconds" interval.
<!-- Metric log contains rows with current values of ProfileEvents, CurrentMetrics collected with "collect_interval_milliseconds" interval. -->
<metric_log>
<database>system</database>
<table>metric_log</table>
<flush_interval_milliseconds>7500</flush_interval_milliseconds>
<collect_interval_milliseconds>1000</collect_interval_milliseconds>
</metric_log>
-->
<!-- Parameters for embedded dictionaries, used in Yandex.Metrica.
See https://clickhouse.yandex/docs/en/dicts/internal_dicts/

View File

@ -193,7 +193,7 @@ std::string getCurrentExceptionMessage(bool with_stacktrace, bool check_embedded
{
stream << "Poco::Exception. Code: " << ErrorCodes::POCO_EXCEPTION << ", e.code() = " << e.code()
<< ", e.displayText() = " << e.displayText()
<< (with_stacktrace ? getExceptionStackTraceString(e) : "")
<< (with_stacktrace ? ", Stack trace (when copying this message, always include the lines below):\n\n" + getExceptionStackTraceString(e) : "")
<< (with_extra_info ? getExtraExceptionInfo(e) : "")
<< " (version " << VERSION_STRING << VERSION_OFFICIAL << ")";
}
@ -210,9 +210,9 @@ std::string getCurrentExceptionMessage(bool with_stacktrace, bool check_embedded
name += " (demangling status: " + toString(status) + ")";
stream << "std::exception. Code: " << ErrorCodes::STD_EXCEPTION << ", type: " << name << ", e.what() = " << e.what()
<< (with_stacktrace ? getExceptionStackTraceString(e) : "")
<< (with_stacktrace ? ", Stack trace (when copying this message, always include the lines below):\n\n" + getExceptionStackTraceString(e) : "")
<< (with_extra_info ? getExtraExceptionInfo(e) : "")
<< ", version = " << VERSION_STRING << VERSION_OFFICIAL;
<< " (version " << VERSION_STRING << VERSION_OFFICIAL << ")";
}
catch (...) {}
}

View File

@ -1,14 +1,13 @@
#include "CompressedReadBufferFromFile.h"
#include <IO/createReadBufferFromFileBase.h>
#include <IO/WriteHelpers.h>
#include <Compression/CompressionInfo.h>
#include <Compression/LZ4_decompress_faster.h>
#include <IO/WriteHelpers.h>
#include <IO/createReadBufferFromFileBase.h>
namespace DB
{
namespace ErrorCodes
{
extern const int SEEK_POSITION_OUT_OF_BOUND;
@ -31,12 +30,18 @@ bool CompressedReadBufferFromFile::nextImpl()
return true;
}
CompressedReadBufferFromFile::CompressedReadBufferFromFile(std::unique_ptr<ReadBufferFromFileBase> buf)
: BufferWithOwnMemory<ReadBuffer>(0), p_file_in(std::move(buf)), file_in(*p_file_in)
{
compressed_in = &file_in;
}
CompressedReadBufferFromFile::CompressedReadBufferFromFile(
const std::string & path, size_t estimated_size, size_t aio_threshold, size_t mmap_threshold, size_t buf_size)
: BufferWithOwnMemory<ReadBuffer>(0),
p_file_in(createReadBufferFromFileBase(path, estimated_size, aio_threshold, mmap_threshold, buf_size)),
file_in(*p_file_in)
: BufferWithOwnMemory<ReadBuffer>(0)
, p_file_in(createReadBufferFromFileBase(path, estimated_size, aio_threshold, mmap_threshold, buf_size))
, file_in(*p_file_in)
{
compressed_in = &file_in;
}
@ -45,7 +50,7 @@ CompressedReadBufferFromFile::CompressedReadBufferFromFile(
void CompressedReadBufferFromFile::seek(size_t offset_in_compressed_file, size_t offset_in_decompressed_block)
{
if (size_compressed &&
offset_in_compressed_file == file_in.getPositionInFile() - size_compressed &&
offset_in_compressed_file == file_in.getPosition() - size_compressed &&
offset_in_decompressed_block <= working_buffer.size())
{
bytes += offset();

View File

@ -28,6 +28,8 @@ private:
bool nextImpl() override;
public:
CompressedReadBufferFromFile(std::unique_ptr<ReadBufferFromFileBase> buf);
CompressedReadBufferFromFile(
const std::string & path, size_t estimated_size, size_t aio_threshold, size_t mmap_threshold, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE);

View File

@ -9,23 +9,12 @@
#include <Compression/CompressionFactory.h>
namespace ProfileEvents
{
extern const Event ReadCompressedBytes;
extern const Event CompressedReadBufferBlocks;
extern const Event CompressedReadBufferBytes;
}
namespace DB
{
namespace ErrorCodes
{
extern const int CHECKSUM_DOESNT_MATCH;
extern const int TOO_LARGE_SIZE_COMPRESSED;
extern const int UNKNOWN_COMPRESSION_METHOD;
extern const int CANNOT_DECOMPRESS;
extern const int SEEK_POSITION_OUT_OF_BOUND;
extern const int CORRUPTED_DATA;
}

View File

@ -200,7 +200,7 @@ void DiskLocal::copyFile(const String & from_path, const String & to_path)
Poco::File(disk_path + from_path).copyTo(disk_path + to_path);
}
std::unique_ptr<SeekableReadBuffer> DiskLocal::readFile(const String & path, size_t buf_size) const
std::unique_ptr<ReadBufferFromFileBase> DiskLocal::readFile(const String & path, size_t buf_size) const
{
return std::make_unique<ReadBufferFromFile>(disk_path + path, buf_size);
}

View File

@ -1,6 +1,7 @@
#pragma once
#include <Disks/IDisk.h>
#include <IO/ReadBufferFromFileBase.h>
#include <IO/ReadBufferFromFile.h>
#include <IO/WriteBufferFromFile.h>
@ -66,7 +67,7 @@ public:
void copyFile(const String & from_path, const String & to_path) override;
std::unique_ptr<SeekableReadBuffer> readFile(const String & path, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE) const override;
std::unique_ptr<ReadBufferFromFileBase> readFile(const String & path, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE) const override;
std::unique_ptr<WriteBuffer> writeFile(const String & path, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE, WriteMode mode = WriteMode::Rewrite) override;

View File

@ -15,6 +15,8 @@ namespace ErrorCodes
extern const int FILE_ALREADY_EXISTS;
extern const int DIRECTORY_DOESNT_EXIST;
extern const int CANNOT_DELETE_DIRECTORY;
extern const int CANNOT_SEEK_THROUGH_FILE;
extern const int SEEK_POSITION_OUT_OF_BOUND;
}
@ -37,8 +39,59 @@ private:
std::vector<String>::iterator iter;
};
ReadIndirectBuffer::ReadIndirectBuffer(String path_, const String & data_)
: ReadBufferFromFileBase(), buf(ReadBufferFromString(data_)), path(std::move(path_))
{
internal_buffer = buf.buffer();
working_buffer = internal_buffer;
pos = working_buffer.begin();
}
off_t ReadIndirectBuffer::seek(off_t offset, int whence)
{
if (whence == SEEK_SET)
{
if (offset >= 0 && working_buffer.begin() + offset < working_buffer.end())
{
pos = working_buffer.begin() + offset;
return size_t(pos - working_buffer.begin());
}
else
throw Exception(
"Seek position is out of bounds. "
"Offset: "
+ std::to_string(offset) + ", Max: " + std::to_string(size_t(working_buffer.end() - working_buffer.begin())),
ErrorCodes::SEEK_POSITION_OUT_OF_BOUND);
}
else if (whence == SEEK_CUR)
{
Position new_pos = pos + offset;
if (new_pos >= working_buffer.begin() && new_pos < working_buffer.end())
{
pos = new_pos;
return size_t(pos - working_buffer.begin());
}
else
throw Exception(
"Seek position is out of bounds. "
"Offset: "
+ std::to_string(offset) + ", Max: " + std::to_string(size_t(working_buffer.end() - working_buffer.begin())),
ErrorCodes::SEEK_POSITION_OUT_OF_BOUND);
}
else
throw Exception("Only SEEK_SET and SEEK_CUR seek modes allowed.", ErrorCodes::CANNOT_SEEK_THROUGH_FILE);
}
off_t ReadIndirectBuffer::getPosition()
{
return pos - working_buffer.begin();
}
void WriteIndirectBuffer::finalize()
{
if (isFinished())
return;
next();
WriteBufferFromVector::finalize();
@ -249,7 +302,7 @@ void DiskMemory::copyFile(const String & /*from_path*/, const String & /*to_path
throw Exception("Method copyFile is not implemented for memory disks", ErrorCodes::NOT_IMPLEMENTED);
}
std::unique_ptr<SeekableReadBuffer> DiskMemory::readFile(const String & path, size_t /*buf_size*/) const
std::unique_ptr<ReadBufferFromFileBase> DiskMemory::readFile(const String & path, size_t /*buf_size*/) const
{
std::lock_guard lock(mutex);
@ -257,7 +310,7 @@ std::unique_ptr<SeekableReadBuffer> DiskMemory::readFile(const String & path, si
if (iter == files.end())
throw Exception("File '" + path + "' does not exist", ErrorCodes::FILE_DOESNT_EXIST);
return std::make_unique<ReadBufferFromString>(iter->second.data);
return std::make_unique<ReadIndirectBuffer>(path, iter->second.data);
}
std::unique_ptr<WriteBuffer> DiskMemory::writeFile(const String & path, size_t /*buf_size*/, WriteMode mode)

View File

@ -5,6 +5,8 @@
#include <unordered_map>
#include <utility>
#include <Disks/IDisk.h>
#include <IO/ReadBufferFromFileBase.h>
#include <IO/ReadBufferFromString.h>
#include <IO/WriteBufferFromString.h>
namespace DB
@ -13,7 +15,22 @@ class DiskMemory;
class ReadBuffer;
class WriteBuffer;
// This class is responsible for updating file metadata after the buffer is finalized.
/// Adapter with the same behaviour as ReadBufferFromString.
class ReadIndirectBuffer : public ReadBufferFromFileBase
{
public:
ReadIndirectBuffer(String path_, const String & data_);
std::string getFileName() const override { return path; }
off_t seek(off_t off, int whence) override;
off_t getPosition() override;
private:
ReadBufferFromString buf;
String path;
};
/// This class is responsible for updating file metadata after the buffer is finalized.
class WriteIndirectBuffer : public WriteBufferFromOwnString
{
public:
@ -76,7 +93,7 @@ public:
void copyFile(const String & from_path, const String & to_path) override;
std::unique_ptr<SeekableReadBuffer> readFile(const String & path, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE) const override;
std::unique_ptr<ReadBufferFromFileBase> readFile(const String & path, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE) const override;
std::unique_ptr<WriteBuffer>
writeFile(const String & path, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE, WriteMode mode = WriteMode::Rewrite) override;

View File

@ -137,17 +137,17 @@ namespace
// Reads data from S3.
// It supports reading from multiple S3 paths that reside in Metadata.
class ReadIndirectBufferFromS3 : public BufferWithOwnMemory<SeekableReadBuffer>
class ReadIndirectBufferFromS3 : public ReadBufferFromFileBase
{
public:
ReadIndirectBufferFromS3(
std::shared_ptr<Aws::S3::S3Client> client_ptr_, const String & bucket_, Metadata metadata_, size_t buf_size_)
: BufferWithOwnMemory(buf_size_)
: ReadBufferFromFileBase()
, client_ptr(std::move(client_ptr_))
, bucket(bucket_)
, metadata(std::move(metadata_))
, buf_size(buf_size_)
, offset(0)
, absolute_position(0)
, initialized(false)
, current_buf_idx(0)
, current_buf(nullptr)
@ -156,9 +156,6 @@ namespace
off_t seek(off_t offset_, int whence) override
{
if (initialized)
throw Exception("Seek is allowed only before first read attempt from the buffer.", ErrorCodes::CANNOT_SEEK_THROUGH_FILE);
if (whence != SEEK_SET)
throw Exception("Only SEEK_SET mode is allowed.", ErrorCodes::CANNOT_SEEK_THROUGH_FILE);
@ -169,14 +166,23 @@ namespace
+ std::to_string(offset_) + ", Max: " + std::to_string(metadata.total_size),
ErrorCodes::SEEK_POSITION_OUT_OF_BOUND);
offset = offset_;
absolute_position = offset_;
return offset;
/// TODO: Do not re-initialize the buffer if the current position is within the working buffer.
current_buf = initialize();
pos = working_buffer.end();
return absolute_position;
}
off_t getPosition() override { return absolute_position - available(); }
std::string getFileName() const override { return metadata.metadata_file_path; }
private:
std::unique_ptr<ReadBufferFromS3> initialize()
{
size_t offset = absolute_position;
for (UInt32 i = 0; i < metadata.s3_objects_count; ++i)
{
current_buf_idx = i;
@ -190,6 +196,7 @@ namespace
}
offset -= size;
}
initialized = true;
return nullptr;
}
@ -199,14 +206,13 @@ namespace
if (!initialized)
{
current_buf = initialize();
initialized = true;
}
// If current buffer has remaining data - use it.
if (current_buf && current_buf->next())
{
working_buffer = current_buf->buffer();
absolute_position += working_buffer.size();
return true;
}
@ -219,6 +225,7 @@ namespace
current_buf = std::make_unique<ReadBufferFromS3>(client_ptr, bucket, path, buf_size);
current_buf->next();
working_buffer = current_buf->buffer();
absolute_position += working_buffer.size();
return true;
}
@ -229,7 +236,7 @@ namespace
Metadata metadata;
size_t buf_size;
size_t offset;
size_t absolute_position = 0;
bool initialized;
UInt32 current_buf_idx;
std::unique_ptr<ReadBufferFromS3> current_buf;
@ -337,8 +344,13 @@ private:
};
DiskS3::DiskS3(String name_, std::shared_ptr<Aws::S3::S3Client> client_, String bucket_, String s3_root_path_,
String metadata_path_, size_t min_upload_part_size_)
DiskS3::DiskS3(
String name_,
std::shared_ptr<Aws::S3::S3Client> client_,
String bucket_,
String s3_root_path_,
String metadata_path_,
size_t min_upload_part_size_)
: name(std::move(name_))
, client(std::move(client_))
, bucket(std::move(bucket_))
@ -445,7 +457,7 @@ void DiskS3::copyFile(const String & from_path, const String & to_path)
to.save();
}
std::unique_ptr<SeekableReadBuffer> DiskS3::readFile(const String & path, size_t buf_size) const
std::unique_ptr<ReadBufferFromFileBase> DiskS3::readFile(const String & path, size_t buf_size) const
{
Metadata metadata(metadata_path + path);
@ -628,8 +640,8 @@ void registerDiskS3(DiskFactory & factory)
String metadata_path = context.getPath() + "disks/" + name + "/";
auto s3disk = std::make_shared<DiskS3>(name, client, uri.bucket, uri.key, metadata_path,
context.getSettingsRef().s3_min_upload_part_size);
auto s3disk
= std::make_shared<DiskS3>(name, client, uri.bucket, uri.key, metadata_path, context.getSettingsRef().s3_min_upload_part_size);
/// This code is used only to check access to the corresponding disk.
checkWriteAccess(s3disk);

View File

@ -62,7 +62,7 @@ public:
void copyFile(const String & from_path, const String & to_path) override;
std::unique_ptr<SeekableReadBuffer> readFile(const String & path, size_t buf_size) const override;
std::unique_ptr<ReadBufferFromFileBase> readFile(const String & path, size_t buf_size) const override;
std::unique_ptr<WriteBuffer> writeFile(const String & path, size_t buf_size, WriteMode mode) override;

View File

@ -25,7 +25,7 @@ using DiskDirectoryIteratorPtr = std::unique_ptr<IDiskDirectoryIterator>;
class IReservation;
using ReservationPtr = std::unique_ptr<IReservation>;
class SeekableReadBuffer;
class ReadBufferFromFileBase;
class WriteBuffer;
/**
@ -122,7 +122,7 @@ public:
virtual void copyFile(const String & from_path, const String & to_path) = 0;
/// Open the file for read and return SeekableReadBuffer object.
virtual std::unique_ptr<SeekableReadBuffer> readFile(const String & path, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE) const = 0;
virtual std::unique_ptr<ReadBufferFromFileBase> readFile(const String & path, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE) const = 0;
/// Open the file for write and return WriteBuffer object.
virtual std::unique_ptr<WriteBuffer> writeFile(const String & path, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE, WriteMode mode = WriteMode::Rewrite) = 0;

View File

@ -108,11 +108,13 @@ TYPED_TEST(DiskTest, readFile)
// Test SEEK_SET
{
DB::String data;
String buf(4, '0');
std::unique_ptr<DB::SeekableReadBuffer> in = disk->readFile("test_file");
in->seek(5, SEEK_SET);
readString(data, *in);
EXPECT_EQ("data", data);
in->readStrict(buf.data(), 4);
EXPECT_EQ("data", buf);
}
// Test SEEK_CUR

View File

@ -100,7 +100,7 @@ int MMapReadBufferFromFileDescriptor::getFD() const
return fd;
}
off_t MMapReadBufferFromFileDescriptor::getPositionInFile()
off_t MMapReadBufferFromFileDescriptor::getPosition()
{
return count();
}

View File

@ -32,9 +32,9 @@ public:
/// unmap memory before call to destructor
void finish();
off_t getPositionInFile() override;
off_t getPosition() override;
std::string getFileName() const override;
int getFD() const override;
int getFD() const;
private:
size_t length = 0;

View File

@ -164,17 +164,17 @@ off_t ReadBufferAIO::seek(off_t off, int whence)
{
if (off >= 0)
{
if (off > (std::numeric_limits<off_t>::max() - getPositionInFile()))
if (off > (std::numeric_limits<off_t>::max() - getPosition()))
throw Exception("SEEK_CUR overflow", ErrorCodes::ARGUMENT_OUT_OF_BOUND);
}
else if (off < -getPositionInFile())
else if (off < -getPosition())
throw Exception("SEEK_CUR underflow", ErrorCodes::ARGUMENT_OUT_OF_BOUND);
new_pos_in_file = getPositionInFile() + off;
new_pos_in_file = getPosition() + off;
}
else
throw Exception("ReadBufferAIO::seek expects SEEK_SET or SEEK_CUR as whence", ErrorCodes::ARGUMENT_OUT_OF_BOUND);
if (new_pos_in_file != getPositionInFile())
if (new_pos_in_file != getPosition())
{
off_t first_read_pos_in_file = first_unread_pos_in_file - static_cast<off_t>(working_buffer.size());
if (hasPendingData() && (new_pos_in_file >= first_read_pos_in_file) && (new_pos_in_file <= first_unread_pos_in_file))

View File

@ -36,9 +36,9 @@ public:
ReadBufferAIO & operator=(const ReadBufferAIO &) = delete;
void setMaxBytes(size_t max_bytes_read_);
off_t getPositionInFile() override { return first_unread_pos_in_file - (working_buffer.end() - pos); }
off_t getPosition() override { return first_unread_pos_in_file - (working_buffer.end() - pos); }
std::string getFileName() const override { return filename; }
int getFD() const override { return fd; }
int getFD() const { return fd; }
off_t seek(off_t off, int whence) override;

View File

@ -12,7 +12,6 @@
namespace DB
{
class ReadBufferFromFileBase : public BufferWithOwnMemory<SeekableReadBuffer>
{
public:
@ -20,9 +19,7 @@ public:
ReadBufferFromFileBase(size_t buf_size, char * existing_memory, size_t alignment);
ReadBufferFromFileBase(ReadBufferFromFileBase &&) = default;
~ReadBufferFromFileBase() override;
virtual off_t getPositionInFile() = 0;
virtual std::string getFileName() const = 0;
virtual int getFD() const = 0;
/// It is possible to get information about the time of each reading.
struct ProfileInfo
@ -44,7 +41,6 @@ public:
protected:
ProfileCallback profile_callback;
clockid_t clock_type{};
};
}

View File

@ -27,12 +27,12 @@ public:
ReadBufferFromFileDescriptor(ReadBufferFromFileDescriptor &&) = default;
int getFD() const override
int getFD() const
{
return fd;
}
off_t getPositionInFile() override
off_t getPosition() override
{
return pos_in_file - (working_buffer.end() - pos);
}

View File

@ -43,4 +43,9 @@ off_t ReadBufferFromMemory::seek(off_t offset, int whence)
throw Exception("Only SEEK_SET and SEEK_CUR seek modes allowed.", ErrorCodes::CANNOT_SEEK_THROUGH_FILE);
}
off_t ReadBufferFromMemory::getPosition()
{
return pos - working_buffer.begin();
}
}

View File

@ -25,6 +25,8 @@ public:
}
off_t seek(off_t off, int whence) override;
off_t getPosition() override;
};
}

View File

@ -58,6 +58,12 @@ off_t ReadBufferFromS3::seek(off_t offset_, int whence)
return offset;
}
off_t ReadBufferFromS3::getPosition()
{
return offset + count();
}
std::unique_ptr<ReadBuffer> ReadBufferFromS3::initialize()
{
LOG_TRACE(log, "Read S3 object. Bucket: " + bucket + ", Key: " + key + ", Offset: " + std::to_string(offset));

View File

@ -45,6 +45,7 @@ public:
bool nextImpl() override;
off_t seek(off_t off, int whence) override;
off_t getPosition() override;
private:
std::unique_ptr<ReadBuffer> initialize();

View File

@ -20,6 +20,11 @@ public:
* @return New position from the beginning of the underlying buffer / file.
*/
virtual off_t seek(off_t off, int whence) = 0;
/**
* @return Offset from the beginning of the underlying buffer / file corresponding to the current position in the buffer.
*/
virtual off_t getPosition() = 0;
};
}
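The contract above (seek to an absolute or relative offset, then report it via getPosition()) can be illustrated with a minimal sketch. It is not part of this commit; the include paths and the `ReadBufferFromMemory` constructor are assumptions, while `seek`, `getPosition` and `readStrict` mirror the unit test shown earlier in this diff.

```cpp
#include <cassert>
#include <cstdio>                      // SEEK_SET / SEEK_CUR
#include <string>
#include <IO/ReadBufferFromMemory.h>   // Assumed include path.

int main()
{
    std::string data = "some data";
    /// Assumed constructor: a read buffer over an existing memory region.
    DB::ReadBufferFromMemory in(data.data(), data.size());

    in.seek(5, SEEK_SET);              /// Absolute seek within the working buffer.
    assert(in.getPosition() == 5);     /// getPosition() reports the offset of the current position.

    std::string word(4, '\0');
    in.readStrict(word.data(), 4);     /// Reads "data" and advances the position.
    assert(word == "data");
    assert(in.getPosition() == 9);

    in.seek(-4, SEEK_CUR);             /// Relative seek back to the start of "data".
    assert(in.getPosition() == 5);
    return 0;
}
```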

View File

@ -283,14 +283,14 @@ bool test6(const std::string & filename, const std::string & buf)
DB::ReadBufferAIO in(filename, 3 * DEFAULT_AIO_FILE_BLOCK_SIZE);
if (in.getPositionInFile() != 0)
if (in.getPosition() != 0)
return false;
size_t count = in.read(newbuf.data(), newbuf.length());
if (count != newbuf.length())
return false;
if (static_cast<size_t>(in.getPositionInFile()) != buf.length())
if (static_cast<size_t>(in.getPosition()) != buf.length())
return false;
return true;
@ -646,7 +646,7 @@ bool test20(const std::string & filename, const std::string & buf)
return false;
}
(void) in.getPositionInFile();
(void) in.getPosition();
{
std::string newbuf;

View File

@ -124,11 +124,12 @@ public:
{
/// leave other comparisons as is
}
else if (functionIsInOperator(node.name)) /// IN, NOT IN
else if (functionIsLikeOperator(node.name) || /// LIKE, NOT LIKE
functionIsInOperator(node.name)) /// IN, NOT IN
{
if (auto ident = node.arguments->children.at(0)->as<ASTIdentifier>())
if (size_t min_table = checkIdentifier(*ident))
asts_to_join_on[min_table].push_back(ast);
/// Leave as is. Push-down is not possible here because of unknown aliases and unimplemented JOIN predicates.
/// select a as b from t1, t2 where t1.x = t2.x and b in(42)
/// select a as b from t1 inner join t2 on t1.x = t2.x and b in(42)
}
else
{
@ -198,16 +199,6 @@ private:
}
return 0;
}
size_t checkIdentifier(const ASTIdentifier & identifier)
{
size_t best_table_pos = 0;
bool match = IdentifierSemantic::chooseTable(identifier, tables, best_table_pos);
if (match && joined_tables[best_table_pos].canAttachOnExpression())
return best_table_pos;
return 0;
}
};
using CheckExpressionMatcher = ConstOneTypeMatcher<CheckExpressionVisitorData, false>;

View File

@ -13,4 +13,9 @@ inline bool functionIsInOrGlobalInOperator(const std::string & name)
return functionIsInOperator(name) || name == "globalIn" || name == "globalNotIn";
}
inline bool functionIsLikeOperator(const std::string & name)
{
return name == "like" || name == "notLike";
}
}

View File

@ -263,7 +263,7 @@ bool PipelineExecutor::prepareProcessor(UInt64 pid, size_t thread_number, Queue
std::vector<Edge *> updated_direct_edges;
{
#ifndef N_DEBUG
#ifndef NDEBUG
Stopwatch watch;
#endif
@ -279,7 +279,7 @@ bool PipelineExecutor::prepareProcessor(UInt64 pid, size_t thread_number, Queue
return false;
}
#ifndef N_DEBUG
#ifndef NDEBUG
node.execution_state->preparation_time_ns += watch.elapsed();
#endif
@ -468,7 +468,7 @@ void PipelineExecutor::execute(size_t num_threads)
}
catch (...)
{
#ifndef N_DEBUG
#ifndef NDEBUG
LOG_TRACE(log, "Exception while executing query. Current state:\n" << dumpPipeline());
#endif
throw;
@ -491,7 +491,7 @@ void PipelineExecutor::execute(size_t num_threads)
void PipelineExecutor::executeSingleThread(size_t thread_num, size_t num_threads)
{
#ifndef N_DEBUG
#ifndef NDEBUG
UInt64 total_time_ns = 0;
UInt64 execution_time_ns = 0;
UInt64 processing_time_ns = 0;
@ -577,13 +577,13 @@ void PipelineExecutor::executeSingleThread(size_t thread_num, size_t num_threads
addJob(state);
{
#ifndef N_DEBUG
#ifndef NDEBUG
Stopwatch execution_time_watch;
#endif
state->job();
#ifndef N_DEBUG
#ifndef NDEBUG
execution_time_ns += execution_time_watch.elapsed();
#endif
}
@ -594,7 +594,7 @@ void PipelineExecutor::executeSingleThread(size_t thread_num, size_t num_threads
if (finished)
break;
#ifndef N_DEBUG
#ifndef NDEBUG
Stopwatch processing_time_watch;
#endif
@ -648,13 +648,13 @@ void PipelineExecutor::executeSingleThread(size_t thread_num, size_t num_threads
doExpandPipeline(task, false);
}
#ifndef N_DEBUG
#ifndef NDEBUG
processing_time_ns += processing_time_watch.elapsed();
#endif
}
}
#ifndef N_DEBUG
#ifndef NDEBUG
total_time_ns = total_time_watch.elapsed();
wait_time_ns = total_time_ns - execution_time_ns - processing_time_ns;
@ -769,7 +769,7 @@ String PipelineExecutor::dumpPipeline() const
WriteBufferFromOwnString buffer;
buffer << "(" << node.execution_state->num_executed_jobs << " jobs";
#ifndef N_DEBUG
#ifndef NDEBUG
buffer << ", execution time: " << node.execution_state->execution_time_ns / 1e9 << " sec.";
buffer << ", preparation time: " << node.execution_state->preparation_time_ns / 1e9 << " sec.";
#endif

View File

@ -264,6 +264,78 @@ const KeyCondition::AtomMap KeyCondition::atom_map
};
static const std::map<std::string, std::string> inverse_relations = {
{"equals", "notEquals"},
{"notEquals", "equals"},
{"less", "greaterOrEquals"},
{"greaterOrEquals", "less"},
{"greater", "lessOrEquals"},
{"lessOrEquals", "greater"},
{"in", "notIn"},
{"notIn", "in"},
{"like", "notLike"},
{"notLike", "like"},
{"empty", "notEmpty"},
{"notEmpty", "empty"},
};
bool isLogicalOperator(const String & func_name)
{
return (func_name == "and" || func_name == "or" || func_name == "not" || func_name == "indexHint");
}
/// The node can be one of:
/// - Logical operator (AND, OR, NOT and indexHint() - logical NOOP)
/// - An "atom" (relational operator, constant, expression)
/// - A logical constant expression
/// - Any other function
ASTPtr cloneASTWithInversionPushDown(const ASTPtr node, const bool need_inversion = false)
{
const ASTFunction * func = node->as<ASTFunction>();
if (func && isLogicalOperator(func->name))
{
if (func->name == "not")
{
return cloneASTWithInversionPushDown(func->arguments->children.front(), !need_inversion);
}
const auto result_node = makeASTFunction(func->name);
/// indexHint() is a special case - logical NOOP function
if (result_node->name != "indexHint" && need_inversion)
{
result_node->name = (result_node->name == "and") ? "or" : "and";
}
if (func->arguments)
{
for (const auto & child : func->arguments->children)
{
result_node->arguments->children.push_back(cloneASTWithInversionPushDown(child, need_inversion));
}
}
return result_node;
}
const auto cloned_node = node->clone();
if (func && inverse_relations.find(func->name) != inverse_relations.cend())
{
if (need_inversion)
{
cloned_node->as<ASTFunction>()->name = inverse_relations.at(func->name);
}
return cloned_node;
}
return need_inversion ? makeASTFunction("not", cloned_node) : cloned_node;
}
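/// Example of the rewrite (editorial illustration): not(and(less(x, 7.49), equals(y, 1))), that is NOT (x < 7.49 AND y = 1),
/// becomes or(greaterOrEquals(x, 7.49), notEquals(y, 1)), that is (x >= 7.49) OR (y != 1).
/// The AND/OR swap comes from the need_inversion branch above; each leaf is flipped via the inverse_relations map.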
inline bool Range::equals(const Field & lhs, const Field & rhs) { return applyVisitor(FieldVisitorAccurateEquals(), lhs, rhs); }
inline bool Range::less(const Field & lhs, const Field & rhs) { return applyVisitor(FieldVisitorAccurateLess(), lhs, rhs); }
@ -345,21 +417,23 @@ KeyCondition::KeyCondition(
*/
Block block_with_constants = getBlockWithConstants(query_info.query, query_info.syntax_analyzer_result, context);
/// Transform WHERE section to Reverse Polish notation
const auto & select = query_info.query->as<ASTSelectQuery &>();
if (select.where())
const ASTSelectQuery & select = query_info.query->as<ASTSelectQuery &>();
if (select.where() || select.prewhere())
{
traverseAST(select.where(), context, block_with_constants);
ASTPtr filter_query;
if (select.where() && select.prewhere())
filter_query = makeASTFunction("and", select.where(), select.prewhere());
else
filter_query = select.where() ? select.where() : select.prewhere();
if (select.prewhere())
{
traverseAST(select.prewhere(), context, block_with_constants);
rpn.emplace_back(RPNElement::FUNCTION_AND);
}
}
else if (select.prewhere())
{
traverseAST(select.prewhere(), context, block_with_constants);
/** When non-strictly monotonic functions are employed in functional index (e.g. ORDER BY toStartOfHour(dateTime)),
* the use of the NOT operator in a predicate will result in the indexing algorithm leaving out some data.
* This is caused by rewriting in KeyCondition::tryParseAtomFromAST of relational operators to less strict
* when parsing the AST into internal RPN representation.
* To overcome the problem, before parsing the AST we transform it to its semantically equivalent form where all NOT's
* are pushed down and applied (when possible) to leaf nodes.
*/
traverseAST(cloneASTWithInversionPushDown(filter_query), context, block_with_constants);
}
else
{
@ -432,9 +506,9 @@ void KeyCondition::traverseAST(const ASTPtr & node, const Context & context, Blo
{
RPNElement element;
if (auto * func = node->as<ASTFunction>())
if (const auto * func = node->as<ASTFunction>())
{
if (operatorFromAST(func, element))
if (tryParseLogicalOperatorFromAST(func, element))
{
auto & args = func->arguments->children;
for (size_t i = 0, size = args.size(); i < size; ++i)
@ -452,7 +526,7 @@ void KeyCondition::traverseAST(const ASTPtr & node, const Context & context, Blo
}
}
if (!atomFromAST(node, context, block_with_constants, element))
if (!tryParseAtomFromAST(node, context, block_with_constants, element))
{
element.function = RPNElement::FUNCTION_UNKNOWN;
}
@ -680,7 +754,7 @@ static void castValueToType(const DataTypePtr & desired_type, Field & src_value,
}
bool KeyCondition::atomFromAST(const ASTPtr & node, const Context & context, Block & block_with_constants, RPNElement & out)
bool KeyCondition::tryParseAtomFromAST(const ASTPtr & node, const Context & context, Block & block_with_constants, RPNElement & out)
{
/** Functions < > = != <= >= in `notIn`, where one argument is a constant, and the other is one of columns of key,
* or itself, wrapped in a chain of possibly-monotonic functions,
@ -768,7 +842,9 @@ bool KeyCondition::atomFromAST(const ASTPtr & node, const Context & context, Blo
func_name = "lessOrEquals";
else if (func_name == "lessOrEquals")
func_name = "greaterOrEquals";
else if (func_name == "in" || func_name == "notIn" || func_name == "like")
else if (func_name == "in" || func_name == "notIn" ||
func_name == "like" || func_name == "notLike" ||
func_name == "startsWith")
{
/// "const IN data_column" doesn't make sense (unlike "data_column IN const")
return false;
@ -809,7 +885,7 @@ bool KeyCondition::atomFromAST(const ASTPtr & node, const Context & context, Blo
return false;
}
bool KeyCondition::operatorFromAST(const ASTFunction * func, RPNElement & out)
bool KeyCondition::tryParseLogicalOperatorFromAST(const ASTFunction * func, RPNElement & out)
{
/// Functions AND, OR, NOT.
/** Also a special function `indexHint` - works as if instead of calling a function there are just parentheses

View File

@ -369,8 +369,8 @@ private:
BoolMask initial_mask) const;
void traverseAST(const ASTPtr & node, const Context & context, Block & block_with_constants);
bool atomFromAST(const ASTPtr & node, const Context & context, Block & block_with_constants, RPNElement & out);
bool operatorFromAST(const ASTFunction * func, RPNElement & out);
bool tryParseAtomFromAST(const ASTPtr & node, const Context & context, Block & block_with_constants, RPNElement & out);
bool tryParseLogicalOperatorFromAST(const ASTFunction * func, RPNElement & out);
/** Is node the key column
* or expression in which column of key is wrapped by chain of functions,

View File

@ -5,6 +5,9 @@
#include <Common/StringUtils/StringUtils.h>
#include <Common/typeid_cast.h>
#include <Interpreters/evaluateConstantExpression.h>
#include <IO/ReadBufferFromFileBase.h>
#include <Compression/CompressedReadBuffer.h>
#include <Compression/CompressedWriteBuffer.h>
#include <IO/ReadHelpers.h>
@ -18,7 +21,8 @@
#include <Columns/ColumnArray.h>
#include <Interpreters/Context.h>
#include <Parsers/ASTLiteral.h>
#include "StorageLogSettings.h"
#define DBMS_STORAGE_LOG_DATA_FILE_EXTENSION ".bin"
#define DBMS_STORAGE_LOG_MARKS_FILE_NAME "__marks.mrk"
@ -88,7 +92,7 @@ private:
plain->seek(offset, SEEK_SET);
}
std::unique_ptr<SeekableReadBuffer> plain;
std::unique_ptr<ReadBufferFromFileBase> plain;
CompressedReadBuffer compressed;
};
@ -540,8 +544,9 @@ void StorageLog::truncate(const ASTPtr &, const Context &, TableStructureWriteLo
const StorageLog::Marks & StorageLog::getMarksWithRealRowCount() const
{
const String & column_name = getColumns().begin()->name;
const IDataType & column_type = *getColumns().begin()->type;
/// There should be at least one physical column
const String column_name = getColumns().getAllPhysical().begin()->name;
const auto column_type = getColumns().getAllPhysical().begin()->type;
String filename;
/** We take marks from first column.
@ -549,7 +554,7 @@ const StorageLog::Marks & StorageLog::getMarksWithRealRowCount() const
* (Example: for Array data type, first stream is array sizes; and number of array sizes is the number of arrays).
*/
IDataType::SubstreamPath substream_root_path;
column_type.enumerateStreams([&](const IDataType::SubstreamPath & substream_path)
column_type->enumerateStreams([&](const IDataType::SubstreamPath & substream_path)
{
if (filename.empty())
filename = IDataType::getFileNameForStream(column_name, substream_path);
@ -623,6 +628,10 @@ CheckResults StorageLog::checkData(const ASTPtr & /* query */, const Context & /
void registerStorageLog(StorageFactory & factory)
{
StorageFactory::StorageFeatures features{
.supports_settings = true
};
factory.registerStorage("Log", [](const StorageFactory::Arguments & args)
{
if (!args.engine_args.empty())
@ -630,10 +639,13 @@ void registerStorageLog(StorageFactory & factory)
"Engine " + args.engine_name + " doesn't support any arguments (" + toString(args.engine_args.size()) + " given)",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
String disk_name = getDiskName(*args.storage_def);
DiskPtr disk = args.context.getDisk(disk_name);
return StorageLog::create(
args.context.getDefaultDisk(), args.relative_data_path, args.table_id, args.columns, args.constraints,
disk, args.relative_data_path, args.table_id, args.columns, args.constraints,
args.context.getSettings().max_compress_block_size);
});
}, features);
}
}

View File

@ -0,0 +1,21 @@
#include "StorageLogSettings.h"
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/ASTSetQuery.h>
namespace DB
{
String getDiskName(ASTStorage & storage_def)
{
if (storage_def.settings)
{
SettingsChanges changes = storage_def.settings->changes;
for (auto it = changes.begin(); it != changes.end(); ++it)
{
if (it->name == "disk")
return it->value.safeGet<String>();
}
}
return "default";
}
}

View File

@ -0,0 +1,10 @@
#pragma once
#include <Core/Types.h>
namespace DB
{
class ASTStorage;
String getDiskName(ASTStorage & storage_def);
}

View File

@ -8,7 +8,6 @@
#include <Common/escapeForFileName.h>
#include <Common/Exception.h>
#include <Compression/CompressedReadBuffer.h>
#include <Compression/CompressedReadBufferFromFile.h>
#include <Compression/CompressedWriteBuffer.h>
#include <IO/ReadHelpers.h>
@ -26,8 +25,11 @@
#include <Interpreters/Context.h>
#include <Storages/StorageStripeLog.h>
#include <Interpreters/evaluateConstantExpression.h>
#include <Parsers/ASTLiteral.h>
#include <Storages/StorageFactory.h>
#include <Storages/StorageStripeLog.h>
#include "StorageLogSettings.h"
namespace DB
@ -120,7 +122,7 @@ private:
String data_file_path = storage.table_path + "data.bin";
size_t buffer_size = std::min(max_read_buffer_size, storage.disk->getFileSize(data_file_path));
data_in.emplace(fullPath(storage.disk, data_file_path), 0, 0, buffer_size);
data_in.emplace(storage.disk->readFile(data_file_path, buffer_size));
block_in.emplace(*data_in, 0, index_begin, index_end);
}
}
@ -253,7 +255,7 @@ BlockInputStreams StorageStripeLog::read(
if (!disk->exists(index_file))
return { std::make_shared<NullBlockInputStream>(getSampleBlockForColumns(column_names)) };
CompressedReadBufferFromFile index_in(fullPath(disk, index_file), 0, 0, 0, INDEX_BUFFER_SIZE);
CompressedReadBufferFromFile index_in(disk->readFile(index_file, INDEX_BUFFER_SIZE));
std::shared_ptr<const IndexForNativeFormat> index{std::make_shared<IndexForNativeFormat>(index_in, column_names_set)};
BlockInputStreams res;
@ -305,6 +307,10 @@ void StorageStripeLog::truncate(const ASTPtr &, const Context &, TableStructureW
void registerStorageStripeLog(StorageFactory & factory)
{
StorageFactory::StorageFeatures features{
.supports_settings = true
};
factory.registerStorage("StripeLog", [](const StorageFactory::Arguments & args)
{
if (!args.engine_args.empty())
@ -312,10 +318,13 @@ void registerStorageStripeLog(StorageFactory & factory)
"Engine " + args.engine_name + " doesn't support any arguments (" + toString(args.engine_args.size()) + " given)",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
String disk_name = getDiskName(*args.storage_def);
DiskPtr disk = args.context.getDisk(disk_name);
return StorageStripeLog::create(
args.context.getDefaultDisk(), args.relative_data_path, args.table_id, args.columns, args.constraints,
disk, args.relative_data_path, args.table_id, args.columns, args.constraints,
args.attach, args.context.getSettings().max_compress_block_size);
});
}, features);
}
}

View File

@ -10,6 +10,7 @@
#include <Common/Exception.h>
#include <Common/typeid_cast.h>
#include <IO/ReadBufferFromFileBase.h>
#include <Compression/CompressionFactory.h>
#include <Compression/CompressedReadBuffer.h>
#include <Compression/CompressedWriteBuffer.h>
@ -25,9 +26,12 @@
#include <Interpreters/Context.h>
#include <Storages/StorageTinyLog.h>
#include <Storages/StorageFactory.h>
#include <Interpreters/evaluateConstantExpression.h>
#include <Parsers/ASTLiteral.h>
#include <Storages/CheckResults.h>
#include <Storages/StorageFactory.h>
#include <Storages/StorageTinyLog.h>
#include "StorageLogSettings.h"
#define DBMS_STORAGE_LOG_DATA_FILE_EXTENSION ".bin"
@ -434,6 +438,10 @@ void StorageTinyLog::drop(TableStructureWriteLockHolder &)
void registerStorageTinyLog(StorageFactory & factory)
{
StorageFactory::StorageFeatures features{
.supports_settings = true
};
factory.registerStorage("TinyLog", [](const StorageFactory::Arguments & args)
{
if (!args.engine_args.empty())
@ -441,10 +449,13 @@ void registerStorageTinyLog(StorageFactory & factory)
"Engine " + args.engine_name + " doesn't support any arguments (" + toString(args.engine_args.size()) + " given)",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
String disk_name = getDiskName(*args.storage_def);
DiskPtr disk = args.context.getDisk(disk_name);
return StorageTinyLog::create(
args.context.getDefaultDisk(), args.relative_data_path, args.table_id, args.columns, args.constraints,
disk, args.relative_data_path, args.table_id, args.columns, args.constraints,
args.attach, args.context.getSettings().max_compress_block_size);
});
}, features);
}
}

View File

@ -1,19 +0,0 @@
<yandex>
<logger>
<level>trace</level>
<log>/var/log/clickhouse-server/clickhouse-server.log</log>
<errorlog>/var/log/clickhouse-server/clickhouse-server.err.log</errorlog>
<size>1000M</size>
<count>10</count>
<stderr>/var/log/clickhouse-server/stderr.log</stderr>
<stdout>/var/log/clickhouse-server/stdout.log</stdout>
</logger>
<storage_configuration>
<disks>
<default>
<type>memory</type>
</default>
</disks>
</storage_configuration>
</yandex>

View File

@ -1,30 +0,0 @@
import pytest
from helpers.cluster import ClickHouseCluster
from helpers.test_tools import TSV
cluster = ClickHouseCluster(__file__)
node = cluster.add_instance("node", main_configs=['configs/config.xml'])
@pytest.fixture(scope="module")
def started_cluster():
try:
cluster.start()
yield cluster
finally:
cluster.shutdown()
def test_tinylog(started_cluster):
node.query('''CREATE DATABASE IF NOT EXISTS test''')
node.query('''CREATE TABLE test.tinylog (s String, n UInt8) ENGINE = TinyLog''')
node.query('''INSERT INTO test.tinylog SELECT toString(number), number * 2 FROM system.numbers LIMIT 5''')
assert TSV(node.query('''SELECT * FROM test.tinylog''')) == TSV('0\t0\n1\t2\n2\t4\n3\t6\n4\t8')
node.query('''TRUNCATE TABLE test.tinylog''')
assert TSV(node.query('''SELECT * FROM test.tinylog''')) == TSV('')
node.query('''DROP TABLE test.tinylog''')

View File

@ -34,8 +34,10 @@ def cluster():
cluster.shutdown()
@pytest.mark.parametrize("log_engine,files_overhead", [("TinyLog", 1), ("Log", 2)])
def test_log_family_s3(cluster, log_engine, files_overhead):
@pytest.mark.parametrize(
"log_engine,files_overhead,files_overhead_per_insert",
[("TinyLog", 1, 1), ("Log", 2, 1), ("StripeLog", 1, 2)])
def test_log_family_s3(cluster, log_engine, files_overhead, files_overhead_per_insert):
node = cluster.instances["node"]
minio = cluster.minio_client
@ -43,15 +45,15 @@ def test_log_family_s3(cluster, log_engine, files_overhead):
node.query("INSERT INTO s3_test SELECT number FROM numbers(5)")
assert node.query("SELECT * FROM s3_test") == "0\n1\n2\n3\n4\n"
assert len(list(minio.list_objects(cluster.minio_bucket, 'data/'))) == 1 + files_overhead
assert len(list(minio.list_objects(cluster.minio_bucket, 'data/'))) == files_overhead_per_insert + files_overhead
node.query("INSERT INTO s3_test SELECT number + 5 FROM numbers(3)")
assert node.query("SELECT * FROM s3_test order by id") == "0\n1\n2\n3\n4\n5\n6\n7\n"
assert len(list(minio.list_objects(cluster.minio_bucket, 'data/'))) == 2 + files_overhead
assert len(list(minio.list_objects(cluster.minio_bucket, 'data/'))) == files_overhead_per_insert * 2 + files_overhead
node.query("INSERT INTO s3_test SELECT number + 8 FROM numbers(1)")
assert node.query("SELECT * FROM s3_test order by id") == "0\n1\n2\n3\n4\n5\n6\n7\n8\n"
assert len(list(minio.list_objects(cluster.minio_bucket, 'data/'))) == 3 + files_overhead
assert len(list(minio.list_objects(cluster.minio_bucket, 'data/'))) == files_overhead_per_insert * 3 + files_overhead
node.query("TRUNCATE TABLE s3_test")
assert len(list(minio.list_objects(cluster.minio_bucket, 'data/'))) == 0

View File

@ -10,6 +10,8 @@ insert into test1_00863 (id, code) select number, toString(number) FROM numbers(
insert into test3_00863 (id, code) select number, toString(number) FROM numbers(100000);
insert into test2_00863 (id, code, test1_id, test3_id) select number, toString(number), number, number FROM numbers(100000);
SET max_memory_usage = 50000000;
select test2_00863.id
from test1_00863, test2_00863, test3_00863
where test1_00863.code in ('1', '2', '3')

View File

@ -0,0 +1,13 @@
drop table if exists ax;
drop table if exists bx;
create table ax (A Int64, B Int64) Engine = Memory;
create table bx (A Int64) Engine = Memory;
insert into ax values (1, 1), (2, 1);
insert into bx values (2), (4);
select * from bx, ax where ax.A = bx.A and ax.B in (1,2);
drop table ax;
drop table bx;

View File

@ -0,0 +1,3 @@
SELECT \n k, \n r.k, \n name\nFROM n\nALL INNER JOIN r ON k = r.k\nWHERE (k = r.k) AND (name = \'A\')
SELECT \n k, \n r.k, \n name\nFROM n\nALL INNER JOIN r ON k = r.k\nWHERE (k = r.k) AND (name LIKE \'A%\')
SELECT \n k, \n r.k, \n name\nFROM n\nALL INNER JOIN r ON k = r.k\nWHERE (k = r.k) AND (name NOT LIKE \'A%\')

View File

@ -0,0 +1,15 @@
DROP TABLE IF EXISTS n;
DROP TABLE IF EXISTS r;
CREATE TABLE n (k UInt32) ENGINE = Memory;
CREATE TABLE r (k UInt32, name String) ENGINE = Memory;
SET enable_debug_queries = 1;
SET enable_optimize_predicate_expression = 0;
ANALYZE SELECT * FROM n, r WHERE n.k = r.k AND r.name = 'A';
ANALYZE SELECT * FROM n, r WHERE n.k = r.k AND r.name LIKE 'A%';
ANALYZE SELECT * FROM n, r WHERE n.k = r.k AND r.name NOT LIKE 'A%';
DROP TABLE n;
DROP TABLE r;

View File

@ -0,0 +1,33 @@
TP1
7.51
7.42
7.41
7.42
7.41
7.42
7.41
7.42
7.41
7.51
TP2
7.42
7.41
7.42
7.51
7.42
7.41
7.51
7.51
TP3
7.42
7.41
7.51
TP4
7.42
7.41
7.42
7.42
7.41
TP5
7.41
7.51

View File

@ -0,0 +1,33 @@
SET max_threads = 1;
CREATE TABLE IF NOT EXISTS functional_index_mergetree (x Float64) ENGINE = MergeTree ORDER BY round(x);
INSERT INTO functional_index_mergetree VALUES (7.42)(7.41)(7.51);
SELECT 'TP1';
SELECT * FROM functional_index_mergetree WHERE x > 7.42;
SELECT * FROM functional_index_mergetree WHERE x < 7.49;
SELECT * FROM functional_index_mergetree WHERE x < 7.5;
SELECT * FROM functional_index_mergetree WHERE NOT (NOT x < 7.49);
SELECT * FROM functional_index_mergetree WHERE NOT (NOT x < 7.5);
SELECT * FROM functional_index_mergetree WHERE NOT (NOT x > 7.42);
SELECT 'TP2';
SELECT * FROM functional_index_mergetree WHERE NOT x > 7.49;
SELECT * FROM functional_index_mergetree WHERE NOT x < 7.42;
SELECT * FROM functional_index_mergetree WHERE NOT x < 7.41;
SELECT * FROM functional_index_mergetree WHERE NOT x < 7.5;
SELECT 'TP3';
SELECT * FROM functional_index_mergetree WHERE x > 7.41 AND x < 7.51;
SELECT * FROM functional_index_mergetree WHERE NOT (x > 7.41 AND x < 7.51);
SELECT 'TP4';
SELECT * FROM functional_index_mergetree WHERE NOT x < 7.41 AND NOT x > 7.49;
SELECT * FROM functional_index_mergetree WHERE NOT x < 7.42 AND NOT x > 7.42;
SELECT * FROM functional_index_mergetree WHERE (NOT x < 7.4) AND (NOT x > 7.49);
SELECT 'TP5';
SELECT * FROM functional_index_mergetree WHERE NOT or(NOT x, toUInt64(x) AND NOT floor(x) > 6, x >= 7.42 AND round(x) <= 7);
DROP TABLE functional_index_mergetree;

View File

@ -0,0 +1,18 @@
0
0
1
0
1
2
0
0
1
0
1
2
0
0
1
0
1
2

View File

@ -0,0 +1,40 @@
DROP TABLE IF EXISTS log;
CREATE TABLE log (x UInt8) ENGINE = StripeLog () SETTINGS disk = 'disk_memory';
SELECT * FROM log ORDER BY x;
INSERT INTO log VALUES (0);
SELECT * FROM log ORDER BY x;
INSERT INTO log VALUES (1);
SELECT * FROM log ORDER BY x;
INSERT INTO log VALUES (2);
SELECT * FROM log ORDER BY x;
TRUNCATE TABLE log;
DROP TABLE log;
CREATE TABLE log (x UInt8) ENGINE = TinyLog () SETTINGS disk = 'disk_memory';
SELECT * FROM log ORDER BY x;
INSERT INTO log VALUES (0);
SELECT * FROM log ORDER BY x;
INSERT INTO log VALUES (1);
SELECT * FROM log ORDER BY x;
INSERT INTO log VALUES (2);
SELECT * FROM log ORDER BY x;
TRUNCATE TABLE log;
DROP TABLE log;
CREATE TABLE log (x UInt8) ENGINE = Log () SETTINGS disk = 'disk_memory';
SELECT * FROM log ORDER BY x;
INSERT INTO log VALUES (0);
SELECT * FROM log ORDER BY x;
INSERT INTO log VALUES (1);
SELECT * FROM log ORDER BY x;
INSERT INTO log VALUES (2);
SELECT * FROM log ORDER BY x;
TRUNCATE TABLE log;
DROP TABLE log;

View File

@ -0,0 +1,7 @@
DROP TABLE IF EXISTS test_alias;
CREATE TABLE test_alias (a UInt8 ALIAS b, b UInt8) ENGINE Log;
SELECT count() FROM test_alias;
DROP TABLE test_alias;

View File

@ -27,13 +27,16 @@ function download
wget -nv -nd -c "https://clickhouse-builds.s3.yandex.net/$left_pr/$left_sha/performance/performance.tgz" -O- | tar -C left --strip-components=1 -zxv &
wget -nv -nd -c "https://clickhouse-builds.s3.yandex.net/$right_pr/$right_sha/performance/performance.tgz" -O- | tar -C right --strip-components=1 -zxv &
else
wget -nv -nd -c "https://clickhouse-builds.s3.yandex.net/$left_pr/$left_sha/performance/performance.tgz" -O- | tar -C left --strip-components=1 -zxv && cp -al left right
wget -nv -nd -c "https://clickhouse-builds.s3.yandex.net/$left_pr/$left_sha/performance/performance.tgz" -O- | tar -C left --strip-components=1 -zxv && cp -a left right &
fi
cd db0 && wget -nv -nd -c "https://s3.mds.yandex.net/clickhouse-private-datasets/hits_10m_single/partitions/hits_10m_single.tar" -O- | tar -xv &
cd db0 && wget -nv -nd -c "https://s3.mds.yandex.net/clickhouse-private-datasets/hits_100m_single/partitions/hits_100m_single.tar" -O- | tar -xv &
cd db0 && wget -nv -nd -c "https://clickhouse-datasets.s3.yandex.net/hits/partitions/hits_v1.tar" -O- | tar -xv &
cd db0 && wget -nv -nd -c "https://clickhouse-datasets.s3.yandex.net/values_with_expressions/partitions/test_values.tar" -O- | tar -xv &
mkdir ~/fg ; cd ~/fg && wget -nv -nd -c "https://raw.githubusercontent.com/brendangregg/FlameGraph/master/flamegraph.pl" && chmod +x ~/fg/flamegraph.pl &
wait
}
@ -223,13 +226,13 @@ function get_profiles
function report
{
for x in *.tsv
for x in {right,left}-{addresses,{query,trace}-log}.tsv
do
# FIXME This loop builds column definitions from TSVWithNamesAndTypes in an
# absolutely atrocious way. This should be done by the file() function itself.
paste -d' ' \
<(sed -n '1s/\t/\n/gp' "$x" | sed 's/\(^.*$\)/"\1"/') \
<(sed -n '2s/\t/\n/gp' "$x" ) \
<(sed -n '1{s/\t/\n/g;p;q}' "$x" | sed 's/\(^.*$\)/"\1"/') \
<(sed -n '2{s/\t/\n/g;p;q}' "$x" ) \
| tr '\n' ', ' | sed 's/,$//' > "$x.columns"
done
@ -301,43 +304,85 @@ create view right_query_log as select *
create view right_trace_log as select *
from file('right-trace-log.tsv', TSVWithNamesAndTypes, '$(cat right-trace-log.tsv.columns)');
create view right_addresses as select *
create view right_addresses_src as select *
from file('right-addresses.tsv', TSVWithNamesAndTypes, '$(cat right-addresses.tsv.columns)');
create table unstable_query_ids engine File(TSVWithNamesAndTypes, 'unstable-query-ids.rep') as
select query_id from right_query_log
create table right_addresses_join engine Join(any, left, address) as
select addr address, name from right_addresses_src;
create table unstable_query_runs engine File(TSVWithNamesAndTypes, 'unstable-query-runs.rep') as
select query_id, query from right_query_log
join unstable_queries_tsv using query
where query_id not like 'prewarm %'
;
create table unstable_query_metrics engine File(TSVWithNamesAndTypes, 'unstable-query-metrics.rep') as
create table unstable_query_log engine File(Vertical, 'unstable-query-log.rep') as
select * from right_query_log
where query_id in (select query_id from unstable_query_runs);
create table unstable_run_metrics engine File(TSVWithNamesAndTypes, 'unstable-run-metrics.rep') as
select ProfileEvents.Values value, ProfileEvents.Names metric, query_id, query
from right_query_log array join ProfileEvents
where query_id in (unstable_query_ids)
where query_id in (select query_id from unstable_query_runs)
;
create table unstable_query_traces engine File(TSVWithNamesAndTypes, 'unstable-query-traces.rep') as
select count() value, right_addresses.name metric,
unstable_query_ids.query_id, any(right_query_log.query) query
from unstable_query_ids
join right_query_log on right_query_log.query_id = unstable_query_ids.query_id
join right_trace_log on right_trace_log.query_id = unstable_query_ids.query_id
join right_addresses on addr = arrayJoin(trace)
group by unstable_query_ids.query_id, metric
create table unstable_run_metrics_2 engine File(TSVWithNamesAndTypes, 'unstable-run-metrics-2.rep') as
select v, n, query_id, query
from
(select
['memory_usage', 'read_bytes', 'written_bytes'] n,
[memory_usage, read_bytes, written_bytes] v,
query,
query_id
from right_query_log
where query_id in (select query_id from unstable_query_runs))
array join n, v;
create table unstable_run_traces engine File(TSVWithNamesAndTypes, 'unstable-run-traces.rep') as
select count() value, joinGet(right_addresses_join, 'name', arrayJoin(trace)) metric,
unstable_query_runs.query_id, any(unstable_query_runs.query) query
from unstable_query_runs
join right_trace_log on right_trace_log.query_id = unstable_query_runs.query_id
group by unstable_query_runs.query_id, metric
order by count() desc
;
create table metric_deviation engine File(TSVWithNamesAndTypes, 'metric-deviation.rep') as
select floor((q[3] - q[1])/q[2], 3) d,
quantilesExact(0.05, 0.5, 0.95)(value) q, metric, query
from (select * from unstable_query_metrics
union all select * from unstable_query_traces)
quantilesExact(0, 0.5, 1)(value) q, metric, query
from (select * from unstable_run_metrics
union all select * from unstable_run_traces
union all select * from unstable_run_metrics_2)
join queries using query
group by query, metric
having d > 0.5
order by any(rd[3]) desc, d desc
;
create table stacks engine File(TSV, 'stacks.rep') as
select
query,
arrayStringConcat(
arrayMap(x -> joinGet(right_addresses_join, 'name', x),
arrayReverse(trace)
),
';'
) readable_trace,
count()
from right_trace_log
join unstable_query_runs using query_id
group by query, trace
;
"
IFS=$'\n'
for q in $(cut -d' ' -f1 stacks.rep | sort | uniq)
do
grep -F "$q" stacks.rep | cut -d' ' -f 2- | tee "$q.stacks.rep" | ~/fg/flamegraph.pl > "$q.svg" &
done
wait
unset IFS
# Remember that grep sets error code when nothing is found, hence the bayan
# operator
grep Exception:[^:] *-err.log > run-errors.log ||:

View File

@ -59,5 +59,5 @@ set +m
dmesg > dmesg.log
7z a /output/output.7z *.log *.tsv *.html *.txt *.rep
7z a /output/output.7z *.log *.tsv *.html *.txt *.rep *.svg
cp compare.log /output

View File

@ -116,7 +116,7 @@ for q in test_queries:
# Prewarm: run once on both servers. Helps to bring the data into memory,
# precompile the queries, etc.
for conn_index, c in enumerate(connections):
res = c.execute(q)
res = c.execute(q, query_id = 'prewarm {} {}'.format(0, q))
print('prewarm\t' + tsv_escape(q) + '\t' + str(conn_index) + '\t' + str(c.last_query.elapsed))
# Now, perform measured runs.

View File

@ -694,76 +694,73 @@ FORMAT Vertical
```text
Row 1:
──────
database: merge
table: visits
engine: ReplicatedCollapsingMergeTree
is_leader: 1
is_readonly: 0
is_session_expired: 0
future_parts: 1
parts_to_check: 0
zookeeper_path: /clickhouse/tables/01-06/visits
replica_name: example01-06-1.yandex.ru
replica_path: /clickhouse/tables/01-06/visits/replicas/example01-06-1.yandex.ru
columns_version: 9
queue_size: 1
inserts_in_queue: 0
merges_in_queue: 1
log_max_index: 596273
log_pointer: 596274
total_replicas: 2
active_replicas: 2
database: merge
table: visits
engine: ReplicatedCollapsingMergeTree
is_leader: 1
can_become_leader: 1
is_readonly: 0
is_session_expired: 0
future_parts: 1
parts_to_check: 0
zookeeper_path: /clickhouse/tables/01-06/visits
replica_name: example01-06-1.yandex.ru
replica_path: /clickhouse/tables/01-06/visits/replicas/example01-06-1.yandex.ru
columns_version: 9
queue_size: 1
inserts_in_queue: 0
merges_in_queue: 1
part_mutations_in_queue: 0
queue_oldest_time: 2020-02-20 08:34:30
inserts_oldest_time: 0000-00-00 00:00:00
merges_oldest_time: 2020-02-20 08:34:30
part_mutations_oldest_time: 0000-00-00 00:00:00
oldest_part_to_get:
oldest_part_to_merge_to: 20200220_20284_20840_7
oldest_part_to_mutate_to:
log_max_index: 596273
log_pointer: 596274
last_queue_update: 2020-02-20 08:34:32
absolute_delay: 0
total_replicas: 2
active_replicas: 2
```
Columns:
```text
database: Database name
table: Table name
engine: Table engine name
is_leader: Whether the replica is the leader.
Only one replica at a time can be the leader. The leader is responsible for selecting background merges to perform.
- `database` (`String`) - Database name
- `table` (`String`) - Table name
- `engine` (`String`) - Table engine name
- `is_leader` (`UInt8`) - Whether the replica is the leader.
Only one replica at a time can be the leader. The leader is responsible for selecting background merges to perform.
Note that writes can be performed to any replica that is available and has a session in ZK, regardless of whether it is a leader.
is_readonly: Whether the replica is in read-only mode.
- `can_become_leader` (`UInt8`) - Whether the replica can be elected as a leader.
- `is_readonly` (`UInt8`) - Whether the replica is in read-only mode.
This mode is turned on if the config doesn't have sections with ZooKeeper, if an unknown error occurred when reinitializing sessions in ZooKeeper, and during session reinitialization in ZooKeeper.
is_session_expired: Whether the session with ZooKeeper has expired.
Basically the same as 'is_readonly'.
future_parts: The number of data parts that will appear as the result of INSERTs or merges that haven't been done yet.
parts_to_check: The number of data parts in the queue for verification.
A part is put in the verification queue if there is suspicion that it might be damaged.
zookeeper_path: Path to table data in ZooKeeper.
replica_name: Replica name in ZooKeeper. Different replicas of the same table have different names.
replica_path: Path to replica data in ZooKeeper. The same as concatenating 'zookeeper_path/replicas/replica_path'.
columns_version: Version number of the table structure.
Indicates how many times ALTER was performed. If replicas have different versions, it means some replicas haven't made all of the ALTERs yet.
queue_size: Size of the queue for operations waiting to be performed.
Operations include inserting blocks of data, merges, and certain other actions.
It usually coincides with 'future_parts'.
inserts_in_queue: Number of inserts of blocks of data that need to be made.
Insertions are usually replicated fairly quickly. If this number is large, it means something is wrong.
merges_in_queue: The number of merges waiting to be made.
Sometimes merges are lengthy, so this value may be greater than zero for a long time.
- `is_session_expired` (`UInt8`) - Whether the session with ZooKeeper has expired. Basically the same as `is_readonly`.
- `future_parts` (`UInt32`) - The number of data parts that will appear as the result of INSERTs or merges that haven't been done yet.
- `parts_to_check` (`UInt32`) - The number of data parts in the queue for verification. A part is put in the verification queue if there is suspicion that it might be damaged.
- `zookeeper_path` (`String`) - Path to table data in ZooKeeper.
- `replica_name` (`String`) - Replica name in ZooKeeper. Different replicas of the same table have different names.
- `replica_path` (`String`) - Path to replica data in ZooKeeper. The same as concatenating 'zookeeper_path/replicas/replica_path'.
- `columns_version` (`Int32`) - Version number of the table structure. Indicates how many times ALTER was performed. If replicas have different versions, it means some replicas haven't made all of the ALTERs yet.
- `queue_size` (`UInt32`) - Size of the queue for operations waiting to be performed. Operations include inserting blocks of data, merges, and certain other actions. It usually coincides with `future_parts`.
- `inserts_in_queue` (`UInt32`) - Number of inserts of blocks of data that need to be made. Insertions are usually replicated fairly quickly. If this number is large, it means something is wrong.
- `merges_in_queue` (`UInt32`) - The number of merges waiting to be made. Sometimes merges are lengthy, so this value may be greater than zero for a long time.
- `part_mutations_in_queue` (`UInt32`) - The number of mutations waiting to be made.
- `queue_oldest_time` (`DateTime`) - If `queue_size` is greater than 0, shows when the oldest operation was added to the queue.
- `inserts_oldest_time` (`DateTime`) - See `queue_oldest_time`
- `merges_oldest_time` (`DateTime`) - See `queue_oldest_time`
- `part_mutations_oldest_time` (`DateTime`) - See `queue_oldest_time`
The next 4 columns have a non-zero value only where there is an active session with ZK.
log_max_index: Maximum entry number in the log of general activity.
log_pointer: Maximum entry number in the log of general activity that the replica copied to its execution queue, plus one.
If log_pointer is much smaller than log_max_index, something is wrong.
total_replicas: The total number of known replicas of this table.
active_replicas: The number of replicas of this table that have a session in ZooKeeper (i.e., the number of functioning replicas).
```
- `log_max_index` (`UInt64`) - Maximum entry number in the log of general activity.
- `log_pointer` (`UInt64`) - Maximum entry number in the log of general activity that the replica copied to its execution queue, plus one. If `log_pointer` is much smaller than `log_max_index`, something is wrong.
- `last_queue_update` (`DateTime`) - When the queue was updated last time.
- `absolute_delay` (`UInt64`) - How big a lag, in seconds, the current replica has.
- `total_replicas` (`UInt8`) - The total number of known replicas of this table.
- `active_replicas` (`UInt8`) - The number of replicas of this table that have a session in ZooKeeper (i.e., the number of functioning replicas).
If you request all the columns, the table may work a bit slowly, since several reads from ZooKeeper are made for each row.
If you don't request the last 4 columns (log_max_index, log_pointer, total_replicas, active_replicas), the table works quickly.
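For example, a query like `SELECT database, table, is_readonly, is_session_expired, queue_size FROM system.replicas` skips those four columns and is cheap enough for a routine health check.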

View File

@ -109,12 +109,12 @@ When ClickHouse merges data parts, each group of consecutive rows with the same
For each resulting data part ClickHouse saves:
1. The first "cancel" and the last "state" rows, if the number of "state" and "cancel" rows matches.
2. The last "state" row, if there is one more "state" row than "cancel" rows.
3. The first "cancel" row, if there is one more "cancel" row than "state" rows.
1. The first "cancel" and the last "state" rows, if the number of "state" and "cancel" rows matches and the last row is a "state" row.
2. The last "state" row, if there is more "state" rows than "cancel" rows.
3. The first "cancel" row, if there is more "cancel" rows than "state" rows.
4. None of the rows, in all other cases.
The merge continues, but ClickHouse treats this situation as a logical error and records it in the server log. This error can occur if the same data were inserted more than once.
In addition, when there are at least 2 more "state" rows than "cancel" rows, or at least 2 more "cancel" rows than "state" rows, the merge continues, but ClickHouse treats this situation as a logical error and records it in the server log. This error can occur if the same data were inserted more than once.
Thus, collapsing should not change the results of calculating statistics.
Changes are gradually collapsed so that in the end only the last state of almost every object is left.
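For example, if one sorting key has the rows `+1, -1, +1` (in insertion order, with `Sign = 1` for "state" and `Sign = -1` for "cancel"), there is one more "state" row than "cancel" rows, so only the last `+1` row is kept (rule 2). If it has `-1, +1`, the counts match and the last row is a "state" row, so the first "cancel" and the last "state" rows are both kept (rule 1). If it has `+1, -1`, the counts match but the last row is a "cancel" row, so none of the rows are kept (rule 4).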

View File

@ -275,8 +275,6 @@ Views look the same as normal tables. For example, they are listed in the result
There isn't a separate query for deleting views. To delete a view, use `DROP TABLE`.
[Original article](https://clickhouse.tech/docs/en/query_language/create/) <!--hide-->
## CREATE DICTIONARY {#create-dictionary-query}
```sql
@ -300,3 +298,5 @@ External dictionary structure consists of attributes. Dictionary attributes are
Depending on dictionary [layout](dicts/external_dicts_dict_layout.md) one or more attributes can be specified as dictionary keys.
For more information, see [External Dictionaries](dicts/external_dicts.md) section.
[Original article](https://clickhouse.tech/docs/en/query_language/create/) <!--hide-->

View File

@ -6,7 +6,7 @@ ClickHouse:
- Fully or partially stores dictionaries in RAM.
- Periodically updates dictionaries and dynamically loads missing values. In other words, dictionaries can be loaded dynamically.
- Allows to create external dictionaries with xml-files or [DDL queries](../create.md#create-dictionary-query).
- Allows creating external dictionaries with xml files or [DDL queries](../create.md#create-dictionary-query).
The configuration of external dictionaries can be located in one or more xml-files. The path to the configuration is specified in the [dictionaries_config](../../operations/server_settings/settings.md#server_settings-dictionaries_config) parameter.
@ -34,12 +34,16 @@ You can [configure](external_dicts_dict.md) any number of dictionaries in the sa
[DDL queries for dictionaries](../create.md#create-dictionary-query) don't require any additional records in the server configuration. They allow working with dictionaries as first-class entities, like tables or views.
!!! attention
!!! attention "Attention"
You can convert values for a small dictionary by describing it in a `SELECT` query (see the [transform](../functions/other_functions.md) function). This functionality is not related to external dictionaries.
**See also**
## See also {#ext-dicts-see-also}
- [Configuring an External Dictionary](external_dicts_dict.md)
- [Storing Dictionaries in Memory](external_dicts_dict_layout.md)
- [Dictionary Updates](external_dicts_dict_lifetime.md)
- [Sources of External Dictionaries](external_dicts_dict_sources.md)
- [Dictionary Key and Fields](external_dicts_dict_structure.md)
- [Functions for Working with External Dictionaries](../functions/ext_dict_functions.md)
[Original article](https://clickhouse.tech/docs/en/query_language/dicts/external_dicts/) <!--hide-->

View File

@ -1,6 +1,6 @@
# Configuring an External Dictionary {#dicts-external_dicts_dict}
If dictionary is configured using xml-file, than dictionary configuration has the following structure:
If a dictionary is configured using an xml file, then the dictionary configuration has the following structure:
```xml
<dictionary>
@ -37,7 +37,7 @@ LAYOUT(...) -- Memory layout configuration
LIFETIME(...) -- Lifetime of dictionary in memory
```
- name The identifier that can be used to access the dictionary. Use the characters `[a-zA-Z0-9_\-]`.
- `name` The identifier that can be used to access the dictionary. Use the characters `[a-zA-Z0-9_\-]`.
- [source](external_dicts_dict_sources.md) — Source of the dictionary.
- [layout](external_dicts_dict_layout.md) — Dictionary layout in memory.
- [structure](external_dicts_dict_structure.md) — Structure of the dictionary. A key and attributes that can be retrieved by this key.

View File

@ -34,7 +34,7 @@ The configuration looks like this:
</yandex>
```
in case of [DDL-query](../create.md#create-dictionary-query), equal configuration will looks like
Corresponding [DDL-query](../create.md#create-dictionary-query):
```sql
CREATE DICTIONARY (...)

View File

@ -47,10 +47,14 @@ Attributes are described in the query body:
ClickHouse supports the following types of keys:
- Numeric key. UInt64. Defined in the `<id>` tag or using `PRIMARY KEY` keyword.
- Numeric key. `UInt64`. Defined in the `<id>` tag or using `PRIMARY KEY` keyword.
- Composite key. Set of values of different types. Defined in the tag `<key>` or `PRIMARY KEY` keyword.
A xml structure can contain either `<id>` or `<key>`. DDL-query must contain single `PRIMARY KEY`.
An xml structure can contain either `<id>` or `<key>`. A DDL-query must contain a single `PRIMARY KEY`.
!!! warning "Warning"
You must not describe key as an attribute.
### Numeric Key {#ext_dict-numeric-key}

View File

@ -81,7 +81,7 @@ SELECT name FROM system.dictionaries WHERE database = <db> [AND name LIKE <patte
**Example**
The following query selects the first two rows from the list of tables in the `system` database, whose names contain `co`.
The following query selects the first two rows from the list of dictionaries in the `db` database whose names contain `reg`.
```sql
SHOW DICTIONARIES FROM db LIKE '%reg%' LIMIT 2
@ -92,3 +92,5 @@ SHOW DICTIONARIES FROM db LIKE '%reg%' LIMIT 2
│ region_names │
└──────────────┘
```
[Original article](https://clickhouse.tech/docs/en/query_language/show/) <!--hide-->

View File

@ -741,76 +741,73 @@ FORMAT Vertical
```text
Row 1:
──────
database: merge
table: visits
engine: ReplicatedCollapsingMergeTree
is_leader: 1
is_readonly: 0
is_session_expired: 0
future_parts: 1
parts_to_check: 0
zookeeper_path: /clickhouse/tables/01-06/visits
replica_name: example01-06-1.yandex.ru
replica_path: /clickhouse/tables/01-06/visits/replicas/example01-06-1.yandex.ru
columns_version: 9
queue_size: 1
inserts_in_queue: 0
merges_in_queue: 1
log_max_index: 596273
log_pointer: 596274
total_replicas: 2
active_replicas: 2
database: merge
table: visits
engine: ReplicatedCollapsingMergeTree
is_leader: 1
can_become_leader: 1
is_readonly: 0
is_session_expired: 0
future_parts: 1
parts_to_check: 0
zookeeper_path: /clickhouse/tables/01-06/visits
replica_name: example01-06-1.yandex.ru
replica_path: /clickhouse/tables/01-06/visits/replicas/example01-06-1.yandex.ru
columns_version: 9
queue_size: 1
inserts_in_queue: 0
merges_in_queue: 1
part_mutations_in_queue: 0
queue_oldest_time: 2020-02-20 08:34:30
inserts_oldest_time: 0000-00-00 00:00:00
merges_oldest_time: 2020-02-20 08:34:30
part_mutations_oldest_time: 0000-00-00 00:00:00
oldest_part_to_get:
oldest_part_to_merge_to: 20200220_20284_20840_7
oldest_part_to_mutate_to:
log_max_index: 596273
log_pointer: 596274
last_queue_update: 2020-02-20 08:34:32
absolute_delay: 0
total_replicas: 2
active_replicas: 2
```
Столбцы:
```text
database: имя БД
table: имя таблицы
engine: имя движка таблицы
is_leader: является ли реплика лидером
В один момент времени, не более одной из реплик является лидером. Лидер отвечает за выбор фоновых слияний, которые следует произвести.
- `database` (`String`) - имя БД.
- `table` (`String`) - имя таблицы.
- `engine` (`String`) - имя движка таблицы.
- `is_leader` (`UInt8`) - является ли реплика лидером.
В один момент времени, не более одной из реплик является лидером. Лидер отвечает за выбор фоновых слияний, которые следует произвести.
Обратите внимание, что запись можно осуществлять на любую реплику (доступную и имеющую сессию в ZK), независимо от лидерства.
is_readonly: находится ли реплика в режиме "только для чтения"
- `can_become_leader` (`UInt8`) - может ли реплика быть выбрана лидером.
- `is_readonly` (`UInt8`) - находится ли реплика в режиме "только для чтения"
Этот режим включается, если в конфиге нет секции с ZK; если при переинициализации сессии в ZK произошла неизвестная ошибка; во время переинициализации сессии с ZK.
is_session_expired: истекла ли сессия с ZK.
В основном, то же самое, что и is_readonly.
future_parts: количество кусков с данными, которые появятся в результате INSERT-ов или слияний, которых ещё предстоит сделать
parts_to_check: количество кусков с данными в очереди на проверку
Кусок помещается в очередь на проверку, если есть подозрение, что он может быть битым.
zookeeper_path: путь к данным таблицы в ZK
replica_name: имя реплики в ZK; разные реплики одной таблицы имеют разное имя
replica_path: путь к данным реплики в ZK. То же самое, что конкатенация zookeeper_path/replicas/replica_path.
columns_version: номер версии структуры таблицы
Обозначает, сколько раз был сделан ALTER. Если на репликах разные версии, значит некоторые реплики сделали ещё не все ALTER-ы.
queue_size: размер очереди действий, которых предстоит сделать
К действиям относятся вставки блоков данных, слияния, и некоторые другие действия.
Как правило, совпадает с future_parts.
inserts_in_queue: количество вставок блоков данных, которых предстоит сделать
Обычно вставки должны быстро реплицироваться. Если величина большая - значит что-то не так.
merges_in_queue: количество слияний, которых предстоит сделать
Бывают длинные слияния - то есть, это значение может быть больше нуля продолжительное время.
- `is_session_expired` (`UInt8`) - истекла ли сессия с ZK. В основном, то же самое, что и `is_readonly`.
- `future_parts` (`UInt32`) - количество кусков с данными, которые появятся в результате INSERT-ов или слияний, которых ещё предстоит сделать
- `parts_to_check` (`UInt32`) - количество кусков с данными в очереди на проверку. Кусок помещается в очередь на проверку, если есть подозрение, что он может быть битым.
- `zookeeper_path` (`String`) - путь к данным таблицы в ZK.
- `replica_name` (`String`) - имя реплики в ZK; разные реплики одной таблицы имеют разное имя.
- `replica_path` (`String`) - путь к данным реплики в ZK. То же самое, что конкатенация «zookeeper_path/replicas/» и `replica_name`.
- `columns_version` (`Int32`) - номер версии структуры таблицы. Обозначает, сколько раз был сделан ALTER. Если на репликах разные версии, значит некоторые реплики сделали ещё не все ALTER-ы.
- `queue_size` (`UInt32`) - размер очереди действий, которые предстоит сделать. К действиям относятся вставки блоков данных, слияния, и некоторые другие действия. Как правило, совпадает с future_parts.
- `inserts_in_queue` (`UInt32`) - количество вставок блоков данных, которые предстоит сделать. Обычно вставки должны быстро реплицироваться. Если величина большая - значит что-то не так.
- `merges_in_queue` (`UInt32`) - количество слияний, которые предстоит сделать. Бывают длинные слияния - то есть, это значение может быть больше нуля продолжительное время.
- `part_mutations_in_queue` (`UInt32`) - количество мутаций, которые предстоит сделать.
- `queue_oldest_time` (`DateTime`) - если `queue_size` больше 0, показывает, когда была добавлена в очередь самая старая операция.
- `inserts_oldest_time` (`DateTime`) - см. `queue_oldest_time`.
- `merges_oldest_time` (`DateTime`) - см. `queue_oldest_time`.
- `part_mutations_oldest_time` (`DateTime`) - см. `queue_oldest_time`.
Следующие 4 столбца имеют ненулевое значение только если активна сессия с ZK.
log_max_index: максимальный номер записи в общем логе действий
log_pointer: максимальный номер записи из общего лога действий, которую реплика скопировала в свою очередь для выполнения, плюс единица
Если log_pointer сильно меньше log_max_index, значит что-то не так.
total_replicas: общее число известных реплик этой таблицы
active_replicas: число реплик этой таблицы, имеющих сессию в ZK; то есть, число работающих реплик
```
- `log_max_index` (`UInt64`) - максимальный номер записи в общем логе действий.
- `log_pointer` (`UInt64`) - максимальный номер записи из общего лога действий, которую реплика скопировала в свою очередь для выполнения, плюс единица. Если log_pointer сильно меньше log_max_index, значит что-то не так.
- `last_queue_update` (`DateTime`) - When the queue was last updated.
- `absolute_delay` (`UInt64`) - How many seconds of lag the current replica has.
- `total_replicas` (`UInt8`) - общее число известных реплик этой таблицы.
- `active_replicas` (`UInt8`) - число реплик этой таблицы, имеющих сессию в ZK; то есть, число работающих реплик.
Если запрашивать все столбцы, то таблица может работать слегка медленно, так как на каждую строчку делается несколько чтений из ZK.
Если не запрашивать последние 4 столбца (log_max_index, log_pointer, total_replicas, active_replicas), то таблица работает быстро.
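Например, условный запрос для поиска проблемных реплик (пороговые значения выбраны произвольно и подбираются под конкретную инсталляцию):
```sql
SELECT
    database,
    table,
    is_readonly,
    is_session_expired,
    future_parts,
    parts_to_check,
    absolute_delay
FROM system.replicas
WHERE
    is_readonly
    OR is_session_expired
    OR future_parts > 20
    OR parts_to_check > 10
    OR absolute_delay > 30
```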

View File

@ -274,4 +274,28 @@ SELECT a, b, c FROM (SELECT ...)
Отсутствует отдельный запрос для удаления представлений. Чтобы удалить представление, следует использовать `DROP TABLE`.
## CREATE DICTIONARY {#create-dictionary-query}
```sql
CREATE DICTIONARY [IF NOT EXISTS] [db.]dictionary_name
(
key1 type1 [DEFAULT|EXPRESSION expr1] [HIERARCHICAL|INJECTIVE|IS_OBJECT_ID],
key2 type2 [DEFAULT|EXPRESSION expr2] [HIERARCHICAL|INJECTIVE|IS_OBJECT_ID],
attr1 type2 [DEFAULT|EXPRESSION expr3],
attr2 type2 [DEFAULT|EXPRESSION expr4]
)
PRIMARY KEY key1, key2
SOURCE(SOURCE_NAME([param1 value1 ... paramN valueN]))
LAYOUT(LAYOUT_NAME([param_name param_value]))
LIFETIME([MIN val1] MAX val2)
```
Создаёт [внешний словарь](dicts/external_dicts.md) с заданной [структурой](dicts/external_dicts_dict_structure.md), [источником](dicts/external_dicts_dict_sources.md), [способом размещения в памяти](dicts/external_dicts_dict_layout.md) и [периодом обновления](dicts/external_dicts_dict_lifetime.md).
Структура внешнего словаря состоит из атрибутов. Атрибуты словаря задаются как столбцы таблицы. Единственным обязательным свойством атрибута является его тип, все остальные свойства могут иметь значения по умолчанию.
В зависимости от [способа размещения словаря в памяти](dicts/external_dicts_dict_layout.md), ключами словаря могут быть один и более атрибутов.
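Для наглядности приведём условный пример (имена базы, таблицы-источника и атрибутов выбраны произвольно):
```sql
CREATE DICTIONARY db.user_names
(
    user_id UInt64,
    name String DEFAULT ''
)
PRIMARY KEY user_id
SOURCE(CLICKHOUSE(host 'localhost' port 9000 user 'default' password '' db 'db' table 'users'))
LAYOUT(HASHED())
LIFETIME(MIN 300 MAX 360)
```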
Смотрите [Внешние словари](dicts/external_dicts.md).
[Оригинальная статья](https://clickhouse.tech/docs/ru/query_language/create/) <!--hide-->

View File

@ -2,9 +2,12 @@
Существует возможность подключать собственные словари из различных источников данных. Источником данных для словаря может быть локальный текстовый/исполняемый файл, HTTP(s) ресурс или другая СУБД. Подробнее смотрите в разделе "[Источники внешних словарей](external_dicts_dict_sources.md)".
ClickHouse полностью или частично хранит словари в оперативной памяти. Словари можно подгружать динамически, ClickHouse периодически обновляет их и динамически подгружает отсутствующие значения.
ClickHouse:
- Полностью или частично хранит словари в оперативной памяти.
- Периодически обновляет их и динамически подгружает отсутствующие значения.
- Позволяет создавать внешние словари с помощью xml-файлов или [DDL-запросов](../create.md#create-dictionary-query).
Конфигурация внешних словарей находится в одном или нескольких файлах. Путь к конфигурации указывается в параметре [dictionaries_config](../../operations/server_settings/settings.md).
Конфигурация внешних словарей может находиться в одном или нескольких xml-файлах. Путь к конфигурации указывается в параметре [dictionaries_config](../../operations/server_settings/settings.md).
Словари могут загружаться при старте сервера или при первом использовании, в зависимости от настройки [dictionaries_lazy_load](../../operations/server_settings/settings.md).
@ -30,12 +33,15 @@ ClickHouse полностью или частично хранит словар
</yandex>
```
В одном файле можно [сконфигурировать](external_dicts_dict.md) произвольное количество словарей. Формат файла сохраняется даже если словарь один (т.е. `<yandex><dictionary> <!--configuration--> </dictionary></yandex>`).
В одном файле можно [сконфигурировать](external_dicts_dict.md) произвольное количество словарей.
>можете преобразовывать значения по небольшому словарю, описав его в запросе `SELECT` (см. функцию [transform](../functions/other_functions.md)). Эта функциональность не связана с внешними словарями.
Если вы создаёте внешние словари [DDL-запросами](../create.md#create-dictionary-query), то не задавайте конфигурацию словаря в конфигурации сервера.
!!! attention "Внимание"
Можно преобразовывать значения по небольшому словарю, описав его в запросе `SELECT` (см. функцию [transform](../functions/other_functions.md)). Эта функциональность не связана с внешними словарями.
Смотрите также:
## Смотрите также {#ext-dicts-see-also}
- [Настройка внешнего словаря](external_dicts_dict.md)
- [Хранение словарей в памяти](external_dicts_dict_layout.md)

View File

@ -1,11 +1,15 @@
# Настройка внешнего словаря {#dicts-external_dicts_dict}
Конфигурация словаря имеет следующую структуру:
XML-конфигурация словаря имеет следующую структуру:
```xml
<dictionary>
<name>dict_name</name>
<structure>
<!-- Complex key configuration -->
</structure>
<source>
<!-- Source configuration -->
</source>
@ -14,20 +18,29 @@
<!-- Memory layout configuration -->
</layout>
<structure>
<!-- Complex key configuration -->
</structure>
<lifetime>
<!-- Lifetime of dictionary in memory -->
</lifetime>
</dictionary>
```
- name - Идентификатор, под которым словарь будет доступен для использования. Используйте символы `[a-zA-Z0-9_\-]`.
- [source](external_dicts_dict_sources.md) - Источник словаря.
- [layout](external_dicts_dict_layout.md) - Размещение словаря в памяти.
- [structure](external_dicts_dict_structure.md) - Структура словаря. Ключ и атрибуты, которые можно получить по ключу.
- [lifetime](external_dicts_dict_lifetime.md) - Периодичность обновления словарей.
Соответствующий [DDL-запрос](../create.md#create-dictionary-query) имеет следующий вид:
```sql
CREATE DICTIONARY dict_name
(
... -- attributes
)
PRIMARY KEY ... -- complex or single key configuration
SOURCE(...) -- Source configuration
LAYOUT(...) -- Memory layout configuration
LIFETIME(...) -- Lifetime of dictionary in memory
```
- `name` — Идентификатор, под которым словарь будет доступен для использования. Используйте символы `[a-zA-Z0-9_\-]`.
- [source](external_dicts_dict_sources.md) — Источник словаря.
- [layout](external_dicts_dict_layout.md) — Размещение словаря в памяти.
- [structure](external_dicts_dict_structure.md) — Структура словаря. Ключ и атрибуты, которые можно получить по ключу.
- [lifetime](external_dicts_dict_lifetime.md) — Периодичность обновления словарей.
[Оригинальная статья](https://clickhouse.tech/docs/ru/query_language/dicts/external_dicts_dict/) <!--hide-->

View File

@ -35,15 +35,25 @@
```
Соответствущий [DDL-запрос](../create.md#create-dictionary-query):
```sql
CREATE DICTIONARY (...)
...
LAYOUT(LAYOUT_TYPE(param value)) -- layout settings
...
```
## Способы размещения словарей в памяти
- [flat](#flat)
- [hashed](#hashed)
- [cache](#cache)
- [range_hashed](#range-hashed)
- [complex_key_hashed](#complex-key-hashed)
- [complex_key_cache](#complex-key-cache)
- [ip_trie](#ip-trie)
- [flat](#flat)
- [hashed](#hashed)
- [sparse_hashed](#dicts-external_dicts_dict_layout-sparse_hashed)
- [cache](#cache)
- [range_hashed](#range-hashed)
- [complex_key_hashed](#complex-key-hashed)
- [complex_key_cache](#complex-key-cache)
- [ip_trie](#ip-trie)
### flat
@ -63,6 +73,12 @@
</layout>
```
или
```sql
LAYOUT(FLAT())
```
### hashed
Словарь полностью хранится в оперативной памяти в виде хэш-таблиц. Словарь может содержать произвольное количество элементов с произвольными идентификаторами. На практике, количество ключей может достигать десятков миллионов элементов.
@ -77,6 +93,29 @@
</layout>
```
или
```sql
LAYOUT(HASHED())
```
### sparse_hashed {#dicts-external_dicts_dict_layout-sparse_hashed}
Аналогичен `hashed`, но при этом занимает меньше места в памяти и генерирует более высокую загрузку CPU.
Пример конфигурации:
```xml
<layout>
<sparse_hashed />
</layout>
```
или
```sql
LAYOUT(SPARSE_HASHED())
```
### complex_key_hashed
@ -90,6 +129,12 @@
</layout>
```
или
```sql
LAYOUT(COMPLEX_KEY_HASHED())
```
### range_hashed
@ -131,6 +176,19 @@
...
```
или
```sql
CREATE DICTIONARY somedict (
id UInt64,
first Date,
last Date
)
PRIMARY KEY id
LAYOUT(RANGE_HASHED())
RANGE(MIN first MAX last)
```
Для работы с такими словарями в функцию `dictGetT` необходимо передавать дополнительный аргумент, для которого подбирается диапазон:
dictGetT('dict_name', 'attr_name', id, date)
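Например, условный запрос (имена словаря и атрибута взяты для иллюстрации):
```sql
SELECT dictGetString('range_dict', 'advertiser_name', toUInt64(42), toDate('2020-02-20'))
```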
@ -178,6 +236,18 @@
</yandex>
```
или
```sql
CREATE DICTIONARY somedict(
Abcdef UInt64,
StartTimeStamp UInt64,
EndTimeStamp UInt64,
XXXType String DEFAULT ''
)
PRIMARY KEY Abcdef
RANGE(MIN StartTimeStamp MAX EndTimeStamp)
```
### cache
@ -204,6 +274,12 @@
</layout>
```
или
```sql
LAYOUT(CACHE(SIZE_IN_CELLS 1000000000))
```
Укажите достаточно большой размер кэша. Количество ячеек следует подобрать экспериментальным путём:
1. Выставить некоторое значение.
@ -265,6 +341,17 @@
...
```
или
```sql
CREATE DICTIONARY somedict (
prefix String,
asn UInt32,
cca2 String DEFAULT '??'
)
PRIMARY KEY prefix
```
Этот ключ должен иметь только один атрибут типа `String`, содержащий допустимый префикс IP. Другие типы еще не поддерживаются.
Для запросов необходимо использовать те же функции (`dictGetT` с кортежем), что и для словарей с составными ключами:
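Например, условный запрос к словарю `somedict` из примера выше (IP-адрес взят для иллюстрации):
```sql
SELECT dictGetString('somedict', 'cca2', tuple(IPv4StringToNum('192.168.0.1')))
```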

View File

@ -14,6 +14,15 @@ ClickHouse периодически обновляет словари. Инте
...
</dictionary>
```
или
```sql
CREATE DICTIONARY (...)
...
LIFETIME(300)
...
```
Настройка `<lifetime>0</lifetime>` запрещает обновление словарей.
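Примерный эквивалент в DDL-запросе:
```sql
CREATE DICTIONARY (...)
...
LIFETIME(0)
...
```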
@ -32,6 +41,12 @@ ClickHouse периодически обновляет словари. Инте
</dictionary>
```
или
```sql
LIFETIME(MIN 300 MAX 360)
```
При обновлении словарей сервер ClickHouse применяет различную логику в зависимости от типа [источника](external_dicts_dict_sources.md):
> - У текстового файла проверяется время модификации. Если время изменилось по отношению к запомненному ранее, то словарь обновляется.
@ -56,4 +71,12 @@ ClickHouse периодически обновляет словари. Инте
</dictionary>
```
или
```sql
...
SOURCE(ODBC(... invalidate_query 'SELECT update_time FROM dictionary_source where id = 1'))
...
```
[Оригинальная статья](https://clickhouse.tech/docs/ru/query_language/dicts/external_dicts_dict_lifetime/) <!--hide-->

View File

@ -3,7 +3,7 @@
Внешний словарь можно подключить из множества источников.
Общий вид конфигурации:
Общий вид XML-конфигурации:
```xml
<yandex>
@ -20,6 +20,16 @@
</yandex>
```
Аналогичный [DDL-запрос](../create.md#create-dictionary-query):
```sql
CREATE DICTIONARY dict_name (...)
...
SOURCE(SOURCE_TYPE(param1 val1 ... paramN valN)) -- Source configuration
...
```
Источник настраивается в разделе `source`.
Типы источников (`source_type`):
@ -48,10 +58,16 @@
</source>
```
или
```sql
SOURCE(FILE(path '/opt/dictionaries/os.tsv' format 'TabSeparated'))
```
Поля настройки:
- `path` - Абсолютный путь к файлу.
- `format` - Формат файла. Поддерживаются все форматы, описанные в разделе "[Форматы](../../interfaces/formats.md#formats)".
- `path` Абсолютный путь к файлу.
- `format` Формат файла. Поддерживаются все форматы, описанные в разделе "[Форматы](../../interfaces/formats.md#formats)".
## Исполняемый файл {#dicts-external_dicts_dict_sources-executable}
@ -69,10 +85,16 @@
</source>
```
или
```sql
SOURCE(EXECUTABLE(command 'cat /opt/dictionaries/os.tsv' format 'TabSeparated'))
```
Поля настройки:
- `command` - Абсолютный путь к исполняемому файлу или имя файла (если каталог программы прописан в `PATH`).
- `format` - Формат файла. Поддерживаются все форматы, описанные в разделе "[Форматы](../../interfaces/formats.md#formats)".
- `command` Абсолютный путь к исполняемому файлу или имя файла (если каталог программы прописан в `PATH`).
- `format` Формат файла. Поддерживаются все форматы, описанные в разделе "[Форматы](../../interfaces/formats.md#formats)".
## HTTP(s) {#dicts-external_dicts_dict_sources-http}
@ -86,16 +108,37 @@
<http>
<url>http://[::1]/os.tsv</url>
<format>TabSeparated</format>
<credentials>
<user>user</user>
<password>password</password>
</credentials>
<headers>
<header>
<name>API-KEY</name>
<value>key</value>
</header>
</headers>
</http>
</source>
```
или
```sql
SOURCE(HTTP(
url 'http://[::1]/os.tsv'
format 'TabSeparated'
credentials(user 'user' password 'password')
headers(header(name 'API-KEY' value 'key'))
))
```
Чтобы ClickHouse смог обратиться к HTTPS-ресурсу, необходимо [настроить openSSL](../../operations/server_settings/settings.md) в конфигурации сервера.
Поля настройки:
- `url` - URL источника.
- `format` - Формат файла. Поддерживаются все форматы, описанные в разделе "[Форматы](../../interfaces/formats.md#formats)".
- `url` URL источника.
- `format` Формат файла. Поддерживаются все форматы, описанные в разделе "[Форматы](../../interfaces/formats.md#formats)".
## ODBC {#dicts-external_dicts_dict_sources-odbc}
@ -105,20 +148,33 @@
Пример настройки:
```xml
<odbc>
<db>DatabaseName</db>
<table>ShemaName.TableName</table>
<connection_string>DSN=some_parameters</connection_string>
<invalidate_query>SQL_QUERY</invalidate_query>
</odbc>
<source>
<odbc>
<db>DatabaseName</db>
<table>SchemaName.TableName</table>
<connection_string>DSN=some_parameters</connection_string>
<invalidate_query>SQL_QUERY</invalidate_query>
</odbc>
</source>
```
или
```sql
SOURCE(ODBC(
db 'DatabaseName'
table 'SchemaName.TableName'
connection_string 'DSN=some_parameters'
invalidate_query 'SQL_QUERY'
))
```
Поля настройки:
- `db` - имя базы данных. Не указывать, если имя базы задано в параметрах. `<connection_string>`.
- `table` - имя таблицы и схемы, если она есть.
- `connection_string` - строка соединения.
- `invalidate_query` - запрос для проверки статуса словаря. Необязательный параметр. Читайте подробнее в разделе [Обновление словарей](external_dicts_dict_lifetime.md).
- `db` имя базы данных. Не указывать, если имя базы задано в параметрах. `<connection_string>`.
- `table` имя таблицы и схемы, если она есть.
- `connection_string` строка соединения.
- `invalidate_query` запрос для проверки статуса словаря. Необязательный параметр. Читайте подробнее в разделе [Обновление словарей](external_dicts_dict_lifetime.md).
ClickHouse получает от ODBC-драйвера информацию о квотировании и квотирует настройки в запросах к драйверу, поэтому имя таблицы нужно указывать в соответствии с регистром имени таблицы в базе данных.
@ -216,6 +272,18 @@ $ sudo apt-get install -y unixodbc odbcinst odbc-postgresql
</yandex>
```
или
```sql
CREATE DICTIONARY table_name (
id UInt64,
some_column UInt64 DEFAULT 0
)
PRIMARY KEY id
SOURCE(ODBC(connection_string 'DSN=myconnection' table 'postgresql_table'))
LAYOUT(HASHED())
LIFETIME(MIN 300 MAX 360)
```
Может понадобиться в `odbc.ini` указать полный путь до библиотеки с драйвером `DRIVER=/usr/local/lib/psqlodbcw.so`.
### Пример подключения MS SQL Server
@ -299,6 +367,20 @@ $ sudo apt-get install tdsodbc freetds-bin sqsh
</yandex>
```
или
```sql
CREATE DICTIONARY test (
k UInt64,
s String DEFAULT ''
)
PRIMARY KEY k
SOURCE(ODBC(table 'dict' connection_string 'DSN=MSSQL;UID=test;PWD=test'))
LAYOUT(FLAT())
LIFETIME(MIN 300 MAX 360)
```
## СУБД
@ -328,6 +410,22 @@ $ sudo apt-get install tdsodbc freetds-bin sqsh
</source>
```
или
```sql
SOURCE(MYSQL(
port 3306
user 'clickhouse'
password 'qwerty'
replica(host 'example01-1' priority 1)
replica(host 'example01-2' priority 1)
db 'db_name'
table 'table_name'
where 'id=10'
invalidate_query 'SQL_QUERY'
))
```
Поля настройки:
- `port` — порт сервера MySQL. Можно указать для всех реплик или для каждой в отдельности (внутри `<replica>`).
@ -362,6 +460,21 @@ MySQL можно подключить на локальном хосте чер
</source>
```
или
```sql
SOURCE(MYSQL(
host 'localhost'
socket '/path/to/socket/file.sock'
user 'clickhouse'
password 'qwerty'
db 'db_name'
table 'table_name'
where 'id=10'
invalidate_query 'SQL_QUERY'
))
```
### ClickHouse {#dicts-external_dicts_dict_sources-clickhouse}
@ -381,16 +494,30 @@ MySQL можно подключить на локальном хосте чер
</source>
```
или
```sql
SOURCE(CLICKHOUSE(
host 'example01-01-1'
port 9000
user 'default'
password ''
db 'default'
table 'ids'
where 'id=10'
))
```
Поля настройки:
- `host` - хост ClickHouse. Если host локальный, то запрос выполняется без сетевого взаимодействия. Чтобы повысить отказоустойчивость решения, можно создать таблицу типа [Distributed](../../operations/table_engines/distributed.md) и прописать её в дальнейших настройках.
- `port` - порт сервера ClickHouse.
- `user` - имя пользователя ClickHouse.
- `password` - пароль пользователя ClickHouse.
- `db` - имя базы данных.
- `table` - имя таблицы.
- `where` - условие выбора. Может отсутствовать.
- `invalidate_query` - запрос для проверки статуса словаря. Необязательный параметр. Читайте подробнее в разделе [Обновление словарей](external_dicts_dict_lifetime.md).
- `host` хост ClickHouse. Если host локальный, то запрос выполняется без сетевого взаимодействия. Чтобы повысить отказоустойчивость решения, можно создать таблицу типа [Distributed](../../operations/table_engines/distributed.md) и прописать её в дальнейших настройках.
- `port` порт сервера ClickHouse.
- `user` имя пользователя ClickHouse.
- `password` пароль пользователя ClickHouse.
- `db` имя базы данных.
- `table` имя таблицы.
- `where` условие выбора. Может отсутствовать.
- `invalidate_query` запрос для проверки статуса словаря. Необязательный параметр. Читайте подробнее в разделе [Обновление словарей](external_dicts_dict_lifetime.md).
### MongoDB {#dicts-external_dicts_dict_sources-mongodb}
@ -410,14 +537,27 @@ MySQL можно подключить на локальном хосте чер
</source>
```
или
```sql
SOURCE(MONGO(
host 'localhost'
port 27017
user ''
password ''
db 'test'
collection 'dictionary_source'
))
```
Поля настройки:
- `host` - хост MongoDB.
- `port` - порт сервера MongoDB.
- `user` - имя пользователя MongoDB.
- `password` - пароль пользователя MongoDB.
- `db` - имя базы данных.
- `collection` - имя коллекции.
- `host` хост MongoDB.
- `port` порт сервера MongoDB.
- `user` имя пользователя MongoDB.
- `password` пароль пользователя MongoDB.
- `db` имя базы данных.
- `collection` имя коллекции.
### Redis {#dicts-external_dicts_dict_sources-redis}
@ -434,6 +574,17 @@ MySQL можно подключить на локальном хосте чер
</source>
```
или
```sql
SOURCE(REDIS(
host 'localhost'
port 6379
storage_type 'simple'
db_index 0
))
```
Поля настройки:
- `host` хост Redis.

View File

@ -24,10 +24,10 @@
Атрибуты описываются элементами:
- `<id>` — [столбец с ключом](external_dicts_dict_structure.md#ext_dict_structure-key).
- `<attribute>` — [столбец данных](external_dicts_dict_structure.md#ext_dict_structure-attributes). Можно задать несколько столбцов.
- `<attribute>` — [столбец данных](external_dicts_dict_structure.md#ext_dict_structure-attributes). Можно задать несколько атрибутов.
Запрос создания словаря:
Создание словаря запросом:
```sql
CREATE DICTIONARY dict_name (
@ -48,10 +48,10 @@ PRIMARY KEY Id
ClickHouse поддерживает следующие виды ключей:
- Числовой ключ. `UInt64`. Описывается в теге `<id>`.
- Составной ключ. Набор значений разного типа. Описывается в теге `<key>`.
- Числовой ключ. `UInt64`. Описывается в теге `<id>` или ключевым словом `PRIMARY KEY`.
- Составной ключ. Набор значений разного типа. Описывается в теге `<key>` или ключевым словом `PRIMARY KEY`.
Структура может содержать либо `<id>` либо `<key>`.
Структура может содержать либо `<id>` либо `<key>`. DDL-запрос может содержать только `PRIMARY KEY`.
!!! warning "Обратите внимание"
Ключ не надо дополнительно описывать в атрибутах.
@ -72,6 +72,20 @@ ClickHouse поддерживает следующие виды ключей:
- `name` — имя столбца с ключами.
Для DDL-запроса:
```sql
CREATE DICTIONARY (
Id UInt64,
...
)
PRIMARY KEY Id
...
```
- `PRIMARY KEY` имя столбца с ключами.
### Составной ключ
Ключом может быть кортеж (`tuple`) из полей произвольных типов. В этом случае [layout](external_dicts_dict_layout.md) должен быть `complex_key_hashed` или `complex_key_cache`.
@ -97,6 +111,18 @@ ClickHouse поддерживает следующие виды ключей:
...
```
или
```sql
CREATE DICTIONARY (
field1 String,
field2 String
...
)
PRIMARY KEY field1, field2
...
```
При запросе в функции `dictGet*` в качестве ключа передаётся кортеж. Пример: `dictGetString('dict_name', 'attr_name', tuple('string for field1', num_for_field2))`.
@ -119,6 +145,15 @@ ClickHouse поддерживает следующие виды ключей:
</structure>
```
или
```sql
CREATE DICTIONARY somename (
Name ClickHouseDataType DEFAULT '' EXPRESSION rand64() HIERARCHICAL INJECTIVE IS_OBJECT_ID
)
```
Поля конфигурации:
| Тег | Описание | Обязательный |

View File

@ -3,10 +3,10 @@
## SHOW CREATE TABLE
```sql
SHOW CREATE [TEMPORARY] TABLE [db.]table [INTO OUTFILE filename] [FORMAT format]
SHOW CREATE [TEMPORARY] [TABLE|DICTIONARY] [db.]table [INTO OUTFILE filename] [FORMAT format]
```
Возвращает один столбец типа `String` с именем statement, содержащий одно значение — запрос `CREATE TABLE`, с помощью которого была создана указанная таблица.
Возвращает один столбец типа `String` с именем statement, содержащий одно значение — запрос `CREATE`, с помощью которого был создан указанный объект.
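Например, запрос для словаря (имя условное):
```sql
SHOW CREATE DICTIONARY db.dict_name
```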
## SHOW DATABASES {#show-databases}
@ -62,3 +62,35 @@ SHOW TABLES FROM system LIKE '%co%' LIMIT 2
│ collations │
└────────────────────────────────┘
```
## SHOW DICTIONARIES
Выводит список [внешних словарей](dicts/external_dicts.md).
```sql
SHOW DICTIONARIES [FROM <db>] [LIKE '<pattern>'] [LIMIT <N>] [INTO OUTFILE <filename>] [FORMAT <format>]
```
Если секция `FROM` не указана, запрос возвращает список словарей из текущей базы данных.
Аналогичный результат можно получить следующим запросом:
```sql
SELECT name FROM system.dictionaries WHERE database = <db> [AND name LIKE <pattern>] [LIMIT <N>] [INTO OUTFILE <filename>] [FORMAT <format>]
```
**Example**
Запрос выводит первые две строки из списка словарей в базе данных `db`, имена которых содержат `reg`.
```sql
SHOW DICTIONARIES FROM db LIKE '%reg%' LIMIT 2
```
```text
┌─name─────────┐
│ regions │
│ region_names │
└──────────────┘
```
[Оригинальная статья](https://clickhouse.tech/docs/ru/query_language/show/) <!--hide-->

View File

@ -2,35 +2,35 @@ alabaster==0.7.12
Babel==2.5.1
backports-abc==0.5
beautifulsoup4==4.8.2
certifi==2017.11.5
certifi==2019.11.28
chardet==3.0.4
click==6.7
CommonMark==0.5.4
CommonMark==0.9.1
cssmin==0.2.0
docutils==0.16
futures==3.1.1
htmlmin==0.1.12
idna==2.6
idna==2.9
imagesize==1.2.0
Jinja2==2.11.1
jsmin==2.2.2
livereload==2.5.1
Markdown==2.6.11
MarkupSafe==1.0
MarkupSafe==1.1.1
mkdocs==1.0.4
Pygments==2.5.2
python-slugify==1.2.6
pytz==2017.3
PyYAML==5.3
recommonmark==0.4.0
requests==2.21.0
requests==2.23.0
singledispatch==3.4.0.3
six==1.11.0
six==1.14.0
snowballstemmer==1.2.1
Sphinx==1.6.5
sphinxcontrib-websupport==1.0.1
tornado==5.1
typing==3.7.4.1
Unidecode==1.0.23
Unidecode==1.1.1
urllib3==1.25.8
gitpython==2.1.14

View File

@ -109,9 +109,9 @@ CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
对每个结果的数据部分 ClickHouse 保存:
1. 第一个“取消”和最后一个“状态”行,如果“状态”和“取消”行的数量匹配
2. 最后一个“状态”行,如果“状态”行比“取消”行多一个。
3. 第一个“取消”行,如果“取消”行比“状态”行多一个。
1. 第一个“取消”行和最后一个“状态”行,如果“状态”行和“取消”行的数量匹配,并且最后一行是“状态”行。
2. 最后一个“状态”行,如果“状态”行比“取消”行至少多一行。
3. 第一个“取消”行,如果“取消”行比“状态”行至少多一行。
4. 没有行,在其他所有情况下。
合并会继续,但是 ClickHouse 会把此情况视为逻辑错误并将其记录在服务日志中。这个错误会在相同的数据被插入超过一次时出现。

34
website/workers/events.js Normal file
View File

@ -0,0 +1,34 @@
addEventListener('fetch', event => {
    event.respondWith(handleRequest(event.request))
})
async function handleRequest(request) {
    // Fetch the README of the main repository; its "Upcoming Events" section
    // is the source of the event announcements.
    let raw = await fetch('https://raw.githubusercontent.com/ClickHouse/ClickHouse/master/README.md');
    let text = await raw.text();
    let lines = text.split('\n');
    let skip = true;
    let events = [];
    for (let idx in lines) {
        let line = lines[idx];
        if (skip) {
            // Skip everything until the "Upcoming Events" heading.
            if (line.includes('Upcoming Events')) {
                skip = false;
            }
        } else {
            if (!line) { continue; };
            // Each event line is expected to look like: * [Event name](signup_link) on Date.
            line = line.split('](');
            let tail = line[1].split(') ');
            events.push({
                'signup_link': tail[0],
                'event_name': line[0].replace('* [', ''),
                'event_date': tail[1].slice(0, -1).replace('on ', '')
            });
        }
    }
    // Return the parsed list as JSON.
    let response = new Response(JSON.stringify({
        'events': events
    }));
    response.headers.set('Content-Type', 'application/json');
    return response;
}

10
website/workers/repo.js Normal file
View File

@ -0,0 +1,10 @@
addEventListener('fetch', event => {
event.respondWith(handleRequest(event.request))
})
async function handleRequest(request) {
let url = new URL(request.url);
url.hostname = 'repo.yandex.ru';
url.pathname = '/clickhouse' + url.pathname;
return fetch(url)
}