Merge remote-tracking branch 'origin/master' into rocksdb_metacache

This commit is contained in:
taiyang-li 2022-01-13 18:29:38 +08:00
commit 964f92f2c1
107 changed files with 1757 additions and 1145 deletions

View File

@ -158,6 +158,8 @@ While inside the `build` directory, configure your build by running CMake. Befor
export CC=clang CXX=clang++
cmake ..
If you installed clang using the automatic installation script above, also specify the version of clang installed in the first command, e.g. `export CC=clang-13 CXX=clang++-13`. The clang version will be in the script output.
The `CC` variable specifies the compiler for C (short for C Compiler), and `CXX` variable instructs which C++ compiler is to be used for building.
For a faster build, you can resort to the `debug` build type - a build with no optimizations. For that supply the following parameter `-D CMAKE_BUILD_TYPE=Debug`:

View File

@ -66,4 +66,14 @@ SELECT COUNT() FROM mongo_table;
└─────────┘
```
You can also adjust connection timeout:
``` sql
CREATE TABLE mongo_table
(
key UInt64,
data String
) ENGINE = MongoDB('mongo2:27017', 'test', 'simple_table', 'testuser', 'clickhouse', 'connectTimeoutMS=100000');
```
[Original article](https://clickhouse.com/docs/en/engines/table-engines/integrations/mongodb/) <!--hide-->

View File

@ -463,12 +463,13 @@ void ClientBase::initBlockOutputStream(const Block & block, ASTPtr parsed_query)
/// The query can specify output format or output file.
if (const auto * query_with_output = dynamic_cast<const ASTQueryWithOutput *>(parsed_query.get()))
{
String out_file;
if (query_with_output->out_file)
{
select_into_file = true;
const auto & out_file_node = query_with_output->out_file->as<ASTLiteral &>();
const auto & out_file = out_file_node.value.safeGet<std::string>();
out_file = out_file_node.value.safeGet<std::string>();
std::string compression_method;
if (query_with_output->compression)
@ -494,6 +495,12 @@ void ClientBase::initBlockOutputStream(const Block & block, ASTPtr parsed_query)
const auto & id = query_with_output->format->as<ASTIdentifier &>();
current_format = id.name();
}
else if (query_with_output->out_file)
{
const auto & format_name = FormatFactory::instance().getFormatFromFileName(out_file);
if (!format_name.empty())
current_format = format_name;
}
}
if (has_vertical_output_suffix)
@ -1008,11 +1015,15 @@ void ClientBase::sendData(Block & sample, const ColumnsDescription & columns_des
compression_method = compression_method_node.value.safeGet<std::string>();
}
String current_format = parsed_insert_query->format;
if (current_format.empty())
current_format = FormatFactory::instance().getFormatFromFileName(in_file);
/// Create temporary storage file, to support globs and parallel reading
StorageFile::CommonArguments args{
WithContext(global_context),
parsed_insert_query->table_id,
parsed_insert_query->format,
current_format,
getFormatSettings(global_context),
compression_method,
columns_description_for_query,

View File

@ -214,6 +214,12 @@ bool LocalConnection::poll(size_t)
if (next_packet_type)
return true;
if (state->exception)
{
next_packet_type = Protocol::Server::Exception;
return true;
}
if (!state->is_finished)
{
if (send_progress && (state->after_send_progress.elapsedMicroseconds() >= query_context->getSettingsRef().interactive_delay))

View File

@ -323,7 +323,7 @@ private:
UInt64 address = 0;
};
static const UInt64 mask = 0xFFFFFFFFFFFFFFFC;
static const UInt32 medium_set_size_max = 1UL << medium_set_power2_max;
static const UInt32 medium_set_size_max = 1ULL << medium_set_power2_max;
};
}

View File

@ -280,7 +280,7 @@ public:
if ((reinterpret_cast<uintptr_t>(p) & 2048) == 0)
{
memcpy(&n[0], p, 8);
n[0] &= -1ul >> s;
n[0] &= -1ULL >> s;
}
else
{

View File

@ -114,7 +114,7 @@ public:
if ((reinterpret_cast<uintptr_t>(p) & 2048) == 0)
{
memcpy(&n[0], p, 8);
n[0] &= -1ul >> s;
n[0] &= -1ULL >> s;
}
else
{

View File

@ -23,7 +23,7 @@ static constexpr auto NS = 1000000000UL;
/// Tracking window. Actually the size is not really important. We just want to avoid
/// throttles when there are no actions for a long period time.
static const double window_ns = 1UL * NS;
static const double window_ns = 1ULL * NS;
void Throttler::add(size_t amount)
{

View File

@ -156,13 +156,15 @@ DatabasePtr DatabaseFactory::getImpl(const ASTCreateQuery & create, const String
StorageMySQLConfiguration configuration;
ASTs & arguments = engine->arguments->children;
MySQLSettings mysql_settings;
if (auto named_collection = getExternalDataSourceConfiguration(arguments, context, true))
if (auto named_collection = getExternalDataSourceConfiguration(arguments, context, true, true, mysql_settings))
{
auto [common_configuration, storage_specific_args] = named_collection.value();
auto [common_configuration, storage_specific_args, settings_changes] = named_collection.value();
configuration.set(common_configuration);
configuration.addresses = {std::make_pair(configuration.host, configuration.port)};
mysql_settings.applyChanges(settings_changes);
if (!storage_specific_args.empty())
throw Exception(ErrorCodes::BAD_ARGUMENTS,
@ -200,7 +202,6 @@ DatabasePtr DatabaseFactory::getImpl(const ASTCreateQuery & create, const String
if (engine_name == "MySQL")
{
auto mysql_database_settings = std::make_unique<ConnectionMySQLSettings>();
MySQLSettings mysql_settings;
auto mysql_pool = createMySQLPoolWithFailover(configuration, mysql_settings);
mysql_database_settings->loadFromQueryContext(context);
@ -299,7 +300,7 @@ DatabasePtr DatabaseFactory::getImpl(const ASTCreateQuery & create, const String
if (auto named_collection = getExternalDataSourceConfiguration(engine_args, context, true))
{
auto [common_configuration, storage_specific_args] = named_collection.value();
auto [common_configuration, storage_specific_args, _] = named_collection.value();
configuration.set(common_configuration);
configuration.addresses = {std::make_pair(configuration.host, configuration.port)};
@ -358,7 +359,7 @@ DatabasePtr DatabaseFactory::getImpl(const ASTCreateQuery & create, const String
if (auto named_collection = getExternalDataSourceConfiguration(engine_args, context, true))
{
auto [common_configuration, storage_specific_args] = named_collection.value();
auto [common_configuration, storage_specific_args, _] = named_collection.value();
configuration.set(common_configuration);
if (!storage_specific_args.empty())

View File

@ -50,12 +50,17 @@ DatabaseMaterializedPostgreSQL::DatabaseMaterializedPostgreSQL(
, remote_database_name(postgres_database_name)
, connection_info(connection_info_)
, settings(std::move(settings_))
, startup_task(getContext()->getSchedulePool().createTask("MaterializedPostgreSQLDatabaseStartup", [this]{ startSynchronization(); }))
{
}
void DatabaseMaterializedPostgreSQL::startSynchronization()
{
std::lock_guard lock(handler_mutex);
if (shutdown_called)
return;
replication_handler = std::make_unique<PostgreSQLReplicationHandler>(
/* replication_identifier */database_name,
remote_database_name,
@ -104,24 +109,14 @@ void DatabaseMaterializedPostgreSQL::startSynchronization()
}
LOG_TRACE(log, "Loaded {} tables. Starting synchronization", materialized_tables.size());
replication_handler->startup();
replication_handler->startup(/* delayed */false);
}
void DatabaseMaterializedPostgreSQL::startupTables(ThreadPool & thread_pool, bool force_restore, bool force_attach)
{
DatabaseAtomic::startupTables(thread_pool, force_restore, force_attach);
try
{
startSynchronization();
}
catch (...)
{
tryLogCurrentException(log, "Cannot load nested database objects for PostgreSQL database engine.");
if (!force_attach)
throw;
}
startup_task->activateAndSchedule();
}
@ -376,6 +371,7 @@ StoragePtr DatabaseMaterializedPostgreSQL::detachTable(ContextPtr context_, cons
void DatabaseMaterializedPostgreSQL::shutdown()
{
startup_task->deactivate();
stopReplication();
DatabaseAtomic::shutdown();
}
@ -387,6 +383,7 @@ void DatabaseMaterializedPostgreSQL::stopReplication()
if (replication_handler)
replication_handler->shutdown();
shutdown_called = true;
/// Clear wrappers over nested, all access is not done to nested tables directly.
materialized_tables.clear();
}

View File

@ -86,6 +86,9 @@ private:
std::map<std::string, StoragePtr> materialized_tables;
mutable std::mutex tables_mutex;
mutable std::mutex handler_mutex;
BackgroundSchedulePool::TaskHolder startup_task;
bool shutdown_called = false;
};
}

View File

@ -247,12 +247,13 @@ void registerDictionarySourceClickHouse(DictionarySourceFactory & factory)
if (named_collection)
{
host = named_collection->host;
user = named_collection->username;
password = named_collection->password;
db = named_collection->database;
table = named_collection->table;
port = named_collection->port;
const auto & configuration = named_collection->configuration;
host = configuration.host;
user = configuration.username;
password = configuration.password;
db = configuration.database;
table = configuration.table;
port = configuration.port;
}
ClickHouseDictionarySource::Configuration configuration{

View File

@ -28,7 +28,7 @@ void registerDictionarySourceMongoDB(DictionarySourceFactory & factory)
auto named_collection = getExternalDataSourceConfiguration(config, config_prefix, context, has_config_key);
if (named_collection)
{
configuration = *named_collection;
configuration = named_collection->configuration;
}
else
{

View File

@ -60,18 +60,24 @@ void registerDictionarySourceMysql(DictionarySourceFactory & factory)
auto settings_config_prefix = config_prefix + ".mysql";
std::shared_ptr<mysqlxx::PoolWithFailover> pool;
auto has_config_key = [](const String & key) { return dictionary_allowed_keys.contains(key) || key.starts_with("replica"); };
MySQLSettings mysql_settings;
auto has_config_key = [&](const String & key)
{
return dictionary_allowed_keys.contains(key) || key.starts_with("replica") || mysql_settings.has(key);
};
StorageMySQLConfiguration configuration;
auto named_collection = created_from_ddl
? getExternalDataSourceConfiguration(config, settings_config_prefix, global_context, has_config_key)
? getExternalDataSourceConfiguration(config, settings_config_prefix, global_context, has_config_key, mysql_settings)
: std::nullopt;
if (named_collection)
{
configuration.set(*named_collection);
mysql_settings.applyChanges(named_collection->settings_changes);
configuration.set(named_collection->configuration);
configuration.addresses = {std::make_pair(configuration.host, configuration.port)};
MySQLSettings mysql_settings;
const auto & settings = global_context->getSettingsRef();
if (!mysql_settings.isChanged("connect_timeout"))
mysql_settings.connect_timeout = settings.external_storage_connect_timeout_sec;
if (!mysql_settings.isChanged("read_write_timeout"))
mysql_settings.read_write_timeout = settings.external_storage_rw_timeout_sec;
pool = std::make_shared<mysqlxx::PoolWithFailover>(createMySQLPoolWithFailover(configuration, mysql_settings));
}

View File

@ -14,6 +14,8 @@
#include <Poco/URI.h>
#include <Common/Exception.h>
#include <boost/algorithm/string/case_conv.hpp>
namespace DB
{
@ -391,6 +393,30 @@ void FormatFactory::registerOutputFormat(const String & name, OutputCreator outp
target = std::move(output_creator);
}
void FormatFactory::registerFileExtension(const String & extension, const String & format_name)
{
file_extension_formats[extension] = format_name;
}
String FormatFactory::getFormatFromFileName(String file_name)
{
CompressionMethod compression_method = chooseCompressionMethod(file_name, "");
if (CompressionMethod::None != compression_method)
{
auto pos = file_name.find_last_of('.');
if (pos != String::npos)
file_name = file_name.substr(0, pos);
}
auto pos = file_name.find_last_of('.');
if (pos == String::npos)
return "";
String file_extension = file_name.substr(pos + 1, String::npos);
boost::algorithm::to_lower(file_extension);
return file_extension_formats[file_extension];
}
void FormatFactory::registerFileSegmentationEngine(const String & name, FileSegmentationEngine file_segmentation_engine)
{
auto & target = dict[name].file_segmentation_engine;

View File

@ -5,6 +5,7 @@
#include <Formats/FormatSettings.h>
#include <Interpreters/Context_fwd.h>
#include <IO/BufferWithOwnMemory.h>
#include <IO/CompressionMethod.h>
#include <base/types.h>
#include <Core/NamesAndTypes.h>
@ -108,6 +109,7 @@ private:
};
using FormatsDictionary = std::unordered_map<String, Creators>;
using FileExtensionFormats = std::unordered_map<String, String>;
public:
static FormatFactory & instance();
@ -169,6 +171,10 @@ public:
void registerInputFormat(const String & name, InputCreator input_creator);
void registerOutputFormat(const String & name, OutputCreator output_creator);
/// Register file extension for format
void registerFileExtension(const String & extension, const String & format_name);
String getFormatFromFileName(String file_name);
/// Register schema readers for format its name.
void registerSchemaReader(const String & name, SchemaReaderCreator schema_reader_creator);
void registerExternalSchemaReader(const String & name, ExternalSchemaReaderCreator external_schema_reader_creator);
@ -192,6 +198,7 @@ public:
private:
FormatsDictionary dict;
FileExtensionFormats file_extension_formats;
const Creators & getCreators(const String & name) const;

View File

@ -196,6 +196,16 @@ void registerFormats()
registerTSKVSchemaReader(factory);
registerValuesSchemaReader(factory);
registerTemplateSchemaReader(factory);
factory.registerFileExtension("csv", "CSV");
factory.registerFileExtension("tsv", "TSV");
factory.registerFileExtension("parquet", "Parquet");
factory.registerFileExtension("orc", "ORC");
factory.registerFileExtension("native", "Native");
factory.registerFileExtension("json", "JSON");
factory.registerFileExtension("ndjson", "JSONEachRow");
factory.registerFileExtension("xml", "XML");
factory.registerFileExtension("avro", "Avro");
}
}

View File

@ -19,12 +19,13 @@ namespace DB
namespace ErrorCodes
{
extern const int ILLEGAL_TYPE_OF_ARGUMENT;
extern const int ILLEGAL_COLUMN;
}
namespace
{
class FunctionH3CellAreaM2 : public IFunction
class FunctionH3CellAreaM2 final : public IFunction
{
public:
static constexpr auto name = "h3CellAreaM2";
@ -52,6 +53,14 @@ public:
ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr &, size_t input_rows_count) const override
{
const auto * column = checkAndGetColumn<ColumnUInt64>(arguments[0].column.get());
if (!column)
throw Exception(
ErrorCodes::ILLEGAL_COLUMN,
"Illegal type {} of argument {} of function {}. Must be UInt64.",
arguments[0].type->getName(),
1,
getName());
const auto & data = column->getData();
auto dst = ColumnVector<Float64>::create();
@ -60,8 +69,8 @@ public:
for (size_t row = 0; row < input_rows_count; ++row)
{
const UInt64 resolution = data[row];
Float64 res = cellAreaM2(resolution);
const UInt64 index = data[row];
Float64 res = cellAreaM2(index);
dst_data[row] = res;
}

View File

@ -19,12 +19,13 @@ namespace DB
namespace ErrorCodes
{
extern const int ILLEGAL_TYPE_OF_ARGUMENT;
extern const int ILLEGAL_COLUMN;
}
namespace
{
class FunctionH3CellAreaRads2 : public IFunction
class FunctionH3CellAreaRads2 final : public IFunction
{
public:
static constexpr auto name = "h3CellAreaRads2";
@ -52,6 +53,14 @@ public:
ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr &, size_t input_rows_count) const override
{
const auto * column = checkAndGetColumn<ColumnUInt64>(arguments[0].column.get());
if (!column)
throw Exception(
ErrorCodes::ILLEGAL_COLUMN,
"Illegal type {} of argument {} of function {}. Must be UInt64",
arguments[0].type->getName(),
1,
getName());
const auto & data = column->getData();
auto dst = ColumnVector<Float64>::create();
@ -60,8 +69,8 @@ public:
for (size_t row = 0; row < input_rows_count; ++row)
{
const UInt64 resolution = data[row];
Float64 res = cellAreaRads2(resolution);
const UInt64 index = data[row];
Float64 res = cellAreaRads2(index);
dst_data[row] = res;
}

View File

@ -16,12 +16,13 @@ namespace DB
namespace ErrorCodes
{
extern const int ILLEGAL_TYPE_OF_ARGUMENT;
extern const int ILLEGAL_COLUMN;
}
namespace
{
class FunctionH3DegsToRads : public IFunction
class FunctionH3DegsToRads final : public IFunction
{
public:
static constexpr auto name = "h3DegsToRads";
@ -51,6 +52,15 @@ public:
ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr &, size_t input_rows_count) const override
{
const auto * column = checkAndGetColumn<ColumnFloat64>(arguments[0].column.get());
if (!column)
throw Exception(
ErrorCodes::ILLEGAL_COLUMN,
"Illegal type {} of argument {} of function {}. Must be Float64",
arguments[0].type->getName(),
1,
getName());
const auto & data = column->getData();
auto dst = ColumnVector<Float64>::create();

View File

@ -20,12 +20,13 @@ namespace ErrorCodes
{
extern const int ILLEGAL_TYPE_OF_ARGUMENT;
extern const int ARGUMENT_OUT_OF_BOUND;
extern const int ILLEGAL_COLUMN;
}
namespace
{
class FunctionH3HexAreaKm2 : public IFunction
class FunctionH3HexAreaKm2 final : public IFunction
{
public:
static constexpr auto name = "h3HexAreaKm2";
@ -53,6 +54,14 @@ public:
ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr &, size_t input_rows_count) const override
{
const auto * column = checkAndGetColumn<ColumnUInt8>(arguments[0].column.get());
if (!column)
throw Exception(
ErrorCodes::ILLEGAL_COLUMN,
"Illegal type {} of argument {} of function {}. Must be UInt8",
arguments[0].column->getName(),
1,
getName());
const auto & data = column->getData();
auto dst = ColumnVector<Float64>::create();

View File

@ -16,12 +16,13 @@ namespace DB
namespace ErrorCodes
{
extern const int ILLEGAL_TYPE_OF_ARGUMENT;
extern const int ILLEGAL_COLUMN;
}
namespace
{
class FunctionH3RadsToDegs : public IFunction
class FunctionH3RadsToDegs final : public IFunction
{
public:
static constexpr auto name = "h3RadsToDegs";
@ -51,6 +52,14 @@ public:
ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr &, size_t input_rows_count) const override
{
const auto * column = checkAndGetColumn<ColumnFloat64>(arguments[0].column.get());
if (!column)
throw Exception(
ErrorCodes::ILLEGAL_COLUMN,
"Illegal type {} of argument {} of function {}. Must be Float64",
arguments[0].type->getName(),
1,
getName());
const auto & col_rads = column->getData();
auto dst = ColumnVector<Float64>::create();

View File

@ -34,6 +34,8 @@ public:
bool useDefaultImplementationForNulls() const override { return false; }
bool useDefaultImplementationForLowCardinalityColumns() const override { return false; }
DataTypePtr getReturnTypeImpl(const DataTypes & arguments) const override;
ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr &, size_t) const override;

View File

@ -1977,6 +1977,7 @@ void InterpreterSelectQuery::executeFetchColumns(QueryProcessingStage::Enum proc
if (!options.ignore_quota && (options.to_stage == QueryProcessingStage::Complete))
quota = context->getQuota();
query_info.settings_limit_offset_done = options.settings_limit_offset_done;
storage->read(query_plan, required_columns, metadata_snapshot, query_info, context, processing_stage, max_block_size, max_streams);
if (context->hasQueryContext() && !options.is_internal)

View File

@ -83,7 +83,7 @@ InterpreterSelectWithUnionQuery::InterpreterSelectWithUnionQuery(
}
}
if (num_children == 1 && settings_limit_offset_needed)
if (num_children == 1 && settings_limit_offset_needed && !options.settings_limit_offset_done)
{
const ASTPtr first_select_ast = ast->list_of_selects->children.at(0);
ASTSelectQuery * select_query = dynamic_cast<ASTSelectQuery *>(first_select_ast.get());
@ -127,7 +127,7 @@ InterpreterSelectWithUnionQuery::InterpreterSelectWithUnionQuery(
select_query->setExpression(ASTSelectQuery::Expression::LIMIT_LENGTH, std::move(new_limit_length_ast));
}
settings_limit_offset_done = true;
options.settings_limit_offset_done = true;
}
}
@ -305,7 +305,7 @@ void InterpreterSelectWithUnionQuery::buildQueryPlan(QueryPlan & query_plan)
}
}
if (settings_limit_offset_needed && !settings_limit_offset_done)
if (settings_limit_offset_needed && !options.settings_limit_offset_done)
{
if (settings.limit > 0)
{

View File

@ -6,6 +6,7 @@
#include <Parsers/ASTSelectIntersectExceptQuery.h>
#include <Parsers/ASTSelectQuery.h>
#include <Parsers/ASTKillQueryQuery.h>
#include <Parsers/IAST.h>
#include <Parsers/queryNormalization.h>
#include <Processors/Executors/PipelineExecutor.h>
#include <Common/typeid_cast.h>
@ -76,6 +77,7 @@ ProcessList::EntryPtr ProcessList::insert(const String & query_, const IAST * as
{
std::unique_lock lock(mutex);
IAST::QueryKind query_kind = ast->getQueryKind();
const auto queue_max_wait_ms = settings.queue_max_wait_ms.totalMilliseconds();
if (!is_unlimited_query && max_size && processes.size() >= max_size)
@ -86,15 +88,14 @@ ProcessList::EntryPtr ProcessList::insert(const String & query_, const IAST * as
throw Exception("Too many simultaneous queries. Maximum: " + toString(max_size), ErrorCodes::TOO_MANY_SIMULTANEOUS_QUERIES);
}
String query_kind{ast->getQueryKindString()};
if (!is_unlimited_query)
{
auto amount = getQueryKindAmount(query_kind);
if (max_insert_queries_amount && query_kind == "Insert" && amount >= max_insert_queries_amount)
QueryAmount amount = getQueryKindAmount(query_kind);
if (max_insert_queries_amount && query_kind == IAST::QueryKind::Insert && amount >= max_insert_queries_amount)
throw Exception(ErrorCodes::TOO_MANY_SIMULTANEOUS_QUERIES,
"Too many simultaneous insert queries. Maximum: {}, current: {}",
max_insert_queries_amount, amount);
if (max_select_queries_amount && query_kind == "Select" && amount >= max_select_queries_amount)
if (max_select_queries_amount && query_kind == IAST::QueryKind::Select && amount >= max_select_queries_amount)
throw Exception(ErrorCodes::TOO_MANY_SIMULTANEOUS_QUERIES,
"Too many simultaneous select queries. Maximum: {}, current: {}",
max_select_queries_amount, amount);
@ -258,7 +259,7 @@ ProcessListEntry::~ProcessListEntry()
String user = it->getClientInfo().current_user;
String query_id = it->getClientInfo().current_query_id;
String query_kind = it->query_kind;
IAST::QueryKind query_kind = it->query_kind;
const QueryStatus * process_list_element_ptr = &*it;
@ -306,7 +307,7 @@ ProcessListEntry::~ProcessListEntry()
QueryStatus::QueryStatus(
ContextPtr context_, const String & query_, const ClientInfo & client_info_, QueryPriorities::Handle && priority_handle_, const String & query_kind_)
ContextPtr context_, const String & query_, const ClientInfo & client_info_, QueryPriorities::Handle && priority_handle_, IAST::QueryKind query_kind_)
: WithContext(context_)
, query(query_)
, client_info(client_info_)
@ -505,7 +506,7 @@ ProcessList::UserInfo ProcessList::getUserInfo(bool get_profile_events) const
return per_user_infos;
}
void ProcessList::increaseQueryKindAmount(const String & query_kind)
void ProcessList::increaseQueryKindAmount(const IAST::QueryKind & query_kind)
{
auto found = query_kind_amounts.find(query_kind);
if (found == query_kind_amounts.end())
@ -514,7 +515,7 @@ void ProcessList::increaseQueryKindAmount(const String & query_kind)
found->second += 1;
}
void ProcessList::decreaseQueryKindAmount(const String & query_kind)
void ProcessList::decreaseQueryKindAmount(const IAST::QueryKind & query_kind)
{
auto found = query_kind_amounts.find(query_kind);
/// TODO: we could just rebuild the map, as we have saved all query_kind.
@ -524,9 +525,9 @@ void ProcessList::decreaseQueryKindAmount(const String & query_kind)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Wrong query kind amount: decrease to negative on '{}'", query_kind, found->second);
else
found->second -= 1;
}
ProcessList::QueryAmount ProcessList::getQueryKindAmount(const String & query_kind)
ProcessList::QueryAmount ProcessList::getQueryKindAmount(const IAST::QueryKind & query_kind) const
{
auto found = query_kind_amounts.find(query_kind);
if (found == query_kind_amounts.end())

View File

@ -9,6 +9,7 @@
#include <QueryPipeline/ExecutionSpeedLimits.h>
#include <Storages/IStorage_fwd.h>
#include <Poco/Condition.h>
#include <Parsers/IAST.h>
#include <Common/CurrentMetrics.h>
#include <Common/CurrentThread.h>
#include <Common/MemoryTracker.h>
@ -118,7 +119,7 @@ protected:
ProcessListForUser * user_process_list = nullptr;
String query_kind;
IAST::QueryKind query_kind;
public:
@ -127,7 +128,7 @@ public:
const String & query_,
const ClientInfo & client_info_,
QueryPriorities::Handle && priority_handle_,
const String & query_kind_
IAST::QueryKind query_kind_
);
~QueryStatus();
@ -270,7 +271,7 @@ public:
/// User -> queries
using UserToQueries = std::unordered_map<String, ProcessListForUser>;
using QueryKindToAmount = std::unordered_map<String, QueryAmount>;
using QueryKindAmounts = std::unordered_map<IAST::QueryKind, QueryAmount>;
protected:
friend class ProcessListEntry;
@ -301,11 +302,11 @@ protected:
size_t max_select_queries_amount = 0;
/// amount of queries by query kind.
QueryKindToAmount query_kind_amounts;
QueryKindAmounts query_kind_amounts;
void increaseQueryKindAmount(const String & query_kind);
void decreaseQueryKindAmount(const String & query_kind);
QueryAmount getQueryKindAmount(const String & query_kind);
void increaseQueryKindAmount(const IAST::QueryKind & query_kind);
void decreaseQueryKindAmount(const IAST::QueryKind & query_kind);
QueryAmount getQueryKindAmount(const IAST::QueryKind & query_kind) const;
public:
using EntryPtr = std::shared_ptr<ProcessListEntry>;

View File

@ -48,6 +48,7 @@ struct SelectQueryOptions
bool is_internal = false;
bool is_subquery = false; // non-subquery can also have subquery_depth > 0, e.g. insert select
bool with_all_cols = false; /// asterisk include materialized and aliased columns
bool settings_limit_offset_done = false;
/// These two fields are used to evaluate shardNum() and shardCount() function when
/// prefer_localhost_replica == 1 and local instance is selected. They are needed because local
@ -58,8 +59,10 @@ struct SelectQueryOptions
SelectQueryOptions(
QueryProcessingStage::Enum stage = QueryProcessingStage::Complete,
size_t depth = 0,
bool is_subquery_ = false)
: to_stage(stage), subquery_depth(depth), is_subquery(is_subquery_)
bool is_subquery_ = false,
bool settings_limit_offset_done_ = false)
: to_stage(stage), subquery_depth(depth), is_subquery(is_subquery_),
settings_limit_offset_done(settings_limit_offset_done_)
{}
SelectQueryOptions copy() const { return *this; }

View File

@ -168,6 +168,8 @@ public:
void shutdown() override
{
stopFlushThread();
auto table = DatabaseCatalog::instance().tryGetTable(table_id, getContext());
if (table)
table->flushAndShutdown();
}
@ -186,7 +188,6 @@ private:
/* Saving thread data */
const StorageID table_id;
const String storage_def;
StoragePtr table;
String create_query;
String old_create_query;
bool is_prepared = false;
@ -525,7 +526,7 @@ void SystemLog<LogElement>::prepareTable()
{
String description = table_id.getNameForLogs();
table = DatabaseCatalog::instance().tryGetTable(table_id, getContext());
auto table = DatabaseCatalog::instance().tryGetTable(table_id, getContext());
if (table)
{

View File

@ -57,6 +57,8 @@
#include <Processors/Executors/CompletedPipelineExecutor.h>
#include <Processors/Sources/WaitForAsyncInsertSource.h>
#include <base/EnumReflection.h>
#include <random>
@ -271,7 +273,7 @@ static void onExceptionBeforeStart(const String & query_for_logging, ContextPtr
// Try log query_kind if ast is valid
if (ast)
{
elem.query_kind = ast->getQueryKindString();
elem.query_kind = magic_enum::enum_name(ast->getQueryKind());
if (settings.log_formatted_queries)
elem.formatted_query = queryToString(ast);
}

View File

@ -248,7 +248,7 @@ public:
return removeOnCluster<ASTAlterQuery>(clone(), new_database);
}
const char * getQueryKindString() const override { return "Alter"; }
virtual QueryKind getQueryKind() const override { return QueryKind::Alter; }
protected:
void formatQueryImpl(const FormatSettings & settings, FormatState & state, FormatStateStacked frame) const override;

View File

@ -119,7 +119,7 @@ public:
bool isView() const { return is_ordinary_view || is_materialized_view || is_live_view || is_window_view; }
const char * getQueryKindString() const override { return "Create"; }
virtual QueryKind getQueryKind() const override { return QueryKind::Create; }
protected:
void formatQueryImpl(const FormatSettings & settings, FormatState & state, FormatStateStacked frame) const override;

View File

@ -45,7 +45,7 @@ public:
return removeOnCluster<ASTDropQuery>(clone(), new_database);
}
const char * getQueryKindString() const override { return "Drop"; }
virtual QueryKind getQueryKind() const override { return QueryKind::Drop; }
protected:
void formatQueryImpl(const FormatSettings & settings, FormatState &, FormatStateStacked) const override;

View File

@ -79,6 +79,13 @@ void ASTInsertQuery::formatImpl(const FormatSettings & settings, FormatState & s
settings.ostr << ")";
}
if (infile)
{
settings.ostr << (settings.hilite ? hilite_keyword : "") << " FROM INFILE " << (settings.hilite ? hilite_none : "") << infile->as<ASTLiteral &>().value.safeGet<std::string>();
if (compression)
settings.ostr << (settings.hilite ? hilite_keyword : "") << " COMPRESSION " << (settings.hilite ? hilite_none : "") << compression->as<ASTLiteral &>().value.safeGet<std::string>();
}
if (select)
{
settings.ostr << " ";
@ -91,12 +98,6 @@ void ASTInsertQuery::formatImpl(const FormatSettings & settings, FormatState & s
}
else
{
if (infile)
{
settings.ostr << (settings.hilite ? hilite_keyword : "") << " FROM INFILE " << (settings.hilite ? hilite_none : "") << infile->as<ASTLiteral &>().value.safeGet<std::string>();
if (compression)
settings.ostr << (settings.hilite ? hilite_keyword : "") << " COMPRESSION " << (settings.hilite ? hilite_none : "") << compression->as<ASTLiteral &>().value.safeGet<std::string>();
}
if (!format.empty())
{
settings.ostr << (settings.hilite ? hilite_keyword : "") << " FORMAT " << (settings.hilite ? hilite_none : "") << format;

View File

@ -66,7 +66,7 @@ public:
return res;
}
const char * getQueryKindString() const override { return "Insert"; }
virtual QueryKind getQueryKind() const override { return QueryKind::Insert; }
protected:
void formatImpl(const FormatSettings & settings, FormatState & state, FormatStateStacked frame) const override;

View File

@ -65,7 +65,7 @@ public:
return query_ptr;
}
const char * getQueryKindString() const override { return "Rename"; }
virtual QueryKind getQueryKind() const override { return QueryKind::Rename; }
protected:
void formatQueryImpl(const FormatSettings & settings, FormatState &, FormatStateStacked) const override

View File

@ -22,7 +22,7 @@ public:
void formatImpl(const FormatSettings & settings, FormatState & state, FormatStateStacked frame) const override;
const char * getQueryKindString() const override { return "SelectIntersectExcept"; }
virtual QueryKind getQueryKind() const override { return QueryKind::SelectIntersectExcept; }
ASTs getListOfSelects() const;

View File

@ -135,7 +135,7 @@ public:
void setFinal();
const char * getQueryKindString() const override { return "Select"; }
virtual QueryKind getQueryKind() const override { return QueryKind::Select; }
protected:
void formatImpl(const FormatSettings & settings, FormatState & state, FormatStateStacked frame) const override;

View File

@ -17,7 +17,7 @@ public:
void formatQueryImpl(const FormatSettings & settings, FormatState & state, FormatStateStacked frame) const override;
const char * getQueryKindString() const override { return "Select"; }
virtual QueryKind getQueryKind() const override { return QueryKind::Select; }
SelectUnionMode union_mode;

View File

@ -107,7 +107,7 @@ public:
return removeOnCluster<ASTSystemQuery>(clone(), new_database);
}
const char * getQueryKindString() const override { return "System"; }
virtual QueryKind getQueryKind() const override { return QueryKind::System; }
protected:

View File

@ -34,6 +34,6 @@ public:
void replaceEmptyDatabase(const String & current_database);
void replaceCurrentUserTag(const String & current_user_name) const;
ASTPtr getRewrittenASTWithoutOnCluster(const std::string &) const override { return removeOnCluster<ASTGrantQuery>(clone()); }
const char * getQueryKindString() const override { return is_revoke ? "Revoke" : "Grant"; }
virtual QueryKind getQueryKind() const override { return is_revoke ? QueryKind::Revoke : QueryKind::Grant; }
};
}

View File

@ -245,10 +245,23 @@ public:
void cloneChildren();
// Return query_kind string representation of this AST query.
virtual const char * getQueryKindString() const { return ""; }
enum class QueryKind : uint8_t
{
None = 0,
Alter,
Create,
Drop,
Grant,
Insert,
Rename,
Revoke,
SelectIntersectExcept,
Select,
System,
};
/// Return QueryKind of this AST query.
virtual QueryKind getQueryKind() const { return QueryKind::None; }
public:
/// For syntax highlighting.
static const char * hilite_keyword;
static const char * hilite_identifier;

View File

@ -116,7 +116,7 @@ bool ParserInsertQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
/// Check if file is a source of data.
if (s_from_infile.ignore(pos, expected))
{
/// Read its name to process it later
/// Read file name to process it later
if (!infile_name_p.parse(pos, infile, expected))
return false;
@ -133,7 +133,8 @@ bool ParserInsertQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
Pos before_values = pos;
String format_str;
/// VALUES or FROM INFILE or FORMAT or SELECT
/// VALUES or FORMAT or SELECT or WITH or WATCH.
/// After FROM INFILE we expect FORMAT, SELECT, WITH or nothing.
if (!infile && s_values.ignore(pos, expected))
{
/// If VALUES is defined in query, everything except setting will be parsed as data
@ -162,21 +163,17 @@ bool ParserInsertQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
tryGetIdentifierNameInto(format, format_str);
}
else if (s_watch.ignore(pos, expected))
else if (!infile && s_watch.ignore(pos, expected))
{
/// If WATCH is defined, return to position before WATCH and parse
/// rest of query as WATCH query.
pos = before_values;
ParserWatchQuery watch_p;
watch_p.parse(pos, watch, expected);
/// FORMAT section is expected if we have input() in SELECT part
if (s_format.ignore(pos, expected) && !name_p.parse(pos, format, expected))
return false;
}
else
else if (!infile)
{
/// If all previous conditions were false, query is incorrect
/// If all previous conditions were false and it's not FROM INFILE, query is incorrect
return false;
}

View File

@ -80,6 +80,9 @@ static ColumnWithTypeAndName readColumnWithNumericData(std::shared_ptr<arrow::Ch
for (size_t chunk_i = 0, num_chunks = static_cast<size_t>(arrow_column->num_chunks()); chunk_i < num_chunks; ++chunk_i)
{
std::shared_ptr<arrow::Array> chunk = arrow_column->chunk(chunk_i);
if (chunk->length() == 0)
continue;
/// buffers[0] is a null bitmap and buffers[1] are actual values
std::shared_ptr<arrow::Buffer> buffer = chunk->data()->buffers[1];
@ -146,6 +149,9 @@ static ColumnWithTypeAndName readColumnWithBooleanData(std::shared_ptr<arrow::Ch
for (size_t chunk_i = 0, num_chunks = static_cast<size_t>(arrow_column->num_chunks()); chunk_i < num_chunks; ++chunk_i)
{
arrow::BooleanArray & chunk = dynamic_cast<arrow::BooleanArray &>(*(arrow_column->chunk(chunk_i)));
if (chunk.length() == 0)
continue;
/// buffers[0] is a null bitmap and buffers[1] are actual values
std::shared_ptr<arrow::Buffer> buffer = chunk.data()->buffers[1];

View File

@ -139,8 +139,10 @@ void TTLTransform::finalize()
if (delete_algorithm)
{
size_t rows_removed = all_data_dropped ? data_part->rows_count : delete_algorithm->getNumberOfRemovedRows();
LOG_DEBUG(log, "Removed {} rows with expired TTL from part {}", rows_removed, data_part->name);
if (all_data_dropped)
LOG_DEBUG(log, "Removed all rows from part {} due to expired TTL", data_part->name);
else
LOG_DEBUG(log, "Removed {} rows with expired TTL from part {}", delete_algorithm->getNumberOfRemovedRows(), data_part->name);
}
}

View File

@ -15,6 +15,9 @@
#if USE_RDKAFKA
#include <Storages/Kafka/KafkaSettings.h>
#endif
#if USE_MYSQL
#include <Storages/MySQL/MySQLSettings.h>
#endif
#include <re2/re2.h>
@ -26,12 +29,31 @@ namespace ErrorCodes
extern const int BAD_ARGUMENTS;
}
IMPLEMENT_SETTINGS_TRAITS(EmptySettingsTraits, EMPTY_SETTINGS)
static const std::unordered_set<std::string_view> dictionary_allowed_keys = {
"host", "port", "user", "password", "db",
"database", "table", "schema", "replica",
"update_field", "update_tag", "invalidate_query", "query",
"where", "name", "secure", "uri", "collection"};
template<typename T>
SettingsChanges getSettingsChangesFromConfig(
const BaseSettings<T> & settings, const Poco::Util::AbstractConfiguration & config, const String & config_prefix)
{
SettingsChanges config_settings;
for (const auto & setting : settings.all())
{
const auto & setting_name = setting.getName();
auto setting_value = config.getString(config_prefix + '.' + setting_name, "");
if (!setting_value.empty())
config_settings.emplace_back(setting_name, setting_value);
}
return config_settings;
}
String ExternalDataSourceConfiguration::toString() const
{
WriteBufferFromOwnString configuration_info;
@ -67,7 +89,9 @@ void ExternalDataSourceConfiguration::set(const ExternalDataSourceConfiguration
}
std::optional<ExternalDataSourceConfig> getExternalDataSourceConfiguration(const ASTs & args, ContextPtr context, bool is_database_engine, bool throw_on_no_collection)
template <typename T>
std::optional<ExternalDataSourceInfo> getExternalDataSourceConfiguration(
const ASTs & args, ContextPtr context, bool is_database_engine, bool throw_on_no_collection, const BaseSettings<T> & storage_settings)
{
if (args.empty())
throw Exception(ErrorCodes::BAD_ARGUMENTS, "External data source must have arguments");
@ -90,6 +114,8 @@ std::optional<ExternalDataSourceConfig> getExternalDataSourceConfiguration(const
throw Exception(ErrorCodes::BAD_ARGUMENTS, "There is no collection named `{}` in config", collection->name());
}
SettingsChanges config_settings = getSettingsChangesFromConfig(storage_settings, config, collection_prefix);
configuration.host = config.getString(collection_prefix + ".host", "");
configuration.port = config.getInt(collection_prefix + ".port", 0);
configuration.username = config.getString(collection_prefix + ".user", "");
@ -131,6 +157,7 @@ std::optional<ExternalDataSourceConfig> getExternalDataSourceConfiguration(const
if (arg_value_literal)
{
auto arg_value = arg_value_literal->value;
if (arg_name == "host")
configuration.host = arg_value.safeGet<String>();
else if (arg_name == "port")
@ -147,6 +174,8 @@ std::optional<ExternalDataSourceConfig> getExternalDataSourceConfiguration(const
configuration.schema = arg_value.safeGet<String>();
else if (arg_name == "addresses_expr")
configuration.addresses_expr = arg_value.safeGet<String>();
else if (storage_settings.has(arg_name))
config_settings.emplace_back(arg_name, arg_value);
else
non_common_args.emplace_back(std::make_pair(arg_name, arg_value_ast));
}
@ -161,8 +190,7 @@ std::optional<ExternalDataSourceConfig> getExternalDataSourceConfiguration(const
}
}
ExternalDataSourceConfig source_config{ .configuration = configuration, .specific_args = non_common_args };
return source_config;
return ExternalDataSourceInfo{ .configuration = configuration, .specific_args = non_common_args, .settings_changes = config_settings };
}
return std::nullopt;
}
@ -179,9 +207,10 @@ static void validateConfigKeys(
}
}
std::optional<ExternalDataSourceConfiguration> getExternalDataSourceConfiguration(
template <typename T>
std::optional<ExternalDataSourceInfo> getExternalDataSourceConfiguration(
const Poco::Util::AbstractConfiguration & dict_config, const String & dict_config_prefix,
ContextPtr context, HasConfigKeyFunc has_config_key)
ContextPtr context, HasConfigKeyFunc has_config_key, const BaseSettings<T> & settings)
{
validateConfigKeys(dict_config, dict_config_prefix, has_config_key);
ExternalDataSourceConfiguration configuration;
@ -192,6 +221,10 @@ std::optional<ExternalDataSourceConfiguration> getExternalDataSourceConfiguratio
const auto & config = context->getConfigRef();
const auto & collection_prefix = fmt::format("named_collections.{}", collection_name);
validateConfigKeys(dict_config, collection_prefix, has_config_key);
auto config_settings = getSettingsChangesFromConfig(settings, config, collection_prefix);
auto dict_settings = getSettingsChangesFromConfig(settings, dict_config, dict_config_prefix);
/// dictionary config settings override collection settings.
config_settings.insert(config_settings.end(), dict_settings.begin(), dict_settings.end());
if (!config.has(collection_prefix))
throw Exception(ErrorCodes::BAD_ARGUMENTS, "There is no collection named `{}` in config", collection_name);
@ -210,7 +243,7 @@ std::optional<ExternalDataSourceConfiguration> getExternalDataSourceConfiguratio
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Named collection of connection parameters is missing some of the parameters and dictionary parameters are not added");
}
return configuration;
return ExternalDataSourceInfo{ .configuration = configuration, .specific_args = {}, .settings_changes = config_settings };
}
return std::nullopt;
}
@ -225,7 +258,7 @@ ExternalDataSourcesByPriority getExternalDataSourceConfigurationByPriority(
auto named_collection = getExternalDataSourceConfiguration(dict_config, dict_config_prefix, context, has_config_key);
if (named_collection)
{
common_configuration = *named_collection;
common_configuration = named_collection->configuration;
}
else
{
@ -391,6 +424,7 @@ std::optional<URLBasedDataSourceConfig> getURLBasedDataSourceConfiguration(const
return std::nullopt;
}
template<typename T>
bool getExternalDataSourceConfiguration(const ASTs & args, BaseSettings<T> & settings, ContextPtr context)
{
@ -405,14 +439,7 @@ bool getExternalDataSourceConfiguration(const ASTs & args, BaseSettings<T> & set
if (!config.has(config_prefix))
throw Exception(ErrorCodes::BAD_ARGUMENTS, "There is no collection named `{}` in config", collection->name());
SettingsChanges config_settings;
for (const auto & setting : settings.all())
{
const auto & setting_name = setting.getName();
auto setting_value = config.getString(config_prefix + '.' + setting_name, "");
if (!setting_value.empty())
config_settings.emplace_back(setting_name, setting_value);
}
auto config_settings = getSettingsChangesFromConfig(settings, config, config_prefix);
/// Check key-value arguments.
for (size_t i = 1; i < args.size(); ++i)
@ -450,4 +477,32 @@ bool getExternalDataSourceConfiguration(const ASTs & args, BaseSettings<RabbitMQ
template
bool getExternalDataSourceConfiguration(const ASTs & args, BaseSettings<KafkaSettingsTraits> & settings, ContextPtr context);
#endif
template
std::optional<ExternalDataSourceInfo> getExternalDataSourceConfiguration(
const ASTs & args, ContextPtr context, bool is_database_engine, bool throw_on_no_collection, const BaseSettings<EmptySettingsTraits> & storage_settings);
template
std::optional<ExternalDataSourceInfo> getExternalDataSourceConfiguration(
const Poco::Util::AbstractConfiguration & dict_config, const String & dict_config_prefix,
ContextPtr context, HasConfigKeyFunc has_config_key, const BaseSettings<EmptySettingsTraits> & settings);
template
SettingsChanges getSettingsChangesFromConfig(
const BaseSettings<EmptySettingsTraits> & settings, const Poco::Util::AbstractConfiguration & config, const String & config_prefix);
#if USE_MYSQL
template
std::optional<ExternalDataSourceInfo> getExternalDataSourceConfiguration(
const ASTs & args, ContextPtr context, bool is_database_engine, bool throw_on_no_collection, const BaseSettings<MySQLSettingsTraits> & storage_settings);
template
std::optional<ExternalDataSourceInfo> getExternalDataSourceConfiguration(
const Poco::Util::AbstractConfiguration & dict_config, const String & dict_config_prefix,
ContextPtr context, HasConfigKeyFunc has_config_key, const BaseSettings<MySQLSettingsTraits> & settings);
template
SettingsChanges getSettingsChangesFromConfig(
const BaseSettings<MySQLSettingsTraits> & settings, const Poco::Util::AbstractConfiguration & config, const String & config_prefix);
#endif
}

View File

@ -7,6 +7,11 @@
namespace DB
{
#define EMPTY_SETTINGS(M)
DECLARE_SETTINGS_TRAITS(EmptySettingsTraits, EMPTY_SETTINGS)
struct EmptySettings : public BaseSettings<EmptySettingsTraits> {};
struct ExternalDataSourceConfiguration
{
String host;
@ -46,10 +51,11 @@ struct StorageMongoDBConfiguration : ExternalDataSourceConfiguration
using StorageSpecificArgs = std::vector<std::pair<String, ASTPtr>>;
struct ExternalDataSourceConfig
struct ExternalDataSourceInfo
{
ExternalDataSourceConfiguration configuration;
StorageSpecificArgs specific_args;
SettingsChanges settings_changes;
};
/* If there is a storage engine's configuration specified in the named_collections,
@ -62,13 +68,16 @@ struct ExternalDataSourceConfig
* Any key-value engine argument except common (`host`, `port`, `username`, `password`, `database`)
* is returned in EngineArgs struct.
*/
std::optional<ExternalDataSourceConfig> getExternalDataSourceConfiguration(const ASTs & args, ContextPtr context, bool is_database_engine = false, bool throw_on_no_collection = true);
template <typename T = EmptySettingsTraits>
std::optional<ExternalDataSourceInfo> getExternalDataSourceConfiguration(
const ASTs & args, ContextPtr context, bool is_database_engine = false, bool throw_on_no_collection = true, const BaseSettings<T> & storage_settings = {});
using HasConfigKeyFunc = std::function<bool(const String &)>;
std::optional<ExternalDataSourceConfiguration> getExternalDataSourceConfiguration(
template <typename T = EmptySettingsTraits>
std::optional<ExternalDataSourceInfo> getExternalDataSourceConfiguration(
const Poco::Util::AbstractConfiguration & dict_config, const String & dict_config_prefix,
ContextPtr context, HasConfigKeyFunc has_config_key);
ContextPtr context, HasConfigKeyFunc has_config_key, const BaseSettings<T> & settings = {});
/// Highest priority is 0, the bigger the number in map, the less the priority.

View File

@ -8,9 +8,17 @@
namespace DB
{
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
}
mysqlxx::PoolWithFailover
createMySQLPoolWithFailover(const StorageMySQLConfiguration & configuration, const MySQLSettings & mysql_settings)
{
if (!mysql_settings.connection_pool_size)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Connection pool cannot have zero size");
return mysqlxx::PoolWithFailover(
configuration.database, configuration.addresses, configuration.username, configuration.password,
MYSQLXX_POOL_WITH_FAILOVER_DEFAULT_START_CONNECTIONS,

View File

@ -25,11 +25,14 @@ class ASTStorage;
DECLARE_SETTINGS_TRAITS(MySQLSettingsTraits, LIST_OF_MYSQL_SETTINGS)
using MySQLBaseSettings = BaseSettings<MySQLSettingsTraits>;
/** Settings for the MySQL family of engines.
*/
struct MySQLSettings : public BaseSettings<MySQLSettingsTraits>
struct MySQLSettings : public MySQLBaseSettings
{
void loadFromQuery(ASTStorage & storage_def);
};
}

View File

@ -104,11 +104,16 @@ void PostgreSQLReplicationHandler::addStorage(const std::string & table_name, St
}
void PostgreSQLReplicationHandler::startup()
void PostgreSQLReplicationHandler::startup(bool delayed)
{
/// We load tables in a separate thread, because this database is not created yet.
/// (will get "database is currently dropped or renamed")
if (delayed)
{
startup_task->activateAndSchedule();
}
else
{
startSynchronization(/* throw_on_error */ true);
}
}

View File

@ -29,7 +29,7 @@ public:
bool is_materialized_postgresql_database_);
/// Activate task to be run from a separate thread: wait until connection is available and call startReplication().
void startup();
void startup(bool delayed);
/// Stop replication without cleanup.
void shutdown();

View File

@ -87,14 +87,8 @@ StorageMaterializedPostgreSQL::StorageMaterializedPostgreSQL(
*replication_settings,
/* is_materialized_postgresql_database */false);
if (!is_attach)
{
replication_handler->addStorage(remote_table_name, this);
/// Start synchronization preliminary setup immediately and throw in case of failure.
/// It should be guaranteed that if MaterializedPostgreSQL table was created successfully, then
/// its nested table was also created.
replication_handler->startSynchronization(/* throw_on_error */ true);
}
replication_handler->startup(/* delayed */is_attach);
}
@ -234,19 +228,6 @@ void StorageMaterializedPostgreSQL::set(StoragePtr nested_storage)
}
void StorageMaterializedPostgreSQL::startup()
{
/// replication_handler != nullptr only in case of single table engine MaterializedPostgreSQL.
if (replication_handler && is_attach)
{
replication_handler->addStorage(remote_table_name, this);
/// In case of attach table use background startup in a separate thread. First wait until connection is reachable,
/// then check for nested table -- it should already be created.
replication_handler->startup();
}
}
void StorageMaterializedPostgreSQL::shutdown()
{
if (replication_handler)

View File

@ -74,8 +74,6 @@ public:
String getName() const override { return "MaterializedPostgreSQL"; }
void startup() override;
void shutdown() override;
/// Used only for single MaterializedPostgreSQL storage.

View File

@ -169,6 +169,7 @@ struct SelectQueryInfo
bool ignore_projections = false;
bool is_projection_query = false;
bool merge_tree_empty_result = false;
bool settings_limit_offset_done = false;
Block minmax_count_projection_block;
MergeTreeDataSelectAnalysisResultPtr merge_tree_select_result_ptr;
};

View File

@ -272,7 +272,7 @@ void registerStorageExternalDistributed(StorageFactory & factory)
ExternalDataSourceConfiguration configuration;
if (auto named_collection = getExternalDataSourceConfiguration(inner_engine_args, args.getLocalContext()))
{
auto [common_configuration, storage_specific_args] = named_collection.value();
auto [common_configuration, storage_specific_args, _] = named_collection.value();
configuration.set(common_configuration);
for (const auto & [name, value] : storage_specific_args)

View File

@ -117,7 +117,7 @@ StorageMongoDBConfiguration StorageMongoDB::getConfiguration(ASTs engine_args, C
StorageMongoDBConfiguration configuration;
if (auto named_collection = getExternalDataSourceConfiguration(engine_args, context))
{
auto [common_configuration, storage_specific_args] = named_collection.value();
auto [common_configuration, storage_specific_args, _] = named_collection.value();
configuration.set(common_configuration);
for (const auto & [arg_name, arg_value] : storage_specific_args)

View File

@ -238,15 +238,17 @@ SinkToStoragePtr StorageMySQL::write(const ASTPtr & /*query*/, const StorageMeta
}
StorageMySQLConfiguration StorageMySQL::getConfiguration(ASTs engine_args, ContextPtr context_)
StorageMySQLConfiguration StorageMySQL::getConfiguration(ASTs engine_args, ContextPtr context_, MySQLBaseSettings & storage_settings)
{
StorageMySQLConfiguration configuration;
if (auto named_collection = getExternalDataSourceConfiguration(engine_args, context_))
if (auto named_collection = getExternalDataSourceConfiguration(
engine_args, context_, /* is_database_engine */false, /* throw_on_no_collection */true, storage_settings))
{
auto [common_configuration, storage_specific_args] = named_collection.value();
auto [common_configuration, storage_specific_args, settings_changes] = named_collection.value();
configuration.set(common_configuration);
configuration.addresses = {std::make_pair(configuration.host, configuration.port)};
storage_settings.applyChanges(settings_changes);
for (const auto & [arg_name, arg_value] : storage_specific_args)
{
@ -298,9 +300,9 @@ void registerStorageMySQL(StorageFactory & factory)
{
factory.registerStorage("MySQL", [](const StorageFactory::Arguments & args)
{
auto configuration = StorageMySQL::getConfiguration(args.engine_args, args.getLocalContext());
MySQLSettings mysql_settings; /// TODO: move some arguments from the arguments to the SETTINGS.
auto configuration = StorageMySQL::getConfiguration(args.engine_args, args.getLocalContext(), mysql_settings);
if (args.storage_def->settings)
mysql_settings.loadFromQuery(*args.storage_def);

View File

@ -53,7 +53,7 @@ public:
SinkToStoragePtr write(const ASTPtr & query, const StorageMetadataPtr & /*metadata_snapshot*/, ContextPtr context) override;
static StorageMySQLConfiguration getConfiguration(ASTs engine_args, ContextPtr context_);
static StorageMySQLConfiguration getConfiguration(ASTs engine_args, ContextPtr context_, MySQLBaseSettings & storage_settings);
private:
friend class StorageMySQLSink;

View File

@ -390,7 +390,7 @@ StoragePostgreSQLConfiguration StoragePostgreSQL::getConfiguration(ASTs engine_a
StoragePostgreSQLConfiguration configuration;
if (auto named_collection = getExternalDataSourceConfiguration(engine_args, context))
{
auto [common_configuration, storage_specific_args] = named_collection.value();
auto [common_configuration, storage_specific_args, _] = named_collection.value();
configuration.set(common_configuration);
configuration.addresses = {std::make_pair(configuration.host, configuration.port)};

View File

@ -140,7 +140,8 @@ void StorageView::read(
current_inner_query = query_info.view_query->clone();
}
InterpreterSelectWithUnionQuery interpreter(current_inner_query, context, {}, column_names);
auto options = SelectQueryOptions(QueryProcessingStage::Complete, 0, false, query_info.settings_limit_offset_done);
InterpreterSelectWithUnionQuery interpreter(current_inner_query, context, options, column_names);
interpreter.buildQueryPlan(query_plan);
/// It's expected that the columns read from storage are not constant.

View File

@ -37,8 +37,8 @@ void TableFunctionMySQL::parseArguments(const ASTPtr & ast_function, ContextPtr
if (!args_func.arguments)
throw Exception("Table function 'mysql' must have arguments.", ErrorCodes::LOGICAL_ERROR);
configuration = StorageMySQL::getConfiguration(args_func.arguments->children, context);
MySQLSettings mysql_settings;
configuration = StorageMySQL::getConfiguration(args_func.arguments->children, context, mysql_settings);
const auto & settings = context->getSettingsRef();
mysql_settings.connect_timeout = settings.external_storage_connect_timeout_sec;
mysql_settings.read_write_timeout = settings.external_storage_rw_timeout_sec;

View File

@ -60,7 +60,7 @@ void TableFunctionRemote::parseArguments(const ASTPtr & ast_function, ContextPtr
* Specific args (remote): sharding_key, or database (in case it is not ASTLiteral).
* None of the common arguments is empty at this point, it is checked in getExternalDataSourceConfiguration.
*/
auto [common_configuration, storage_specific_args] = named_collection.value();
auto [common_configuration, storage_specific_args, _] = named_collection.value();
configuration.set(common_configuration);
for (const auto & [arg_name, arg_value] : storage_specific_args)

View File

@ -6,80 +6,98 @@ import json
import os
import sys
import time
from github import Github
from typing import List, Optional, Tuple
from env_helper import REPO_COPY, TEMP_PATH, CACHES_PATH, IMAGES_PATH
from s3_helper import S3Helper
from pr_info import PRInfo
from get_robot_token import get_best_robot_token
from version_helper import get_version_from_repo, update_version_local
from version_helper import (
ClickHouseVersion,
get_version_from_repo,
update_version_local,
)
from ccache_utils import get_ccache_if_not_exists, upload_ccache
from ci_config import CI_CONFIG
from ci_config import CI_CONFIG, BuildConfig
from docker_pull_helper import get_image_with_version
from tee_popen import TeePopen
def get_build_config(build_check_name, build_name):
if build_check_name == 'ClickHouse build check (actions)':
build_config_name = 'build_config'
def get_build_config(build_check_name: str, build_name: str) -> BuildConfig:
if build_check_name == "ClickHouse build check (actions)":
build_config_name = "build_config"
else:
raise Exception(f"Unknown build check name {build_check_name}")
return CI_CONFIG[build_config_name][build_name]
def _can_export_binaries(build_config):
if build_config['package_type'] != 'deb':
def _can_export_binaries(build_config: BuildConfig) -> bool:
if build_config["package_type"] != "deb":
return False
if build_config['bundled'] != "bundled":
if build_config["bundled"] != "bundled":
return False
if build_config['splitted'] == 'splitted':
if build_config["splitted"] == "splitted":
return False
if build_config['sanitizer'] != '':
if build_config["sanitizer"] != "":
return True
if build_config['build_type'] != '':
if build_config["build_type"] != "":
return True
return False
def get_packager_cmd(build_config, packager_path, output_path, build_version, image_version, ccache_path, pr_info):
package_type = build_config['package_type']
comp = build_config['compiler']
cmd = f"cd {packager_path} && ./packager --output-dir={output_path} --package-type={package_type} --compiler={comp}"
def get_packager_cmd(
build_config: BuildConfig,
packager_path: str,
output_path: str,
build_version: str,
image_version: str,
ccache_path: str,
pr_info: PRInfo,
) -> str:
package_type = build_config["package_type"]
comp = build_config["compiler"]
cmd = (
f"cd {packager_path} && ./packager --output-dir={output_path} "
f"--package-type={package_type} --compiler={comp}"
)
if build_config['build_type']:
cmd += ' --build-type={}'.format(build_config['build_type'])
if build_config['sanitizer']:
cmd += ' --sanitizer={}'.format(build_config['sanitizer'])
if build_config['splitted'] == 'splitted':
cmd += ' --split-binary'
if build_config['tidy'] == 'enable':
cmd += ' --clang-tidy'
if build_config["build_type"]:
cmd += " --build-type={}".format(build_config["build_type"])
if build_config["sanitizer"]:
cmd += " --sanitizer={}".format(build_config["sanitizer"])
if build_config["splitted"] == "splitted":
cmd += " --split-binary"
if build_config["tidy"] == "enable":
cmd += " --clang-tidy"
cmd += ' --cache=ccache'
cmd += ' --ccache_dir={}'.format(ccache_path)
cmd += " --cache=ccache"
cmd += " --ccache_dir={}".format(ccache_path)
if 'alien_pkgs' in build_config and build_config['alien_pkgs']:
if pr_info.number == 0 or 'release' in pr_info.labels:
cmd += ' --alien-pkgs rpm tgz'
if "alien_pkgs" in build_config and build_config["alien_pkgs"]:
if pr_info.number == 0 or "release" in pr_info.labels:
cmd += " --alien-pkgs rpm tgz"
cmd += ' --docker-image-version={}'.format(image_version)
cmd += ' --version={}'.format(build_version)
cmd += " --docker-image-version={}".format(image_version)
cmd += " --version={}".format(build_version)
if _can_export_binaries(build_config):
cmd += ' --with-binaries=tests'
cmd += " --with-binaries=tests"
return cmd
def get_image_name(build_config):
if build_config['package_type'] != 'deb':
return 'clickhouse/binary-builder'
def get_image_name(build_config: BuildConfig) -> str:
if build_config["package_type"] != "deb":
return "clickhouse/binary-builder"
else:
return 'clickhouse/deb-builder'
return "clickhouse/deb-builder"
def build_clickhouse(packager_cmd, logs_path, build_output_path):
build_log_path = os.path.join(logs_path, 'build_log.log')
def build_clickhouse(
packager_cmd: str, logs_path: str, build_output_path: str
) -> Tuple[str, bool]:
build_log_path = os.path.join(logs_path, "build_log.log")
success = False
with TeePopen(packager_cmd, build_log_path) as process:
retcode = process.wait()
if os.path.exists(build_output_path):
@ -88,16 +106,21 @@ def build_clickhouse(packager_cmd, logs_path, build_output_path):
build_results = []
if retcode == 0:
if len(build_results) != 0:
if len(build_results) > 0:
success = True
logging.info("Built successfully")
else:
logging.info("Success exit code, but no build artifacts => build failed")
logging.info(
"Success exit code, but no build artifacts => build failed"
)
else:
logging.info("Build failed")
return build_log_path, retcode == 0 and len(build_results) > 0
return build_log_path, success
def get_build_results_if_exists(s3_helper, s3_prefix):
def get_build_results_if_exists(
s3_helper: S3Helper, s3_prefix: str
) -> Optional[List[str]]:
try:
content = s3_helper.list_prefix(s3_prefix)
return content
@ -105,8 +128,19 @@ def get_build_results_if_exists(s3_helper, s3_prefix):
logging.info("Got exception %s listing %s", ex, s3_prefix)
return None
def create_json_artifact(temp_path, build_name, log_url, build_urls, build_config, elapsed, success):
subprocess.check_call(f"echo 'BUILD_NAME=build_urls_{build_name}' >> $GITHUB_ENV", shell=True)
def create_json_artifact(
temp_path: str,
build_name: str,
log_url: str,
build_urls: List[str],
build_config: BuildConfig,
elapsed: int,
success: bool,
):
subprocess.check_call(
f"echo 'BUILD_NAME=build_urls_{build_name}' >> $GITHUB_ENV", shell=True
)
result = {
"log_url": log_url,
@ -116,48 +150,79 @@ def create_json_artifact(temp_path, build_name, log_url, build_urls, build_confi
"status": success,
}
json_name = "build_urls_" + build_name + '.json'
json_name = "build_urls_" + build_name + ".json"
print ("Dump json report", result, "to", json_name, "with env", "build_urls_{build_name}")
print(
"Dump json report",
result,
"to",
json_name,
"with env",
"build_urls_{build_name}",
)
with open(os.path.join(temp_path, json_name), 'w') as build_links:
with open(os.path.join(temp_path, json_name), "w") as build_links:
json.dump(result, build_links)
if __name__ == "__main__":
def get_release_or_pr(
pr_info: PRInfo, build_config: BuildConfig, version: ClickHouseVersion
) -> str:
if "release" in pr_info.labels or "release-lts" in pr_info.labels:
# for release pull requests we use branch names prefixes, not pr numbers
return pr_info.head_ref
elif pr_info.number == 0 and build_config["package_type"] != "performance":
# for pushes to master - major version, but not for performance builds
# they havily relies on a fixed path for build package and nobody going
# to deploy them somewhere, so it's ok.
return ".".join(version.as_tuple()[:2])
# PR number for anything else
return str(pr_info.number)
def upload_master_static_binaries(
pr_info: PRInfo,
build_config: BuildConfig,
s3_helper: S3Helper,
build_output_path: str,
):
"""Upload binary artifacts to a static S3 links"""
if pr_info.number != 0:
return
elif build_config["package_type"] != "binary":
return
elif build_config["splitted"] == "splitted":
return
elif pr_info.base_ref != "master":
return
s3_path = "/".join(
(pr_info.base_ref, os.path.basename(build_output_path), "clickhouse")
)
binary = os.path.join(build_output_path, "clickhouse")
url = s3_helper.upload_build_file_to_s3(binary, s3_path)
print(f"::notice ::Binary static URL: {url}")
def main():
logging.basicConfig(level=logging.INFO)
repo_path = REPO_COPY
temp_path = TEMP_PATH
caches_path = CACHES_PATH
build_check_name = sys.argv[1]
build_name = sys.argv[2]
build_config = get_build_config(build_check_name, build_name)
if not os.path.exists(temp_path):
os.makedirs(temp_path)
if not os.path.exists(TEMP_PATH):
os.makedirs(TEMP_PATH)
pr_info = PRInfo()
logging.info("Repo copy path %s", repo_path)
logging.info("Repo copy path %s", REPO_COPY)
gh = Github(get_best_robot_token())
s3_helper = S3Helper('https://s3.amazonaws.com')
s3_helper = S3Helper("https://s3.amazonaws.com")
version = get_version_from_repo(repo_path)
release_or_pr = None
if 'release' in pr_info.labels or 'release-lts' in pr_info.labels:
# for release pull requests we use branch names prefixes, not pr numbers
release_or_pr = pr_info.head_ref
elif pr_info.number == 0 and build_config['package_type'] != "performance":
# for pushes to master - major version, but not for performance builds
# they havily relies on a fixed path for build package and nobody going
# to deploy them somewhere, so it's ok.
release_or_pr = ".".join(version.as_tuple()[:2])
else:
# PR number for anything else
release_or_pr = str(pr_info.number)
version = get_version_from_repo(REPO_COPY)
release_or_pr = get_release_or_pr(pr_info, build_config, version)
s3_path_prefix = "/".join((release_or_pr, pr_info.sha, build_name))
@ -167,14 +232,27 @@ if __name__ == "__main__":
if build_results is not None and len(build_results) > 0:
logging.info("Some build results found %s", build_results)
build_urls = []
log_url = ''
log_url = ""
for url in build_results:
if 'build_log.log' in url:
log_url = 'https://s3.amazonaws.com/clickhouse-builds/' + url.replace('+', '%2B').replace(' ', '%20')
if "build_log.log" in url:
log_url = "https://s3.amazonaws.com/clickhouse-builds/" + url.replace(
"+", "%2B"
).replace(" ", "%20")
else:
build_urls.append('https://s3.amazonaws.com/clickhouse-builds/' + url.replace('+', '%2B').replace(' ', '%20'))
create_json_artifact(temp_path, build_name, log_url, build_urls, build_config, 0, len(build_urls) > 0)
sys.exit(0)
build_urls.append(
"https://s3.amazonaws.com/clickhouse-builds/"
+ url.replace("+", "%2B").replace(" ", "%20")
)
create_json_artifact(
TEMP_PATH,
build_name,
log_url,
build_urls,
build_config,
0,
len(build_urls) > 0,
)
return
image_name = get_image_name(build_config)
docker_image = get_image_with_version(IMAGES_PATH, image_name)
@ -182,65 +260,93 @@ if __name__ == "__main__":
logging.info("Got version from repo %s", version.get_version_string())
version_type = 'testing'
if 'release' in pr_info.labels or 'release-lts' in pr_info.labels:
version_type = 'stable'
version_type = "testing"
if "release" in pr_info.labels or "release-lts" in pr_info.labels:
version_type = "stable"
update_version_local(repo_path, pr_info.sha, version, version_type)
update_version_local(REPO_COPY, pr_info.sha, version, version_type)
logging.info("Updated local files with version")
logging.info("Build short name %s", build_name)
build_output_path = os.path.join(temp_path, build_name)
build_output_path = os.path.join(TEMP_PATH, build_name)
if not os.path.exists(build_output_path):
os.makedirs(build_output_path)
ccache_path = os.path.join(caches_path, build_name + '_ccache')
ccache_path = os.path.join(CACHES_PATH, build_name + "_ccache")
logging.info("Will try to fetch cache for our build")
get_ccache_if_not_exists(ccache_path, s3_helper, pr_info.number, temp_path)
get_ccache_if_not_exists(ccache_path, s3_helper, pr_info.number, TEMP_PATH)
if not os.path.exists(ccache_path):
logging.info("cache was not fetched, will create empty dir")
os.makedirs(ccache_path)
if build_config['package_type'] == "performance" and pr_info.number != 0:
if build_config["package_type"] == "performance" and pr_info.number != 0:
# because perf tests store some information about git commits
subprocess.check_call(f"cd {repo_path} && git fetch origin master:master", shell=True)
subprocess.check_call(
f"cd {REPO_COPY} && git fetch origin master:master", shell=True
)
packager_cmd = get_packager_cmd(build_config, os.path.join(repo_path, "docker/packager"), build_output_path, version.get_version_string(), image_version, ccache_path, pr_info)
packager_cmd = get_packager_cmd(
build_config,
os.path.join(REPO_COPY, "docker/packager"),
build_output_path,
version.get_version_string(),
image_version,
ccache_path,
pr_info,
)
logging.info("Going to run packager with %s", packager_cmd)
build_clickhouse_log = os.path.join(temp_path, "build_log")
build_clickhouse_log = os.path.join(TEMP_PATH, "build_log")
if not os.path.exists(build_clickhouse_log):
os.makedirs(build_clickhouse_log)
start = time.time()
log_path, success = build_clickhouse(packager_cmd, build_clickhouse_log, build_output_path)
log_path, success = build_clickhouse(
packager_cmd, build_clickhouse_log, build_output_path
)
elapsed = int(time.time() - start)
subprocess.check_call(f"sudo chown -R ubuntu:ubuntu {build_output_path}", shell=True)
subprocess.check_call(
f"sudo chown -R ubuntu:ubuntu {build_output_path}", shell=True
)
subprocess.check_call(f"sudo chown -R ubuntu:ubuntu {ccache_path}", shell=True)
logging.info("Build finished with %s, log path %s", success, log_path)
logging.info("Will upload cache")
upload_ccache(ccache_path, s3_helper, pr_info.number, temp_path)
upload_ccache(ccache_path, s3_helper, pr_info.number, TEMP_PATH)
if os.path.exists(log_path):
log_url = s3_helper.upload_build_file_to_s3(log_path, s3_path_prefix + "/" + os.path.basename(log_path))
log_url = s3_helper.upload_build_file_to_s3(
log_path, s3_path_prefix + "/" + os.path.basename(log_path)
)
logging.info("Log url %s", log_url)
else:
logging.info("Build log doesn't exist")
build_urls = s3_helper.upload_build_folder_to_s3(build_output_path, s3_path_prefix, keep_dirs_in_s3_path=False, upload_symlinks=False)
build_urls = s3_helper.upload_build_folder_to_s3(
build_output_path,
s3_path_prefix,
keep_dirs_in_s3_path=False,
upload_symlinks=False,
)
logging.info("Got build URLs %s", build_urls)
print("::notice ::Build URLs: {}".format('\n'.join(build_urls)))
print("::notice ::Build URLs: {}".format("\n".join(build_urls)))
print("::notice ::Log URL: {}".format(log_url))
create_json_artifact(temp_path, build_name, log_url, build_urls, build_config, elapsed, success)
create_json_artifact(
TEMP_PATH, build_name, log_url, build_urls, build_config, elapsed, success
)
upload_master_static_binaries(pr_info, build_config, s3_helper, build_output_path)
# Fail build job if not successeded
if not success:
sys.exit(1)
if __name__ == "__main__":
main()

View File

@ -1,5 +1,10 @@
#!/usr/bin/env python3
from typing import Dict, TypeVar
ConfValue = TypeVar("ConfValue", str, bool)
BuildConfig = Dict[str, ConfValue]
CI_CONFIG = {
"build_config": {
"package_release": {
@ -334,4 +339,4 @@ CI_CONFIG = {
"required_build": "performance",
},
},
}
} # type: dict

View File

@ -41,6 +41,9 @@ class PRInfo:
github_event = {'commits': 1, 'after': 'HEAD', 'ref': None}
self.event = github_event
self.changed_files = set([])
ref = github_event.get("ref", "refs/head/master")
if ref.startswith('refs/heads/'):
ref = ref[11:]
# workflow completed event, used for PRs only
if 'action' in github_event and github_event['action'] == 'completed':
@ -93,10 +96,10 @@ class PRInfo:
if pull_request is None or pull_request['state'] == 'closed': # it's merged PR to master
self.number = 0
self.labels = {}
self.pr_html_url = f"{repo_prefix}/commits/master"
self.base_ref = "master"
self.pr_html_url = f"{repo_prefix}/commits/{ref}"
self.base_ref = ref
self.base_name = self.repo_full_name
self.head_ref = "master"
self.head_ref = ref
self.head_name = self.repo_full_name
self.diff_url = \
f"https://api.github.com/repos/{GITHUB_REPOSITORY}/compare/{github_event['before']}...{self.sha}"
@ -126,10 +129,10 @@ class PRInfo:
self.task_url = f"{repo_prefix}/actions/runs/{GITHUB_RUN_ID or '0'}"
self.commit_html_url = f"{repo_prefix}/commits/{self.sha}"
self.repo_full_name = GITHUB_REPOSITORY
self.pr_html_url = f"{repo_prefix}/commits/master"
self.base_ref = "master"
self.pr_html_url = f"{repo_prefix}/commits/{ref}"
self.base_ref = ref
self.base_name = self.repo_full_name
self.head_ref = "master"
self.head_ref = ref
self.head_name = self.repo_full_name
if need_changed_files:

View File

@ -15,11 +15,19 @@ class TeePopen:
self.command = command
self.log_file = log_file
self.env = env
self.process = None
def __enter__(self):
# pylint: disable=W0201
self.process = Popen(self.command, shell=True, universal_newlines=True, env=self.env, stderr=STDOUT, stdout=PIPE, bufsize=1)
self.log_file = open(self.log_file, 'w', encoding='utf-8')
self.process = Popen(
self.command,
shell=True,
universal_newlines=True,
env=self.env,
stderr=STDOUT,
stdout=PIPE,
bufsize=1,
)
self.log_file = open(self.log_file, "w", encoding="utf-8")
return self
def __exit__(self, t, value, traceback):

View File

@ -0,0 +1,273 @@
import psycopg2
import time
from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT
postgres_table_template = """
CREATE TABLE IF NOT EXISTS "{}" (
key Integer NOT NULL, value Integer, PRIMARY KEY(key))
"""
postgres_table_template_2 = """
CREATE TABLE IF NOT EXISTS "{}" (
key Integer NOT NULL, value1 Integer, value2 Integer, value3 Integer, PRIMARY KEY(key))
"""
postgres_table_template_3 = """
CREATE TABLE IF NOT EXISTS "{}" (
key1 Integer NOT NULL, value1 Integer, key2 Integer NOT NULL, value2 Integer NOT NULL)
"""
postgres_table_template_4 = """
CREATE TABLE IF NOT EXISTS "{}"."{}" (
key Integer NOT NULL, value Integer, PRIMARY KEY(key))
"""
postgres_table_template_5 = """
CREATE TABLE IF NOT EXISTS "{}" (
key Integer NOT NULL, value UUID, PRIMARY KEY(key))
"""
def get_postgres_conn(ip, port, database=False, auto_commit=True, database_name='postgres_database', replication=False):
if database == True:
conn_string = f"host={ip} port={port} dbname='{database_name}' user='postgres' password='mysecretpassword'"
else:
conn_string = f"host={ip} port={port} user='postgres' password='mysecretpassword'"
if replication:
conn_string += " replication='database'"
conn = psycopg2.connect(conn_string)
if auto_commit:
conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
conn.autocommit = True
return conn
def create_replication_slot(conn, slot_name='user_slot'):
cursor = conn.cursor()
cursor.execute(f'CREATE_REPLICATION_SLOT {slot_name} LOGICAL pgoutput EXPORT_SNAPSHOT')
result = cursor.fetchall()
print(result[0][0]) # slot name
print(result[0][1]) # start lsn
print(result[0][2]) # snapshot
return result[0][2]
def drop_replication_slot(conn, slot_name='user_slot'):
cursor = conn.cursor()
cursor.execute(f"select pg_drop_replication_slot('{slot_name}')")
def create_postgres_schema(cursor, schema_name):
drop_postgres_schema(cursor, schema_name)
cursor.execute(f'CREATE SCHEMA {schema_name}')
def drop_postgres_schema(cursor, schema_name):
cursor.execute(f'DROP SCHEMA IF EXISTS {schema_name} CASCADE')
def create_postgres_table(cursor, table_name, replica_identity_full=False, template=postgres_table_template):
drop_postgres_table(cursor, table_name)
cursor.execute(template.format(table_name))
if replica_identity_full:
cursor.execute(f'ALTER TABLE {table_name} REPLICA IDENTITY FULL;')
def drop_postgres_table(cursor, table_name):
cursor.execute(f"""DROP TABLE IF EXISTS "{table_name}" """)
def create_postgres_table_with_schema(cursor, schema_name, table_name):
drop_postgres_table_with_schema(cursor, schema_name, table_name)
cursor.execute(postgres_table_template_4.format(schema_name, table_name))
def drop_postgres_table_with_schema(cursor, schema_name, table_name):
cursor.execute(f"""DROP TABLE IF EXISTS "{schema_name}"."{table_name}" """)
class PostgresManager:
def __init__(self):
self.created_postgres_db_list = set()
self.created_materialized_postgres_db_list = set()
self.created_ch_postgres_db_list = set()
def init(self, instance, ip, port):
self.instance = instance
self.ip = ip
self.port = port
self.conn = get_postgres_conn(ip=self.ip, port=self.port)
self.prepare()
def restart(self):
try:
self.clear()
self.prepare()
except Exception as ex:
self.prepare()
raise ex
def prepare(self):
conn = get_postgres_conn(ip=self.ip, port=self.port)
cursor = conn.cursor()
self.create_postgres_db(cursor, 'postgres_database')
self.create_clickhouse_postgres_db(ip=self.ip, port=self.port)
def clear(self):
if self.conn.closed == 0:
self.conn.close()
for db in self.created_materialized_postgres_db_list.copy():
self.drop_materialized_db(db);
for db in self.created_ch_postgres_db_list.copy():
self.drop_clickhouse_postgres_db(db)
if len(self.created_postgres_db_list) > 0:
conn = get_postgres_conn(ip=self.ip, port=self.port)
cursor = conn.cursor()
for db in self.created_postgres_db_list.copy():
self.drop_postgres_db(cursor, db)
def get_db_cursor(self):
self.conn = get_postgres_conn(ip=self.ip, port=self.port, database=True)
return self.conn.cursor()
def create_postgres_db(self, cursor, name='postgres_database'):
self.drop_postgres_db(cursor, name)
self.created_postgres_db_list.add(name)
cursor.execute(f"CREATE DATABASE {name}")
def drop_postgres_db(self, cursor, name='postgres_database'):
cursor.execute(f"DROP DATABASE IF EXISTS {name}")
if name in self.created_postgres_db_list:
self.created_postgres_db_list.remove(name)
def create_clickhouse_postgres_db(self, ip, port, name='postgres_database', database_name='postgres_database', schema_name=''):
self.drop_clickhouse_postgres_db(name)
self.created_ch_postgres_db_list.add(name)
if len(schema_name) == 0:
self.instance.query(f'''
CREATE DATABASE {name}
ENGINE = PostgreSQL('{ip}:{port}', '{database_name}', 'postgres', 'mysecretpassword')''')
else:
self.instance.query(f'''
CREATE DATABASE {name}
ENGINE = PostgreSQL('{ip}:{port}', '{database_name}', 'postgres', 'mysecretpassword', '{schema_name}')''')
def drop_clickhouse_postgres_db(self, name='postgres_database'):
self.instance.query(f'DROP DATABASE IF EXISTS {name}')
if name in self.created_ch_postgres_db_list:
self.created_ch_postgres_db_list.remove(name)
def create_materialized_db(self, ip, port,
materialized_database='test_database', postgres_database='postgres_database',
settings=[], table_overrides=''):
self.created_materialized_postgres_db_list.add(materialized_database)
self.instance.query(f"DROP DATABASE IF EXISTS {materialized_database}")
create_query = f"CREATE DATABASE {materialized_database} ENGINE = MaterializedPostgreSQL('{ip}:{port}', '{postgres_database}', 'postgres', 'mysecretpassword')"
if len(settings) > 0:
create_query += " SETTINGS "
for i in range(len(settings)):
if i != 0:
create_query += ', '
create_query += settings[i]
create_query += table_overrides
self.instance.query(create_query)
assert materialized_database in self.instance.query('SHOW DATABASES')
def drop_materialized_db(self, materialized_database='test_database'):
self.instance.query(f'DROP DATABASE IF EXISTS {materialized_database} NO DELAY')
if materialized_database in self.created_materialized_postgres_db_list:
self.created_materialized_postgres_db_list.remove(materialized_database)
assert materialized_database not in self.instance.query('SHOW DATABASES')
def create_and_fill_postgres_table(self, table_name):
conn = get_postgres_conn(ip=self.ip, port=self.port, database=True)
cursor = conn.cursor()
self.create_and_fill_postgres_table_from_cursor(cursor, table_name)
def create_and_fill_postgres_table_from_cursor(self, cursor, table_name):
create_postgres_table(cursor, table_name);
self.instance.query(f"INSERT INTO postgres_database.{table_name} SELECT number, number from numbers(50)")
def create_and_fill_postgres_tables(self, tables_num, numbers=50):
conn = get_postgres_conn(ip=self.ip, port=self.port, database=True)
cursor = conn.cursor()
self.create_and_fill_postgres_tables_from_cursor(cursor, tables_num, numbers=numbers)
def create_and_fill_postgres_tables_from_cursor(self, cursor, tables_num, numbers=50):
for i in range(tables_num):
table_name = f'postgresql_replica_{i}'
create_postgres_table(cursor, table_name);
if numbers > 0:
self.instance.query(f"INSERT INTO postgres_database.{table_name} SELECT number, number from numbers({numbers})")
queries = [
'INSERT INTO postgresql_replica_{} select i, i from generate_series(0, 10000) as t(i);',
'DELETE FROM postgresql_replica_{} WHERE (value*value) % 3 = 0;',
'UPDATE postgresql_replica_{} SET value = value - 125 WHERE key % 2 = 0;',
"UPDATE postgresql_replica_{} SET key=key+20000 WHERE key%2=0",
'INSERT INTO postgresql_replica_{} select i, i from generate_series(40000, 50000) as t(i);',
'DELETE FROM postgresql_replica_{} WHERE key % 10 = 0;',
'UPDATE postgresql_replica_{} SET value = value + 101 WHERE key % 2 = 1;',
"UPDATE postgresql_replica_{} SET key=key+80000 WHERE key%2=1",
'DELETE FROM postgresql_replica_{} WHERE value % 2 = 0;',
'UPDATE postgresql_replica_{} SET value = value + 2000 WHERE key % 5 = 0;',
'INSERT INTO postgresql_replica_{} select i, i from generate_series(200000, 250000) as t(i);',
'DELETE FROM postgresql_replica_{} WHERE value % 3 = 0;',
'UPDATE postgresql_replica_{} SET value = value * 2 WHERE key % 3 = 0;',
"UPDATE postgresql_replica_{} SET key=key+500000 WHERE key%2=1",
'INSERT INTO postgresql_replica_{} select i, i from generate_series(1000000, 1050000) as t(i);',
'DELETE FROM postgresql_replica_{} WHERE value % 9 = 2;',
"UPDATE postgresql_replica_{} SET key=key+10000000",
'UPDATE postgresql_replica_{} SET value = value + 2 WHERE key % 3 = 1;',
'DELETE FROM postgresql_replica_{} WHERE value%5 = 0;'
]
def assert_nested_table_is_created(instance, table_name, materialized_database='test_database', schema_name=''):
if len(schema_name) == 0:
table = table_name
else:
table = schema_name + "." + table_name
print(f'Checking table {table} exists in {materialized_database}')
database_tables = instance.query(f'SHOW TABLES FROM {materialized_database}')
while table not in database_tables:
time.sleep(0.2)
database_tables = instance.query(f'SHOW TABLES FROM {materialized_database}')
assert(table in database_tables)
def assert_number_of_columns(instance, expected, table_name, database_name='test_database'):
result = instance.query(f"select count() from system.columns where table = '{table_name}' and database = '{database_name}' and not startsWith(name, '_')")
while (int(result) != expected):
time.sleep(1)
result = instance.query(f"select count() from system.columns where table = '{table_name}' and database = '{database_name}' and not startsWith(name, '_')")
print('Number of columns ok')
def check_tables_are_synchronized(instance, table_name, order_by='key', postgres_database='postgres_database', materialized_database='test_database', schema_name=''):
assert_nested_table_is_created(instance, table_name, materialized_database, schema_name)
table_path = ''
if len(schema_name) == 0:
table_path = f'{materialized_database}.{table_name}'
else:
table_path = f'{materialized_database}.`{schema_name}.{table_name}`'
print(f"Checking table is synchronized: {table_path}")
result_query = f'select * from {table_path} order by {order_by};'
expected = instance.query(f'select * from {postgres_database}.{table_name} order by {order_by};')
result = instance.query(result_query)
for _ in range(30):
if result == expected:
break
else:
time.sleep(0.5)
result = instance.query(result_query)
assert(result == expected)
def check_several_tables_are_synchronized(instance, tables_num, order_by='key', postgres_database='postgres_database', materialized_database='test_database', schema_name=''):
for i in range(tables_num):
check_tables_are_synchronized(instance, f'postgresql_replica_{i}');

View File

@ -21,5 +21,14 @@
<database>test</database>
<table>test_table</table>
</mysql3>
<mysql4>
<user>root</user>
<password>clickhouse</password>
<host>mysql57</host>
<port>3306</port>
<database>test</database>
<table>test_table</table>
<connection_pool_size>0</connection_pool_size>
</mysql4>
</named_collections>
</clickhouse>

View File

@ -205,6 +205,39 @@ def test_predefined_connection_configuration(started_cluster):
result = instance.query("SELECT dictGetUInt32(dict, 'value', toUInt64(100))")
assert(int(result) == 200)
instance.query('''
DROP DICTIONARY IF EXISTS dict;
CREATE DICTIONARY dict (id UInt32, value UInt32)
PRIMARY KEY id
SOURCE(MYSQL(NAME mysql1 connection_pool_size 0))
LIFETIME(MIN 1 MAX 2)
LAYOUT(HASHED());
''')
result = instance.query_and_get_error("SELECT dictGetUInt32(dict, 'value', toUInt64(100))")
assert 'Connection pool cannot have zero size' in result
instance.query('''
DROP DICTIONARY IF EXISTS dict;
CREATE DICTIONARY dict (id UInt32, value UInt32)
PRIMARY KEY id
SOURCE(MYSQL(NAME mysql4))
LIFETIME(MIN 1 MAX 2)
LAYOUT(HASHED());
''')
result = instance.query_and_get_error("SELECT dictGetUInt32(dict, 'value', toUInt64(100))")
assert 'Connection pool cannot have zero size' in result
instance.query('''
DROP DICTIONARY IF EXISTS dict;
CREATE DICTIONARY dict (id UInt32, value UInt32)
PRIMARY KEY id
SOURCE(MYSQL(NAME mysql4 connection_pool_size 1))
LIFETIME(MIN 1 MAX 2)
LAYOUT(HASHED());
''')
result = instance.query("SELECT dictGetUInt32(dict, 'value', toUInt64(100))")
assert(int(result) == 200)
def create_mysql_db(mysql_connection, name):
with mysql_connection.cursor() as cursor:

View File

@ -12,235 +12,62 @@ from helpers.test_tools import TSV
from random import randrange
import threading
from helpers.postgres_utility import get_postgres_conn
from helpers.postgres_utility import PostgresManager
from helpers.postgres_utility import create_replication_slot, drop_replication_slot
from helpers.postgres_utility import create_postgres_schema, drop_postgres_schema
from helpers.postgres_utility import create_postgres_table, drop_postgres_table
from helpers.postgres_utility import create_postgres_table_with_schema, drop_postgres_table_with_schema
from helpers.postgres_utility import check_tables_are_synchronized
from helpers.postgres_utility import check_several_tables_are_synchronized
from helpers.postgres_utility import assert_nested_table_is_created
from helpers.postgres_utility import assert_number_of_columns
from helpers.postgres_utility import postgres_table_template, postgres_table_template_2, postgres_table_template_3, postgres_table_template_4, postgres_table_template_5
from helpers.postgres_utility import queries
cluster = ClickHouseCluster(__file__)
instance = cluster.add_instance('instance',
main_configs = ['configs/log_conf.xml'],
user_configs = ['configs/users.xml'],
with_postgres=True, stay_alive=True)
postgres_table_template = """
CREATE TABLE IF NOT EXISTS "{}" (
key Integer NOT NULL, value Integer, PRIMARY KEY(key))
"""
postgres_table_template_2 = """
CREATE TABLE IF NOT EXISTS "{}" (
key Integer NOT NULL, value1 Integer, value2 Integer, value3 Integer, PRIMARY KEY(key))
"""
postgres_table_template_3 = """
CREATE TABLE IF NOT EXISTS "{}" (
key1 Integer NOT NULL, value1 Integer, key2 Integer NOT NULL, value2 Integer NOT NULL)
"""
postgres_table_template_4 = """
CREATE TABLE IF NOT EXISTS "{}"."{}" (
key Integer NOT NULL, value Integer, PRIMARY KEY(key))
"""
postgres_table_template_5 = """
CREATE TABLE IF NOT EXISTS "{}" (
key Integer NOT NULL, value UUID, PRIMARY KEY(key))
"""
def get_postgres_conn(ip, port, database=False, auto_commit=True, database_name='postgres_database', replication=False):
if database == True:
conn_string = "host={} port={} dbname='{}' user='postgres' password='mysecretpassword'".format(ip, port, database_name)
else:
conn_string = "host={} port={} user='postgres' password='mysecretpassword'".format(ip, port)
if replication:
conn_string += " replication='database'"
conn = psycopg2.connect(conn_string)
if auto_commit:
conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
conn.autocommit = True
return conn
def create_replication_slot(conn, slot_name='user_slot'):
cursor = conn.cursor()
cursor.execute('CREATE_REPLICATION_SLOT {} LOGICAL pgoutput EXPORT_SNAPSHOT'.format(slot_name))
result = cursor.fetchall()
print(result[0][0]) # slot name
print(result[0][1]) # start lsn
print(result[0][2]) # snapshot
return result[0][2]
def drop_replication_slot(conn, slot_name='user_slot'):
cursor = conn.cursor()
cursor.execute("select pg_drop_replication_slot('{}')".format(slot_name))
def create_postgres_db(cursor, name='postgres_database'):
cursor.execute("CREATE DATABASE {}".format(name))
def drop_postgres_db(cursor, name='postgres_database'):
cursor.execute("DROP DATABASE IF EXISTS {}".format(name))
def drop_postgres_schema(cursor, schema_name):
cursor.execute('DROP SCHEMA IF EXISTS {} CASCADE'.format(schema_name))
def create_postgres_schema(cursor, schema_name):
drop_postgres_schema(cursor, schema_name)
cursor.execute('CREATE SCHEMA {}'.format(schema_name))
def create_clickhouse_postgres_db(ip, port, name='postgres_database', database_name='postgres_database', schema_name=''):
drop_clickhouse_postgres_db(name)
if len(schema_name) == 0:
instance.query('''
CREATE DATABASE {}
ENGINE = PostgreSQL('{}:{}', '{}', 'postgres', 'mysecretpassword')'''.format(name, ip, port, database_name))
else:
instance.query('''
CREATE DATABASE {}
ENGINE = PostgreSQL('{}:{}', '{}', 'postgres', 'mysecretpassword', '{}')'''.format(name, ip, port, database_name, schema_name))
def drop_clickhouse_postgres_db(name='postgres_database'):
instance.query('DROP DATABASE IF EXISTS {}'.format(name))
def create_materialized_db(ip, port,
materialized_database='test_database',
postgres_database='postgres_database',
settings=[], table_overrides=''):
instance.query(f"DROP DATABASE IF EXISTS {materialized_database}")
create_query = f"CREATE DATABASE {materialized_database} ENGINE = MaterializedPostgreSQL('{ip}:{port}', '{postgres_database}', 'postgres', 'mysecretpassword')"
if len(settings) > 0:
create_query += " SETTINGS "
for i in range(len(settings)):
if i != 0:
create_query += ', '
create_query += settings[i]
create_query += table_overrides
instance.query(create_query)
assert materialized_database in instance.query('SHOW DATABASES')
def drop_materialized_db(materialized_database='test_database'):
instance.query('DROP DATABASE IF EXISTS {}'.format(materialized_database))
assert materialized_database not in instance.query('SHOW DATABASES')
def drop_postgres_table(cursor, table_name):
cursor.execute("""DROP TABLE IF EXISTS "{}" """.format(table_name))
def drop_postgres_table_with_schema(cursor, schema_name, table_name):
cursor.execute("""DROP TABLE IF EXISTS "{}"."{}" """.format(schema_name, table_name))
def create_postgres_table(cursor, table_name, replica_identity_full=False, template=postgres_table_template):
drop_postgres_table(cursor, table_name)
cursor.execute(template.format(table_name))
if replica_identity_full:
cursor.execute('ALTER TABLE {} REPLICA IDENTITY FULL;'.format(table_name))
def create_postgres_table_with_schema(cursor, schema_name, table_name):
drop_postgres_table_with_schema(cursor, schema_name, table_name)
cursor.execute(postgres_table_template_4.format(schema_name, table_name))
queries = [
'INSERT INTO postgresql_replica_{} select i, i from generate_series(0, 10000) as t(i);',
'DELETE FROM postgresql_replica_{} WHERE (value*value) % 3 = 0;',
'UPDATE postgresql_replica_{} SET value = value - 125 WHERE key % 2 = 0;',
"UPDATE postgresql_replica_{} SET key=key+20000 WHERE key%2=0",
'INSERT INTO postgresql_replica_{} select i, i from generate_series(40000, 50000) as t(i);',
'DELETE FROM postgresql_replica_{} WHERE key % 10 = 0;',
'UPDATE postgresql_replica_{} SET value = value + 101 WHERE key % 2 = 1;',
"UPDATE postgresql_replica_{} SET key=key+80000 WHERE key%2=1",
'DELETE FROM postgresql_replica_{} WHERE value % 2 = 0;',
'UPDATE postgresql_replica_{} SET value = value + 2000 WHERE key % 5 = 0;',
'INSERT INTO postgresql_replica_{} select i, i from generate_series(200000, 250000) as t(i);',
'DELETE FROM postgresql_replica_{} WHERE value % 3 = 0;',
'UPDATE postgresql_replica_{} SET value = value * 2 WHERE key % 3 = 0;',
"UPDATE postgresql_replica_{} SET key=key+500000 WHERE key%2=1",
'INSERT INTO postgresql_replica_{} select i, i from generate_series(1000000, 1050000) as t(i);',
'DELETE FROM postgresql_replica_{} WHERE value % 9 = 2;',
"UPDATE postgresql_replica_{} SET key=key+10000000",
'UPDATE postgresql_replica_{} SET value = value + 2 WHERE key % 3 = 1;',
'DELETE FROM postgresql_replica_{} WHERE value%5 = 0;'
]
def assert_nested_table_is_created(table_name, materialized_database='test_database', schema_name=''):
if len(schema_name) == 0:
table = table_name
else:
table = schema_name + "." + table_name
print(f'Checking table {table} exists in {materialized_database}')
database_tables = instance.query('SHOW TABLES FROM {}'.format(materialized_database))
while table not in database_tables:
time.sleep(0.2)
database_tables = instance.query('SHOW TABLES FROM {}'.format(materialized_database))
assert(table in database_tables)
def assert_number_of_columns(expected, table_name, database_name='test_database'):
result = instance.query(f"select count() from system.columns where table = '{table_name}' and database = '{database_name}' and not startsWith(name, '_')")
while (int(result) != expected):
time.sleep(1)
result = instance.query(f"select count() from system.columns where table = '{table_name}' and database = '{database_name}' and not startsWith(name, '_')")
print('Number of columns ok')
@pytest.mark.timeout(320)
def check_tables_are_synchronized(table_name, order_by='key', postgres_database='postgres_database', materialized_database='test_database', schema_name=''):
assert_nested_table_is_created(table_name, materialized_database, schema_name)
print(f"Checking table is synchronized. Table name: {table_name}, table schema: {schema_name}")
expected = instance.query('select * from {}.{} order by {};'.format(postgres_database, table_name, order_by))
if len(schema_name) == 0:
result = instance.query('select * from {}.{} order by {};'.format(materialized_database, table_name, order_by))
else:
result = instance.query('select * from {}.`{}.{}` order by {};'.format(materialized_database, schema_name, table_name, order_by))
try_num = 0
while result != expected:
time.sleep(0.5)
if len(schema_name) == 0:
result = instance.query('select * from {}.{} order by {};'.format(materialized_database, table_name, order_by))
else:
result = instance.query('select * from {}.`{}.{}` order by {};'.format(materialized_database, schema_name, table_name, order_by))
try_num += 1
if try_num > 30:
break
assert(result == expected)
pg_manager = PostgresManager()
@pytest.fixture(scope="module")
def started_cluster():
try:
cluster.start()
conn = get_postgres_conn(ip=cluster.postgres_ip, port=cluster.postgres_port)
cursor = conn.cursor()
create_postgres_db(cursor, 'postgres_database')
create_clickhouse_postgres_db(ip=cluster.postgres_ip, port=cluster.postgres_port)
instance.query("DROP DATABASE IF EXISTS test_database")
pg_manager.init(instance, cluster.postgres_ip, cluster.postgres_port)
yield cluster
finally:
cluster.shutdown()
@pytest.fixture(autouse=True)
def setup_teardown():
print("PostgreSQL is available - running test")
yield # run test
pg_manager.restart()
def test_add_new_table_to_replication(started_cluster):
drop_materialized_db()
conn = get_postgres_conn(ip=started_cluster.postgres_ip,
port=started_cluster.postgres_port,
database=True)
cursor = conn.cursor()
cursor = pg_manager.get_db_cursor()
cursor.execute('DROP TABLE IF EXISTS test_table')
NUM_TABLES = 5
for i in range(NUM_TABLES):
create_postgres_table(cursor, 'postgresql_replica_{}'.format(i));
instance.query("INSERT INTO postgres_database.postgresql_replica_{} SELECT number, {} from numbers(10000)".format(i, i))
create_materialized_db(ip=started_cluster.postgres_ip,
port=started_cluster.postgres_port)
for i in range(NUM_TABLES):
table_name = 'postgresql_replica_{}'.format(i)
check_tables_are_synchronized(table_name);
pg_manager.create_and_fill_postgres_tables_from_cursor(cursor, NUM_TABLES, 10000)
pg_manager.create_materialized_db(ip=started_cluster.postgres_ip, port=started_cluster.postgres_port)
check_several_tables_are_synchronized(instance, NUM_TABLES)
result = instance.query("SHOW TABLES FROM test_database")
assert(result == "postgresql_replica_0\npostgresql_replica_1\npostgresql_replica_2\npostgresql_replica_3\npostgresql_replica_4\n")
table_name = 'postgresql_replica_5'
create_postgres_table(cursor, table_name)
instance.query("INSERT INTO postgres_database.{} SELECT number, number from numbers(10000)".format(table_name))
pg_manager.create_and_fill_postgres_table_from_cursor(cursor, table_name)
result = instance.query('SHOW CREATE DATABASE test_database')
assert(result[:63] == "CREATE DATABASE test_database\\nENGINE = MaterializedPostgreSQL(") # Check without ip
@ -252,16 +79,16 @@ def test_add_new_table_to_replication(started_cluster):
result = instance.query_and_get_error("ALTER DATABASE test_database MODIFY SETTING materialized_postgresql_tables='tabl1'")
assert('Database engine MaterializedPostgreSQL does not support setting' in result)
instance.query("ATTACH TABLE test_database.{}".format(table_name));
instance.query(f"ATTACH TABLE test_database.{table_name}");
result = instance.query("SHOW TABLES FROM test_database")
assert(result == "postgresql_replica_0\npostgresql_replica_1\npostgresql_replica_2\npostgresql_replica_3\npostgresql_replica_4\npostgresql_replica_5\n")
check_tables_are_synchronized(table_name);
instance.query("INSERT INTO postgres_database.{} SELECT number, number from numbers(10000, 10000)".format(table_name))
check_tables_are_synchronized(table_name);
check_tables_are_synchronized(instance, table_name);
instance.query(f"INSERT INTO postgres_database.{table_name} SELECT number, number from numbers(10000, 10000)")
check_tables_are_synchronized(instance, table_name);
result = instance.query_and_get_error("ATTACH TABLE test_database.{}".format(table_name));
result = instance.query_and_get_error(f"ATTACH TABLE test_database.{table_name}");
assert('Table test_database.postgresql_replica_5 already exists' in result)
result = instance.query_and_get_error("ATTACH TABLE test_database.unknown_table");
@ -274,14 +101,14 @@ def test_add_new_table_to_replication(started_cluster):
table_name = 'postgresql_replica_6'
create_postgres_table(cursor, table_name)
instance.query("INSERT INTO postgres_database.{} SELECT number, number from numbers(10000)".format(table_name))
instance.query("ATTACH TABLE test_database.{}".format(table_name));
instance.query(f"ATTACH TABLE test_database.{table_name}");
instance.restart_clickhouse()
table_name = 'postgresql_replica_7'
create_postgres_table(cursor, table_name)
instance.query("INSERT INTO postgres_database.{} SELECT number, number from numbers(10000)".format(table_name))
instance.query("ATTACH TABLE test_database.{}".format(table_name));
instance.query(f"ATTACH TABLE test_database.{table_name}");
result = instance.query('SHOW CREATE DATABASE test_database')
assert(result[:63] == "CREATE DATABASE test_database\\nENGINE = MaterializedPostgreSQL(")
@ -289,33 +116,14 @@ def test_add_new_table_to_replication(started_cluster):
result = instance.query("SHOW TABLES FROM test_database")
assert(result == "postgresql_replica_0\npostgresql_replica_1\npostgresql_replica_2\npostgresql_replica_3\npostgresql_replica_4\npostgresql_replica_5\npostgresql_replica_6\npostgresql_replica_7\n")
check_several_tables_are_synchronized(instance, NUM_TABLES + 3)
for i in range(NUM_TABLES + 3):
table_name = 'postgresql_replica_{}'.format(i)
check_tables_are_synchronized(table_name);
for i in range(NUM_TABLES + 3):
cursor.execute('drop table if exists postgresql_replica_{};'.format(i))
def test_remove_table_from_replication(started_cluster):
drop_materialized_db()
conn = get_postgres_conn(ip=started_cluster.postgres_ip,
port=started_cluster.postgres_port,
database=True)
cursor = conn.cursor()
cursor.execute('DROP TABLE IF EXISTS test_table')
NUM_TABLES = 5
for i in range(NUM_TABLES):
create_postgres_table(cursor, 'postgresql_replica_{}'.format(i));
instance.query("INSERT INTO postgres_database.postgresql_replica_{} SELECT number, {} from numbers(10000)".format(i, i))
create_materialized_db(ip=started_cluster.postgres_ip,
port=started_cluster.postgres_port)
for i in range(NUM_TABLES):
table_name = 'postgresql_replica_{}'.format(i)
check_tables_are_synchronized(table_name);
pg_manager.create_and_fill_postgres_tables(NUM_TABLES, 10000)
pg_manager.create_materialized_db(ip=started_cluster.postgres_ip, port=started_cluster.postgres_port)
check_several_tables_are_synchronized(instance, NUM_TABLES)
result = instance.query("SHOW TABLES FROM test_database")
assert(result == "postgresql_replica_0\npostgresql_replica_1\npostgresql_replica_2\npostgresql_replica_3\npostgresql_replica_4\n")
@ -325,8 +133,8 @@ def test_remove_table_from_replication(started_cluster):
assert(result[-59:] == "\\'postgres_database\\', \\'postgres\\', \\'mysecretpassword\\')\n")
table_name = 'postgresql_replica_4'
instance.query('DETACH TABLE test_database.{}'.format(table_name));
result = instance.query_and_get_error('SELECT * FROM test_database.{}'.format(table_name))
instance.query(f'DETACH TABLE test_database.{table_name}');
result = instance.query_and_get_error(f'SELECT * FROM test_database.{table_name}')
assert("doesn't exist" in result)
result = instance.query("SHOW TABLES FROM test_database")
@ -336,52 +144,42 @@ def test_remove_table_from_replication(started_cluster):
assert(result[:63] == "CREATE DATABASE test_database\\nENGINE = MaterializedPostgreSQL(")
assert(result[-138:] == ")\\nSETTINGS materialized_postgresql_tables_list = \\'postgresql_replica_0,postgresql_replica_1,postgresql_replica_2,postgresql_replica_3\\'\n")
instance.query('ATTACH TABLE test_database.{}'.format(table_name));
check_tables_are_synchronized(table_name);
for i in range(NUM_TABLES):
table_name = 'postgresql_replica_{}'.format(i)
check_tables_are_synchronized(table_name);
instance.query(f'ATTACH TABLE test_database.{table_name}');
check_tables_are_synchronized(instance, table_name);
check_several_tables_are_synchronized(instance, NUM_TABLES)
result = instance.query('SHOW CREATE DATABASE test_database')
assert(result[:63] == "CREATE DATABASE test_database\\nENGINE = MaterializedPostgreSQL(")
assert(result[-159:] == ")\\nSETTINGS materialized_postgresql_tables_list = \\'postgresql_replica_0,postgresql_replica_1,postgresql_replica_2,postgresql_replica_3,postgresql_replica_4\\'\n")
table_name = 'postgresql_replica_1'
instance.query('DETACH TABLE test_database.{}'.format(table_name));
instance.query(f'DETACH TABLE test_database.{table_name}');
result = instance.query('SHOW CREATE DATABASE test_database')
assert(result[:63] == "CREATE DATABASE test_database\\nENGINE = MaterializedPostgreSQL(")
assert(result[-138:] == ")\\nSETTINGS materialized_postgresql_tables_list = \\'postgresql_replica_0,postgresql_replica_2,postgresql_replica_3,postgresql_replica_4\\'\n")
for i in range(NUM_TABLES):
cursor.execute('drop table if exists postgresql_replica_{};'.format(i))
cursor = pg_manager.get_db_cursor()
cursor.execute(f'drop table if exists postgresql_replica_0;')
# Removing from replication table which does not exist in PostgreSQL must be ok.
instance.query('DETACH TABLE test_database.postgresql_replica_0');
assert instance.contains_in_log("from publication, because table does not exist in PostgreSQL")
drop_materialized_db()
def test_predefined_connection_configuration(started_cluster):
drop_materialized_db()
conn = get_postgres_conn(ip=started_cluster.postgres_ip, port=started_cluster.postgres_port, database=True)
cursor = conn.cursor()
cursor = pg_manager.get_db_cursor()
cursor.execute(f'DROP TABLE IF EXISTS test_table')
cursor.execute(f'CREATE TABLE test_table (key integer PRIMARY KEY, value integer)')
cursor.execute(f'INSERT INTO test_table SELECT 1, 2')
instance.query("CREATE DATABASE test_database ENGINE = MaterializedPostgreSQL(postgres1) SETTINGS materialized_postgresql_tables_list='test_table'")
check_tables_are_synchronized("test_table");
drop_materialized_db()
cursor.execute('DROP TABLE IF EXISTS test_table')
check_tables_are_synchronized(instance, "test_table");
pg_manager.drop_materialized_db()
insert_counter = 0
def test_database_with_single_non_default_schema(started_cluster):
conn = get_postgres_conn(ip=started_cluster.postgres_ip, port=started_cluster.postgres_port, database=True)
cursor = conn.cursor()
cursor = pg_manager.get_db_cursor()
NUM_TABLES=5
schema_name = 'test_schema'
materialized_db = 'test_database'
@ -405,18 +203,17 @@ def test_database_with_single_non_default_schema(started_cluster):
def check_all_tables_are_synchronized():
for i in range(NUM_TABLES):
print('checking table', i)
check_tables_are_synchronized("postgresql_replica_{}".format(i), postgres_database=clickhouse_postgres_db);
check_tables_are_synchronized(instance, f"postgresql_replica_{i}", postgres_database=clickhouse_postgres_db);
print('synchronization Ok')
create_postgres_schema(cursor, schema_name)
create_clickhouse_postgres_db(ip=cluster.postgres_ip, port=cluster.postgres_port, name=clickhouse_postgres_db, schema_name=schema_name)
pg_manager.create_clickhouse_postgres_db(ip=cluster.postgres_ip, port=cluster.postgres_port, name=clickhouse_postgres_db, schema_name=schema_name)
for i in range(NUM_TABLES):
table_name = 'postgresql_replica_{}'.format(i)
create_postgres_table_with_schema(cursor, schema_name, table_name);
create_postgres_table_with_schema(cursor, schema_name, f'postgresql_replica_{i}');
insert_into_tables()
create_materialized_db(ip=started_cluster.postgres_ip, port=started_cluster.postgres_port,
pg_manager.create_materialized_db(ip=started_cluster.postgres_ip, port=started_cluster.postgres_port,
settings=[f"materialized_postgresql_schema = '{schema_name}'", "materialized_postgresql_allow_automatic_update = 1"])
insert_into_tables()
@ -434,22 +231,19 @@ def test_database_with_single_non_default_schema(started_cluster):
cursor.execute("ALTER TABLE test_schema.postgresql_replica_{} ADD COLUMN value2 integer".format(altered_table))
instance.query(f"INSERT INTO {clickhouse_postgres_db}.postgresql_replica_{altered_table} SELECT number, number, number from numbers(5000, 1000)")
assert_number_of_columns(3, f'postgresql_replica_{altered_table}')
check_tables_are_synchronized(f"postgresql_replica_{altered_table}", postgres_database=clickhouse_postgres_db);
assert_number_of_columns(instance, 3, f'postgresql_replica_{altered_table}')
check_tables_are_synchronized(instance, f"postgresql_replica_{altered_table}", postgres_database=clickhouse_postgres_db);
print('DETACH-ATTACH')
detached_table_name = "postgresql_replica_1"
instance.query(f"DETACH TABLE {materialized_db}.{detached_table_name}")
assert not instance.contains_in_log("from publication, because table does not exist in PostgreSQL")
instance.query(f"ATTACH TABLE {materialized_db}.{detached_table_name}")
check_tables_are_synchronized(detached_table_name, postgres_database=clickhouse_postgres_db);
drop_materialized_db()
check_tables_are_synchronized(instance, detached_table_name, postgres_database=clickhouse_postgres_db);
def test_database_with_multiple_non_default_schemas_1(started_cluster):
conn = get_postgres_conn(ip=started_cluster.postgres_ip, port=started_cluster.postgres_port, database=True)
cursor = conn.cursor()
cursor = pg_manager.get_db_cursor()
NUM_TABLES = 5
schema_name = 'test_schema'
@ -475,11 +269,11 @@ def test_database_with_multiple_non_default_schemas_1(started_cluster):
def check_all_tables_are_synchronized():
for i in range(NUM_TABLES):
print('checking table', i)
check_tables_are_synchronized("postgresql_replica_{}".format(i), schema_name=schema_name, postgres_database=clickhouse_postgres_db);
check_tables_are_synchronized(instance, "postgresql_replica_{}".format(i), schema_name=schema_name, postgres_database=clickhouse_postgres_db);
print('synchronization Ok')
create_postgres_schema(cursor, schema_name)
create_clickhouse_postgres_db(ip=cluster.postgres_ip, port=cluster.postgres_port, name=clickhouse_postgres_db, schema_name=schema_name)
pg_manager.create_clickhouse_postgres_db(ip=cluster.postgres_ip, port=cluster.postgres_port, name=clickhouse_postgres_db, schema_name=schema_name)
for i in range(NUM_TABLES):
table_name = 'postgresql_replica_{}'.format(i)
@ -489,7 +283,7 @@ def test_database_with_multiple_non_default_schemas_1(started_cluster):
publication_tables += schema_name + '.' + table_name
insert_into_tables()
create_materialized_db(ip=started_cluster.postgres_ip, port=started_cluster.postgres_port,
pg_manager.create_materialized_db(ip=started_cluster.postgres_ip, port=started_cluster.postgres_port,
settings=[f"materialized_postgresql_tables_list = '{publication_tables}'", "materialized_postgresql_tables_list_with_schema=1", "materialized_postgresql_allow_automatic_update = 1"])
check_all_tables_are_synchronized()
@ -507,8 +301,8 @@ def test_database_with_multiple_non_default_schemas_1(started_cluster):
cursor.execute("ALTER TABLE test_schema.postgresql_replica_{} ADD COLUMN value2 integer".format(altered_table))
instance.query(f"INSERT INTO {clickhouse_postgres_db}.postgresql_replica_{altered_table} SELECT number, number, number from numbers(5000, 1000)")
assert_number_of_columns(3, f'{schema_name}.postgresql_replica_{altered_table}')
check_tables_are_synchronized(f"postgresql_replica_{altered_table}", schema_name=schema_name, postgres_database=clickhouse_postgres_db);
assert_number_of_columns(instance, 3, f'{schema_name}.postgresql_replica_{altered_table}')
check_tables_are_synchronized(instance, f"postgresql_replica_{altered_table}", schema_name=schema_name, postgres_database=clickhouse_postgres_db);
print('DETACH-ATTACH')
detached_table_name = "postgresql_replica_1"
@ -516,15 +310,11 @@ def test_database_with_multiple_non_default_schemas_1(started_cluster):
assert not instance.contains_in_log("from publication, because table does not exist in PostgreSQL")
instance.query(f"ATTACH TABLE {materialized_db}.`{schema_name}.{detached_table_name}`")
assert_show_tables("test_schema.postgresql_replica_0\ntest_schema.postgresql_replica_1\ntest_schema.postgresql_replica_2\ntest_schema.postgresql_replica_3\ntest_schema.postgresql_replica_4\n")
check_tables_are_synchronized(detached_table_name, schema_name=schema_name, postgres_database=clickhouse_postgres_db);
drop_materialized_db()
check_tables_are_synchronized(instance, detached_table_name, schema_name=schema_name, postgres_database=clickhouse_postgres_db);
def test_database_with_multiple_non_default_schemas_2(started_cluster):
conn = get_postgres_conn(ip=started_cluster.postgres_ip, port=started_cluster.postgres_port, database=True)
cursor = conn.cursor()
cursor = pg_manager.get_db_cursor()
NUM_TABLES = 2
schemas_num = 2
schema_list = 'schema0, schema1'
@ -539,7 +329,7 @@ def test_database_with_multiple_non_default_schemas_2(started_cluster):
for ti in range(NUM_TABLES):
table_name = f'postgresql_replica_{ti}'
print(f'checking table {schema_name}.{table_name}')
check_tables_are_synchronized(f'{table_name}', schema_name=schema_name, postgres_database=clickhouse_postgres_db);
check_tables_are_synchronized(instance, f'{table_name}', schema_name=schema_name, postgres_database=clickhouse_postgres_db);
print('synchronized Ok')
def insert_into_tables():
@ -560,14 +350,16 @@ def test_database_with_multiple_non_default_schemas_2(started_cluster):
schema_name = f'schema{i}'
clickhouse_postgres_db = f'clickhouse_postgres_db{i}'
create_postgres_schema(cursor, schema_name)
create_clickhouse_postgres_db(ip=cluster.postgres_ip, port=cluster.postgres_port, name=clickhouse_postgres_db, schema_name=schema_name)
pg_manager.create_clickhouse_postgres_db(ip=cluster.postgres_ip, port=cluster.postgres_port, name=clickhouse_postgres_db, schema_name=schema_name)
for ti in range(NUM_TABLES):
table_name = f'postgresql_replica_{ti}'
create_postgres_table_with_schema(cursor, schema_name, table_name);
insert_into_tables()
create_materialized_db(ip=started_cluster.postgres_ip, port=started_cluster.postgres_port,
settings=[f"materialized_postgresql_schema_list = '{schema_list}'", "materialized_postgresql_allow_automatic_update = 1"])
pg_manager.create_materialized_db(
ip=started_cluster.postgres_ip, port=started_cluster.postgres_port,
settings=[f"materialized_postgresql_schema_list = '{schema_list}'",
"materialized_postgresql_allow_automatic_update = 1"])
check_all_tables_are_synchronized()
insert_into_tables()
@ -586,8 +378,8 @@ def test_database_with_multiple_non_default_schemas_2(started_cluster):
cursor.execute(f"ALTER TABLE schema{altered_schema}.postgresql_replica_{altered_table} ADD COLUMN value2 integer")
instance.query(f"INSERT INTO clickhouse_postgres_db{altered_schema}.postgresql_replica_{altered_table} SELECT number, number, number from numbers(1000 * {insert_counter}, 1000)")
assert_number_of_columns(3, f'schema{altered_schema}.postgresql_replica_{altered_table}')
check_tables_are_synchronized(f"postgresql_replica_{altered_table}", schema_name=f"schema{altered_schema}", postgres_database=clickhouse_postgres_db);
assert_number_of_columns(instance, 3, f'schema{altered_schema}.postgresql_replica_{altered_table}')
check_tables_are_synchronized(instance, f"postgresql_replica_{altered_table}", schema_name=f"schema{altered_schema}", postgres_database=clickhouse_postgres_db);
print('DETACH-ATTACH')
detached_table_name = "postgresql_replica_1"
@ -597,23 +389,22 @@ def test_database_with_multiple_non_default_schemas_2(started_cluster):
assert not instance.contains_in_log("from publication, because table does not exist in PostgreSQL")
instance.query(f"ATTACH TABLE {materialized_db}.`{detached_table_schema}.{detached_table_name}`")
assert_show_tables("schema0.postgresql_replica_0\nschema0.postgresql_replica_1\nschema1.postgresql_replica_0\nschema1.postgresql_replica_1\n")
check_tables_are_synchronized(f"postgresql_replica_{altered_table}", schema_name=detached_table_schema, postgres_database=clickhouse_postgres_db);
drop_materialized_db()
check_tables_are_synchronized(instance, f"postgresql_replica_{altered_table}", schema_name=detached_table_schema, postgres_database=clickhouse_postgres_db);
def test_table_override(started_cluster):
conn = get_postgres_conn(ip=started_cluster.postgres_ip, port=started_cluster.postgres_port, database=True)
cursor = conn.cursor()
cursor = pg_manager.get_db_cursor()
table_name = 'table_override'
materialized_database = 'test_database'
create_postgres_table(cursor, table_name, template=postgres_table_template_5);
instance.query(f"create table {table_name}(key Int32, value UUID) engine = PostgreSQL (postgres1, table={table_name})")
instance.query(f"insert into {table_name} select number, generateUUIDv4() from numbers(10)")
table_overrides = f" TABLE OVERRIDE {table_name} (COLUMNS (key Int32, value UUID))"
create_materialized_db(ip=started_cluster.postgres_ip, port=started_cluster.postgres_port,
settings=[f"materialized_postgresql_tables_list = '{table_name}'"], table_overrides=table_overrides)
assert_nested_table_is_created(table_name, materialized_database)
pg_manager.create_materialized_db(
ip=started_cluster.postgres_ip, port=started_cluster.postgres_port,
settings=[f"materialized_postgresql_tables_list = '{table_name}'"],
table_overrides=table_overrides)
assert_nested_table_is_created(instance, table_name, materialized_database)
result = instance.query(f"show create table {materialized_database}.{table_name}")
print(result)
expected = "CREATE TABLE test_database.table_override\\n(\\n `key` Int32,\\n `value` UUID,\\n `_sign` Int8() MATERIALIZED 1,\\n `_version` UInt64() MATERIALIZED 1\\n)\\nENGINE = ReplacingMergeTree(_version)\\nORDER BY tuple(key)"
@ -621,29 +412,23 @@ def test_table_override(started_cluster):
time.sleep(5)
query = f"select * from {materialized_database}.{table_name} order by key"
expected = instance.query(f"select * from {table_name} order by key")
instance.query(f"drop table {table_name} no delay")
assert_eq_with_retry(instance, query, expected)
drop_materialized_db()
drop_postgres_table(cursor, table_name)
def test_table_schema_changes_2(started_cluster):
drop_materialized_db()
conn = get_postgres_conn(ip=started_cluster.postgres_ip,
port=started_cluster.postgres_port,
database=True)
cursor = conn.cursor()
cursor = pg_manager.get_db_cursor()
table_name = "test_table"
create_postgres_table(cursor, table_name, template=postgres_table_template_2);
instance.query(f"INSERT INTO postgres_database.{table_name} SELECT number, number, number, number from numbers(25)")
create_materialized_db(ip=started_cluster.postgres_ip,
port=started_cluster.postgres_port,
pg_manager.create_materialized_db(
ip=started_cluster.postgres_ip, port=started_cluster.postgres_port,
settings=["materialized_postgresql_allow_automatic_update = 1, materialized_postgresql_tables_list='test_table'"])
instance.query(f"INSERT INTO postgres_database.{table_name} SELECT number, number, number, number from numbers(25, 25)")
check_tables_are_synchronized(table_name);
check_tables_are_synchronized(instance, table_name);
cursor.execute(f"ALTER TABLE {table_name} DROP COLUMN value1")
cursor.execute(f"ALTER TABLE {table_name} DROP COLUMN value2")
@ -653,24 +438,24 @@ def test_table_schema_changes_2(started_cluster):
cursor.execute(f"ALTER TABLE {table_name} ADD COLUMN value3 Text")
cursor.execute(f"ALTER TABLE {table_name} ADD COLUMN value4 Text")
cursor.execute(f"UPDATE {table_name} SET value3 = 'kek' WHERE key%2=0")
check_tables_are_synchronized(table_name);
check_tables_are_synchronized(instance, table_name);
instance.query(f"INSERT INTO postgres_database.{table_name} SELECT number, toString(number), toString(number), toString(number), toString(number) from numbers(50, 25)")
cursor.execute(f"ALTER TABLE {table_name} ADD COLUMN value5 Integer")
cursor.execute(f"ALTER TABLE {table_name} DROP COLUMN value2")
instance.query(f"INSERT INTO postgres_database.{table_name} SELECT number, toString(number), toString(number), toString(number), number from numbers(75, 25)")
check_tables_are_synchronized(table_name);
check_tables_are_synchronized(instance, table_name);
instance.restart_clickhouse()
check_tables_are_synchronized(table_name);
check_tables_are_synchronized(instance, table_name);
cursor.execute(f"ALTER TABLE {table_name} DROP COLUMN value5")
cursor.execute(f"ALTER TABLE {table_name} ADD COLUMN value5 Text")
instance.query(f"INSERT INTO postgres_database.{table_name} SELECT number, toString(number), toString(number), toString(number), toString(number) from numbers(100, 25)")
check_tables_are_synchronized(table_name);
check_tables_are_synchronized(instance, table_name);
cursor.execute(f"ALTER TABLE {table_name} ADD COLUMN value6 Text")
cursor.execute(f"ALTER TABLE {table_name} ADD COLUMN value7 Integer")
cursor.execute(f"ALTER TABLE {table_name} ADD COLUMN value8 Integer")
cursor.execute(f"ALTER TABLE {table_name} DROP COLUMN value5")
instance.query(f"INSERT INTO postgres_database.{table_name} SELECT number, toString(number), toString(number), toString(number), toString(number), number, number from numbers(125, 25)")
check_tables_are_synchronized(table_name);
check_tables_are_synchronized(instance, table_name);
if __name__ == '__main__':

View File

@ -21,5 +21,14 @@
<database>clickhouse</database>
<table>test_table</table>
</mysql3>
<mysql4>
<user>root</user>
<password>clickhouse</password>
<host>mysql57</host>
<port>3306</port>
<database>clickhouse</database>
<table>test_table</table>
<connection_pool_size>0</connection_pool_size>
</mysql4>
</named_collections>
</clickhouse>

View File

@ -418,6 +418,10 @@ def test_predefined_connection_configuration(started_cluster):
''')
assert (node1.query(f"SELECT count() FROM test_table").rstrip() == '100')
assert 'Connection pool cannot have zero size' in node1.query_and_get_error("SELECT count() FROM mysql(mysql1, table='test_table', connection_pool_size=0)")
assert 'Connection pool cannot have zero size' in node1.query_and_get_error("SELECT count() FROM mysql(mysql4)")
assert int(node1.query("SELECT count() FROM mysql(mysql4, connection_pool_size=1)")) == 100
# Regression for (k, v) IN ((k, v))
def test_mysql_in(started_cluster):

View File

@ -68,3 +68,26 @@ def test_system_logs_recreate():
# IOW that the table created only when the structure is indeed different.
for table in system_logs:
assert len(node.query(f"SHOW TABLES FROM system LIKE '{table}%'").strip().split('\n')) == 3
def test_drop_system_log():
node.exec_in_container(['bash', '-c', f"""echo "
<clickhouse>
<query_log>
<flush_interval_milliseconds replace=\\"replace\\">1000000</flush_interval_milliseconds>
</query_log>
</clickhouse>
" > /etc/clickhouse-server/config.d/yyy-override-query_log.xml
"""])
node.restart_clickhouse()
node.query("select 1")
node.query("system flush logs")
node.query("select 2")
node.query("system flush logs")
assert node.query("select count() > 0 from system.query_log") == "1\n"
node.query("drop table system.query_log sync")
node.query("select 3")
node.query("system flush logs")
assert node.query("select count() > 0 from system.query_log") == "1\n"
node.exec_in_container(['rm', f'/etc/clickhouse-server/config.d/yyy-override-query_log.xml'])
node.restart_clickhouse()

View File

@ -1,5 +1,5 @@
DROP TABLE IF EXISTS ttl;
CREATE TABLE ttl (d DateTime) ENGINE = MergeTree ORDER BY tuple() TTL d + INTERVAL 10 DAY;
CREATE TABLE ttl (d DateTime) ENGINE = MergeTree ORDER BY tuple() TTL d + INTERVAL 10 DAY SETTINGS remove_empty_parts=0;
SYSTEM STOP MERGES ttl;
INSERT INTO ttl VALUES ('2000-01-01 01:02:03'), ('2000-02-03 04:05:06');
SELECT rows, delete_ttl_info_min, delete_ttl_info_max, move_ttl_info.expression, move_ttl_info.min, move_ttl_info.max FROM system.parts WHERE database = currentDatabase() AND table = 'ttl';

View File

@ -0,0 +1 @@
1 ['a','b'] [3,4] ['','']

View File

@ -0,0 +1,8 @@
DROP TABLE IF EXISTS nested_test;
CREATE TABLE nested_test (x UInt32, `nest.col1` Array(String), `nest.col2` Array(Int8)) ENGINE = MergeTree ORDER BY x;
ALTER TABLE nested_test ADD COLUMN `nest.col3` Array(LowCardinality(String));
INSERT INTO nested_test (x, `nest.col1`, `nest.col2`) values (1, ['a', 'b'], [3, 4]);
SELECT * FROM nested_test;
DROP TABLE IF EXISTS nested_test;

View File

@ -1,13 +1,13 @@
SELECT count()
FROM t_02156_merge1
PREWHERE k = 1
WHERE (k = 1) AND notEmpty(v)
PREWHERE k = 3
WHERE (k = 3) AND notEmpty(v)
2
SELECT count()
FROM t_02156_merge2
WHERE (k = 1) AND notEmpty(v)
WHERE (k = 3) AND notEmpty(v)
2
SELECT count()
FROM t_02156_merge3
WHERE (k = 1) AND notEmpty(v)
WHERE (k = 3) AND notEmpty(v)
2

View File

@ -20,14 +20,14 @@ INSERT INTO t_02156_mt1 SELECT number, toString(number) FROM numbers(10000);
INSERT INTO t_02156_mt2 SELECT number, toString(number) FROM numbers(10000);
INSERT INTO t_02156_log SELECT number, toString(number) FROM numbers(10000);
EXPLAIN SYNTAX SELECT count() FROM t_02156_merge1 WHERE k = 1 AND notEmpty(v);
SELECT count() FROM t_02156_merge1 WHERE k = 1 AND notEmpty(v);
EXPLAIN SYNTAX SELECT count() FROM t_02156_merge1 WHERE k = 3 AND notEmpty(v);
SELECT count() FROM t_02156_merge1 WHERE k = 3 AND notEmpty(v);
EXPLAIN SYNTAX SELECT count() FROM t_02156_merge2 WHERE k = 1 AND notEmpty(v);
SELECT count() FROM t_02156_merge2 WHERE k = 1 AND notEmpty(v);
EXPLAIN SYNTAX SELECT count() FROM t_02156_merge2 WHERE k = 3 AND notEmpty(v);
SELECT count() FROM t_02156_merge2 WHERE k = 3 AND notEmpty(v);
EXPLAIN SYNTAX SELECT count() FROM t_02156_merge3 WHERE k = 1 AND notEmpty(v);
SELECT count() FROM t_02156_merge3 WHERE k = 1 AND notEmpty(v);
EXPLAIN SYNTAX SELECT count() FROM t_02156_merge3 WHERE k = 3 AND notEmpty(v);
SELECT count() FROM t_02156_merge3 WHERE k = 3 AND notEmpty(v);
DROP TABLE IF EXISTS t_02156_mt1;
DROP TABLE IF EXISTS t_02156_mt2;

View File

@ -0,0 +1,21 @@
#!/usr/bin/expect -f
log_user 0
set timeout 20
match_max 100000
expect_after {
eof { exp_continue }
timeout { exit 1 }
}
set basedir [file dirname $argv0]
spawn bash -c "source $basedir/../shell_config.sh ; \$CLICKHOUSE_LOCAL --disable_suggestion"
expect ":) "
send -- "insert into table function null() format TSV some trash here 123 \n 456\r"
expect -re ".*DB::Exception: Table function 'null' requires 'structure'.*\r"
expect ":) "
send -- ""
expect eof

View File

@ -0,0 +1,40 @@
1 one
2 tow
1 one
2 tow
1 one
2 tow
1 one
2 tow
1 one
2 tow
1 one
2 tow
1 one
2 tow
{
"meta":
[
{
"name": "id",
"type": "UInt64"
},
{
"name": "name",
"type": "String"
}
],
"data":
[
{
"id": "1",
"name": "one"
},
{
"id": "2",
"name": "tow"
}
],
"rows": 2,

View File

@ -0,0 +1,72 @@
#!/usr/bin/env bash
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
set -e
[ -e "${CLICKHOUSE_TMP}"/hello.csv ] && rm "${CLICKHOUSE_TMP}"/hello.csv
[ -e "${CLICKHOUSE_TMP}"/world.csv.gz ] && rm "${CLICKHOUSE_TMP}"/world.csv.gz
[ -e "${CLICKHOUSE_TMP}"/hello.world.csv ] && rm "${CLICKHOUSE_TMP}"/hello.world.csv
[ -e "${CLICKHOUSE_TMP}"/hello.world.csv.xz ] && rm "${CLICKHOUSE_TMP}"/hello.world.csv.xz
[ -e "${CLICKHOUSE_TMP}"/.htaccess.json ] && rm "${CLICKHOUSE_TMP}"/.htaccess.json
[ -e "${CLICKHOUSE_TMP}"/example.com. ] && rm "${CLICKHOUSE_TMP}"/example.com.
[ -e "${CLICKHOUSE_TMP}"/museum...protobuf ] && rm "${CLICKHOUSE_TMP}"/museum...protobuf
${CLICKHOUSE_CLIENT} --query "DROP TABLE IF EXISTS 02165_out_tb;"
${CLICKHOUSE_CLIENT} --query "CREATE TABLE 02165_out_tb (id UInt64, name String) Engine=Memory;"
${CLICKHOUSE_CLIENT} --query "INSERT INTO 02165_out_tb Values(1, 'one'), (2, 'tow');"
${CLICKHOUSE_CLIENT} --query "DROP TABLE IF EXISTS 02165_in_tb;"
${CLICKHOUSE_CLIENT} --query "CREATE TABLE 02165_in_tb (id UInt64, name String) Engine=Memory;"
${CLICKHOUSE_CLIENT} --query "SELECT * FROM 02165_out_tb INTO OUTFILE '${CLICKHOUSE_TMP}/hello.csv';"
${CLICKHOUSE_CLIENT} --query "INSERT INTO TABLE 02165_in_tb FROM INFILE '${CLICKHOUSE_TMP}/hello.csv' FORMAT CSV;"
${CLICKHOUSE_CLIENT} --query "SELECT * FROM 02165_in_tb;"
${CLICKHOUSE_CLIENT} --query "TRUNCATE TABLE 02165_in_tb;"
${CLICKHOUSE_CLIENT} --query "SELECT * FROM 02165_out_tb INTO OUTFILE '${CLICKHOUSE_TMP}/world.csv.gz';"
${CLICKHOUSE_CLIENT} --query "INSERT INTO TABLE 02165_in_tb FROM INFILE '${CLICKHOUSE_TMP}/world.csv.gz' COMPRESSION 'gz' FORMAT CSV;"
${CLICKHOUSE_CLIENT} --query "SELECT * FROM 02165_in_tb;"
${CLICKHOUSE_CLIENT} --query "TRUNCATE TABLE 02165_in_tb;"
${CLICKHOUSE_CLIENT} --query "SELECT * FROM 02165_out_tb INTO OUTFILE '${CLICKHOUSE_TMP}/hello.world.csv';"
${CLICKHOUSE_CLIENT} --query "INSERT INTO TABLE 02165_in_tb FROM INFILE '${CLICKHOUSE_TMP}/hello.world.csv' FORMAT CSV;"
${CLICKHOUSE_CLIENT} --query "SELECT * FROM 02165_in_tb;"
${CLICKHOUSE_CLIENT} --query "TRUNCATE TABLE 02165_in_tb;"
${CLICKHOUSE_CLIENT} --query "SELECT * FROM 02165_out_tb INTO OUTFILE '${CLICKHOUSE_TMP}/hello.world.csv.xz';"
${CLICKHOUSE_CLIENT} --query "INSERT INTO TABLE 02165_in_tb FROM INFILE '${CLICKHOUSE_TMP}/hello.world.csv.xz' COMPRESSION 'xz' FORMAT CSV;"
${CLICKHOUSE_CLIENT} --query "SELECT * FROM 02165_in_tb;"
${CLICKHOUSE_CLIENT} --query "TRUNCATE TABLE 02165_in_tb;"
${CLICKHOUSE_CLIENT} --query "SELECT * FROM 02165_out_tb INTO OUTFILE '${CLICKHOUSE_TMP}/example.com.';"
${CLICKHOUSE_CLIENT} --query "INSERT INTO TABLE 02165_in_tb FROM INFILE '${CLICKHOUSE_TMP}/example.com.' FORMAT TabSeparated;"
${CLICKHOUSE_CLIENT} --query "SELECT * FROM 02165_in_tb;"
${CLICKHOUSE_CLIENT} --query "TRUNCATE TABLE 02165_in_tb;"
${CLICKHOUSE_CLIENT} --query "SELECT * FROM 02165_out_tb INTO OUTFILE '${CLICKHOUSE_TMP}/museum...protobuf';"
${CLICKHOUSE_CLIENT} --query "INSERT INTO TABLE 02165_in_tb FROM INFILE '${CLICKHOUSE_TMP}/museum...protobuf' FORMAT TabSeparated;"
${CLICKHOUSE_CLIENT} --query "SELECT * FROM 02165_in_tb;"
${CLICKHOUSE_CLIENT} --query "TRUNCATE TABLE 02165_in_tb;"
${CLICKHOUSE_CLIENT} --query "INSERT INTO TABLE 02165_in_tb FROM INFILE '${CLICKHOUSE_TMP}/world.csv.gz';"
${CLICKHOUSE_CLIENT} --query "SELECT * FROM 02165_in_tb;"
${CLICKHOUSE_CLIENT} --query "TRUNCATE TABLE 02165_in_tb;"
${CLICKHOUSE_CLIENT} --query "SELECT * FROM 02165_out_tb INTO OUTFILE '${CLICKHOUSE_TMP}/.htaccess.json';"
head -n 26 ${CLICKHOUSE_TMP}/.htaccess.json
${CLICKHOUSE_CLIENT} --query "DROP TABLE IF EXISTS 02165_out_tb;"
${CLICKHOUSE_CLIENT} --query "DROP TABLE IF EXISTS 02165_in_tb;"
rm "${CLICKHOUSE_TMP}"/hello.csv
rm "${CLICKHOUSE_TMP}"/world.csv.gz
rm "${CLICKHOUSE_TMP}"/hello.world.csv
rm "${CLICKHOUSE_TMP}"/hello.world.csv.xz
rm "${CLICKHOUSE_TMP}"/.htaccess.json
rm "${CLICKHOUSE_TMP}"/example.com.
rm "${CLICKHOUSE_TMP}"/museum...protobuf

View File

@ -0,0 +1,5 @@
INSERT INTO test FROM INFILE data.file SELECT x
FROM input(\'x UInt32\')
INSERT INTO test FROM INFILE data.file WITH number AS x
SELECT number
FROM input(\'number UInt32\')

View File

@ -0,0 +1,4 @@
EXPLAIN SYNTAX INSERT INTO test FROM INFILE 'data.file' SELECT x from input('x UInt32') FORMAT TSV;
EXPLAIN SYNTAX INSERT INTO test FROM INFILE 'data.file' WATCH view; -- { clientError SYNTAX_ERROR }
EXPLAIN SYNTAX INSERT INTO test FROM INFILE 'data.file' VALUES (1) -- { clientError SYNTAX_ERROR }
EXPLAIN SYNTAX INSERT INTO test FROM INFILE 'data.file' WITH number AS x SELECT number FROM input('number UInt32');

View File

@ -0,0 +1 @@
x LowCardinality(UInt64)

View File

@ -0,0 +1,11 @@
#!/usr/bin/env bash
# Tags: no-parallel, no-fasttest
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
$CLICKHOUSE_CLIENT -q "insert into table function file('arrow.dict', 'Arrow', 'x LowCardinality(UInt64)') select number from numbers(10) settings output_format_arrow_low_cardinality_as_dictionary=1"
$CLICKHOUSE_CLIENT -q "desc file('arrow.dict', 'Arrow')"

View File

@ -0,0 +1,12 @@
5
6
7
8
9
10
0
1
2
3
4
5

View File

@ -0,0 +1,12 @@
DROP TABLE IF EXISTS counter;
CREATE TABLE counter (id UInt64, createdAt DateTime) ENGINE = MergeTree() ORDER BY id;
INSERT INTO counter SELECT number, now() FROM numbers(500);
DROP TABLE IF EXISTS vcounter;
CREATE VIEW vcounter AS SELECT intDiv(id, 10) AS tens, max(createdAt) AS maxid FROM counter GROUP BY tens;
SELECT tens FROM vcounter ORDER BY tens ASC LIMIT 100 SETTINGS limit = 6, offset = 5;
SELECT tens FROM vcounter ORDER BY tens ASC LIMIT 100 SETTINGS limit = 6, offset = 0;
DROP TABLE vcounter;
DROP TABLE counter;

View File

@ -0,0 +1,135 @@
---
title: 'Admixer Aggregates Over 1 Billion Unique Users a Day using ClickHouse'
image: 'https://blog-images.clickhouse.com/en/2022/admixer-case-study/featured.jpg'
date: '2022-01-11'
author: 'Vladimir Zakrevsky'
tags: ['company']
---
## Highlights
* Inserting around 100 billion records per day, over 1 million records per second
* Able to aggregate over 1 billion unique users a day
* Moved from MSSQL to Azure Table Storage to ClickHouse
* ClickHouse is deployed on 15 servers with 2 TB total RAM
Admixer is an Ad-Tech company that provides all the components to build infrastructure for advertising products for brands, ad agencies, media houses, publishers, ad networks, and other buy- and sell-side industry players looking for effective ad management. A distinctive feature of Admixer is their technology, which allows:
* Agencies to place advertising campaigns with specified execution conditions (terms, budget, creative display settings)
* Set the rules for distributing advertising campaign budgets among thousands of publishers
* Provide accounts for publishers, where they could not only see income statistics or withdraw money but also create their advertising campaigns, as well as connect other sources of monetization in addition to Network advertising campaigns.
Admixers products include:
* SSP - Supply-side platform where publishers/websites offer advertising space
* DSP - Demand-side platform where advertisers buy advertising space
* ADX - Ad exchange (connects SSPs and DSPs - buyers and sellers of advertisements and advertising space)
* DMP - Data management platform (used by advertisers to configure the audience they want to target)
Admixer provides not only access to these products but allows customers to build an entire ecosystem.
## Why We Chose ClickHouse
To implement the previous point, Admixer began developing an Advertising Exchange. Initially, AdExchange was based on the sale of local inventory by external DSPs. Then it began to aggregate the traffic of external SSPs to place local advertisements on it and later redirect this traffic to external DSPs. Thus, ADX was created.
In 2015-2016, the share of external inventory was 3% (100 million requests), then at the end of 2016, it was more than 90% (3 billion requests). With a sharp increase in requests, the load on their processing increased, and most importantly, the load on the storage and provision of online analytics increased. Relational databases could not handle that many inserts for statistics records. Before migrating to Azure, we used a MSSQL server which stored the object structure and statistics.
In 2011, when migrating to Azure, we used Azure Table Storage to store and issue statistics. But with an increase in the number of transactions and the amount of data, it was not optimal to use this solution since Azure Table Storage charges for the number of transactions and the amount of data.
Thus we needed to:
* Display statistics on advertising transactions in the user interface in real-time;
* Accept a significant amount (1 million records per second) of data for insertion;
* Aggregate the received data for different sections (40 operations and the same number of metrics);
* Be able to scale the data warehouse as the number of requests grew;
* Have full control over our costs.
![Profile Report](https://blog-images.clickhouse.com/en/2022/admixer-case-study/profile-report.png)
This image shows the Profile Report. Any Ad Campaign in Admixer is split by Line Items (Profiles). It is possible to overview detailed reports by each Profile including Date-Time Statistics, Geo, Domans, SSPs. This report is also updated in real time.
## The Advantages of Using ClickHouse
ClickHouse helps to cope with the challenges above and provides the following benefits:
* Not tied to the platform (we decided to migrate from the cloud);
* The cluster we built allows us to receive up to a million inserts per second (and we know how to scale up on demand);
* Has built-in mechanisms for aggregating and distributing data across tables (materialized views);
* Excellent data compression;
* Reading speed makes it possible to display statistics directly in the user interface in real-time;
* Has a SQL dialect that provides the ability to build any reports;
* Has several advanced functions (and allows you to write your own) for processing statistics;
* Built-in HyperLogLog for storing rough data;
* Data sampling;
* Open source / community / good documentation;
* Constant additions of new features, bug fixes, and improvements to the current functionality;
* Convenient operations.
## ClickHouse Architecture
Our architecture changed from 2016 to 2020. There are two diagrams below: the state we started and the state we came to.
![Architecture 2016](https://blog-images.clickhouse.com/en/2022/admixer-case-study/architecture-2016.png)
_Architecture 2016_
![Architecture 2020](https://blog-images.clickhouse.com/en/2022/admixer-case-study/architecture-2020.png)
_Architecture 2020_
Requests Handler is a component that accepts a request for an advertisement and determines which banner to display. After the banner is selected, it records this in the statistics. Since 2020, these components have been receiving over 1 million requests per second. Statistics were recorded through an intermediate element named Global Events Queue. Events were retrieved from GlobalEventsQueue, read by the EventsProcessor components, and additionally validated/enriched, then written to the ClickHouse cluster.
Initially, we wrote from EventsProcessor in ClickHouse into several tables in parallel but then switched through Buffer-> Null-table-> MatViews. We will next investigate if the new [asynchronous insert feature](https://clickhouse.com/blog/en/2021/clickhouse-v21.11-released/#async-inserts) in version 21.11 would be an alternative to using a buffer table.
We also reviewed the implementation of the event queue. Initially, we used Redis (but Redis is InMemory storage), thus:
* On server restart, there was a risk of losing events;
* The amount of RAM is relatively small, and if we planned to stop the Events Processor or ClickHouse, there was a risk of overflowing the event queue, so a very high response rate to event processor problems was required.
We tried to replace Redis and use Kafka instead, but the Kafka driver for ClickHouse at the time had issues with arrays (which has since been fixed).
Therefore, we implemented our event queue, which was stored on the disk of each EventHandler component, and the local EventsProcessor was located on the same server. The number of EventsProcessor components has increased, which means that the number of insert requests in ClickHouse has also increased, but this was not a problem.
Since financial optimization was also an essential factor for us, this scheme proved to be excellent in this regard as well. To receive processing and storage of data from ADX, we assembled a cluster with 15 servers (40 threads, 128 RAM, SSD storage), and we also took this with a margin. For the storage cluster for unique users, we used a cluster with 6 of the same servers.
An important point was also the work with receiving data from clusters. If you recklessly send a request to the cluster, this could create a pretty significant load on it, leading to the slowdown of other processes. But ClickHouse has settings for limiting resources and allocating quotas for specific users, which allowed us to solve this case quickly. All configuration files can be perfectly placed in the configuration management system and managed from there.
## ClickHouse Handles Over 1 Billion Unique Users Per Day
In addition to statistics aggregation, which summed up metrics by dimension, Admixer provides information on how many unique users have watched ads for an arbitrary time. The number of uniques cannot be summed up. In our system, the user ID is the UUID. When we want to get several unique UUIDs for some arbitrary period, we need to recalculate the unique UUIDs for this period each time. We cannot decompose all possible combinations in advance since the intersection will be too large.
Before using ClickHouse, we could count uniques only for predefined periods: day, week, month, all the time. Also, the number of slices was limited. Also, constant bulk requests for Aerospike slowed down the event processor.
AggregatingMergeTree allowed us with minimal costs to count unique users by a large number of keys in one report. In the beginning, with a cluster from three servers, we could easily count 1 billion uniques per day in ~ 12 slices. There are nuances; large slices cannot be output to the interface since simultaneous scanning of large tables will take a lot of CPU time. The solution to this problem was the report generation service, which has its internal queue and sends the already generated CSV files to the interface. On the other hand, we can output small slices to the interface with a limited date range.
ClickHouse was perfect as Big Data Storage for our ML models.
## Advice To Others Who Might Be Considering ClickHouse
The Devil is in the details!
ClickHouse technical tips:
* If you do not need high data accuracy, use HyperLogLog and sampling;
* Run load tests to determine the number of operations that your cluster can withstand given your data structure before assembling the cluster;
* Buffer is a great way to insert data, but watch out for memory;
* Use Native format for insert;
* Avoid large numbers of small parts for continuous flow insertion. Too many tables generate a lot of merges in the background such as the Too many parts (300) error;
* It is necessary to decide on the replication scheme at the beginning. One option is to use ZooKeeper and let tables replicate themselves using ReplicatedMergeTree and other replicating table engines. Because we had many tables and we wanted to choose what parts of the data to replicate to which servers we chose to not use ZooKeeper and have our client spread the writes - each write goes to two servers.
Over the past five years, the Admixer's Core team has been working with a high-load and aggregation of big data. Any work has its subtleties, do not step on your rake. Use ours.
We offer customers specialized audit, consultation, or create ready-made solutions using ClickHouse to solve high-load tasks. These speciality services are now offered via our new initiative [LoadFighters](https://loadfighters.com).
### About Admixer
Admixer is an independent adtech company that develops an ecosystem of full-stack programmatic solutions. Admixer has its own line of adtech products for brands, ad agencies, media houses, publishers, ad networks, and other buy- and sell-side industry players looking for effective ad management. Our customizable technology, in-depth expertise, and a personal approach help businesses turn programmatic advertising into a scalable revenue channel.
Since their start in 2008, weve been on a mission to build an ecosystem with effective and transparent relationships between all of the players in the digital advertising industry.
Today, the company has over 100 supply and demand partners, 3,000+ customers, and 200+ employees worldwide. They run offices in Ukraine, Belarus, Kazakhstan, Moldova, Georgia, and legal entities in the UK and Germany.
For more information please visit:
[https://admixer.com/](https://admixer.com/)

Binary file not shown.

After

Width:  |  Height:  |  Size: 24 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 20 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 11 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 16 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 22 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 10 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 21 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 25 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 31 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 16 KiB

Some files were not shown because too many files have changed in this diff Show More