Merge branch 'master' into more_compatible_read

mergify[bot] 2022-03-04 18:19:01 +00:00 committed by GitHub
commit 13858e88d6
155 changed files with 1673 additions and 342 deletions

View File

@ -19,6 +19,12 @@
#if defined(__SSE4_2__)
#include <smmintrin.h>
#include <nmmintrin.h>
#define CRC_INT _mm_crc32_u64
#endif
#if defined(__aarch64__) && defined(__ARM_FEATURE_CRC32)
#include <arm_acle.h>
#define CRC_INT __crc32cd
#endif
@ -205,7 +211,7 @@ struct StringRefHash64
}
};
#if defined(__SSE4_2__)
#if defined(CRC_INT)
/// Parts are taken from CityHash.
@ -281,13 +287,13 @@ struct CRC32Hash
do
{
UInt64 word = unalignedLoad<UInt64>(pos);
res = _mm_crc32_u64(res, word);
res = CRC_INT(res, word);
pos += 8;
} while (pos + 8 < end);
UInt64 word = unalignedLoad<UInt64>(end - 8); /// I'm not sure if this is normal.
res = _mm_crc32_u64(res, word);
res = CRC_INT(res, word);
return res;
}
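
The `CRC_INT` macro is what lets the loop above compile unchanged on both architectures: it expands to `_mm_crc32_u64` under SSE4.2 and to `__crc32cd` under the AArch64 CRC32 extension. A minimal standalone sketch of the same pattern (simplified: unlike the real `CRC32Hash`, strings shorter than 8 bytes are not handled here):

```cpp
#include <cstddef>
#include <cstdint>
#include <cstring>

#if defined(__SSE4_2__)
#    include <nmmintrin.h>
#    define CRC_INT _mm_crc32_u64
#elif defined(__aarch64__) && defined(__ARM_FEATURE_CRC32)
#    include <arm_acle.h>
#    define CRC_INT __crc32cd
#endif

#if defined(CRC_INT)
/// Hash 8 bytes at a time; the final load overlaps the previous one to
/// cover the tail, mirroring the `end - 8` read in CRC32Hash above.
inline uint64_t crc32Hash(const char * pos, size_t size)
{
    uint64_t res = ~0ULL;
    if (size < 8)
        return res; /// the real implementation dispatches short strings separately

    const char * end = pos + size;
    do
    {
        uint64_t word;
        memcpy(&word, pos, 8); /// unaligned load
        res = CRC_INT(res, word);
        pos += 8;
    } while (pos + 8 < end);

    uint64_t word;
    memcpy(&word, end - 8, 8); /// overlapping tail load
    res = CRC_INT(res, word);
    return res;
}
#endif
```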

View File

@ -263,9 +263,20 @@ function run_tests
if [[ $NPROC == 0 ]]; then
NPROC=1
fi
time clickhouse-test --hung-check -j "${NPROC}" --order=random \
--fast-tests-only --no-long --testname --shard --zookeeper --check-zookeeper-session \
-- "$FASTTEST_FOCUS" 2>&1 \
local test_opts=(
--hung-check
--fast-tests-only
--no-long
--testname
--shard
--zookeeper
--check-zookeeper-session
--order random
--print-time
--jobs "${NPROC}"
)
time clickhouse-test "${test_opts[@]}" -- "$FASTTEST_FOCUS" 2>&1 \
| ts '%Y-%m-%d %H:%M:%S' \
| tee "$FASTTEST_OUTPUT/test_result.txt"
set -e

View File

@ -36,6 +36,7 @@ ENGINE = MaterializedMySQL('host:port', ['database' | database], 'user', 'passwo
- `max_flush_data_time` — Maximum number of milliseconds that data is allowed to be cached in memory (for the database, and for cached data that cannot be queried). When this time is exceeded, the data will be materialized. Default: `1000`.
- `max_wait_time_when_mysql_unavailable` — Retry interval when MySQL is not available (milliseconds). Negative value disables retry. Default: `1000`.
- `allows_query_when_mysql_lost` — Allows to query a materialized table when MySQL is lost. Default: `0` (`false`).
- `materialized_mysql_tables_list` — a comma-separated list of MySQL database tables to be replicated by the MaterializedMySQL database engine. Default value: empty list, which means all tables will be replicated.
```sql
CREATE DATABASE mysql ENGINE = MaterializedMySQL('localhost:3306', 'db', 'user', '***')

View File

@ -22,4 +22,4 @@ Here is the illustration of the difference between traditional row-oriented syst
**Columnar**
![Columnar](https://clickhouse.com/docs/en/images/column-oriented.gif#)
A columnar database is a preferred choice for analytical applications because it allows to have many columns in a table just in case, but do not pay the cost for unused columns on read query execution time. Column-oriented databases are designed for big data processing because and data warehousing, they often natively scale using distributed clusters of low-cost hardware to increase throughput. ClickHouse does it with combination of [distributed](../../engines/table-engines/special/distributed.md) and [replicated](../../engines/table-engines/mergetree-family/replication.md) tables.
A columnar database is a preferred choice for analytical applications because it allows having many columns in a table just in case, without paying the cost for unused columns at read query execution time. Column-oriented databases are designed for big data processing and data warehousing, because they often natively scale using distributed clusters of low-cost hardware to increase throughput. ClickHouse does this with a combination of [distributed](../../engines/table-engines/special/distributed.md) and [replicated](../../engines/table-engines/mergetree-family/replication.md) tables.

View File

@ -215,6 +215,6 @@ SELECT 1
**Congratulations, the system works!**
To continue experimenting, you can download one of the test data sets or go through [tutorial](https://clickhouse.com/tutorial.html).
To continue experimenting, you can download one of the test data sets or go through [tutorial](./tutorial.md).
[Original article](https://clickhouse.com/docs/en/getting_started/install/) <!--hide-->

View File

@ -186,6 +186,6 @@ SELECT 1
**Congratulations, the system works!**
To continue experimenting, you can download one of the test data sets or go through the [tutorial](https://clickhouse.com/tutorial.html).
To continue experimenting, you can download one of the test data sets or go through the [tutorial](./tutorial.md).
[Original article](https://clickhouse.com/docs/en/getting_started/install/) <!--hide-->

View File

@ -195,4 +195,4 @@ SELECT 1
**Congratulations, the system works!**
For further experiments, you can try loading one of the test data sets or go through the [step-by-step guide for beginners](https://clickhouse.com/tutorial.html).
For further experiments, you can try loading one of the test data sets or go through the [step-by-step guide for beginners](./tutorial.md).

View File

@ -183,6 +183,6 @@ SELECT 1
**Congratulations, the system is now working!**
To continue experimenting, you can try downloading a test data set or check out the [tutorial](https://clickhouse.com/tutorial.html).
To continue experimenting, you can try downloading a test data set or check out the [tutorial](./tutorial.md).
[原始文章](https://clickhouse.com/docs/en/getting_started/install/) <!--hide-->

View File

@ -371,6 +371,13 @@ void Client::initialize(Poco::Util::Application & self)
configReadClient(config(), home_path);
const char * env_user = getenv("CLICKHOUSE_USER");
const char * env_password = getenv("CLICKHOUSE_PASSWORD");
if (env_user)
config().setString("user", env_user);
if (env_password)
config().setString("password", env_password);
// global_context->setApplicationType(Context::ApplicationType::CLIENT);
global_context->setQueryParameters(query_parameters);
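
A tiny self-contained sketch of the precedence this change introduces: the value from the config file acts as the default, and the environment variable, when present, overrides it:

```cpp
#include <cstdlib>
#include <iostream>
#include <string>

int main()
{
    std::string user = "default"; /// value previously read from the config file

    /// As in Client::initialize above: an environment variable, when set,
    /// overrides whatever the config provided.
    if (const char * env_user = std::getenv("CLICKHOUSE_USER"))
        user = env_user;

    std::cout << "user: " << user << '\n';
    return 0;
}
```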
@ -1119,7 +1126,12 @@ void Client::processOptions(const OptionsDescription & options_description,
{
const auto & name = setting.getName();
if (options.count(name))
config().setString(name, options[name].as<String>());
{
if (allow_repeated_settings)
config().setString(name, options[name].as<Strings>().back());
else
config().setString(name, options[name].as<String>());
}
}
if (options.count("config-file") && options.count("config"))

View File

@ -1303,7 +1303,7 @@ if (ThreadFuzzer::instance().isEffective())
#endif
#if !defined(__x86_64__)
LOG_INFO(log, "Query Profiler is only tested on x86_64. It also known to not work under qemu-user.");
LOG_INFO(log, "Query Profiler and TraceCollector is only tested on x86_64. It also known to not work under qemu-user.");
#endif
if (!hasPHDRCache())

View File

@ -91,7 +91,7 @@ String serializeAccessEntity(const IAccessEntity & entity)
return buf.str();
}
AccessEntityPtr deserializeAccessEntity(const String & definition, const String & path)
AccessEntityPtr deserializeAccessEntityImpl(const String & definition)
{
ASTs queries;
ParserAttachAccessEntity parser;
@ -118,43 +118,42 @@ AccessEntityPtr deserializeAccessEntity(const String & definition, const String
if (auto * create_user_query = query->as<ASTCreateUserQuery>())
{
if (res)
throw Exception("Two access entities attached in " + path, ErrorCodes::INCORRECT_ACCESS_ENTITY_DEFINITION);
throw Exception("Two access entities attached in the same file", ErrorCodes::INCORRECT_ACCESS_ENTITY_DEFINITION);
res = user = std::make_unique<User>();
InterpreterCreateUserQuery::updateUserFromQuery(*user, *create_user_query);
}
else if (auto * create_role_query = query->as<ASTCreateRoleQuery>())
{
if (res)
throw Exception("Two access entities attached in " + path, ErrorCodes::INCORRECT_ACCESS_ENTITY_DEFINITION);
throw Exception("Two access entities attached in the same file", ErrorCodes::INCORRECT_ACCESS_ENTITY_DEFINITION);
res = role = std::make_unique<Role>();
InterpreterCreateRoleQuery::updateRoleFromQuery(*role, *create_role_query);
}
else if (auto * create_policy_query = query->as<ASTCreateRowPolicyQuery>())
{
if (res)
throw Exception("Two access entities attached in " + path, ErrorCodes::INCORRECT_ACCESS_ENTITY_DEFINITION);
throw Exception("Two access entities attached in the same file", ErrorCodes::INCORRECT_ACCESS_ENTITY_DEFINITION);
res = policy = std::make_unique<RowPolicy>();
InterpreterCreateRowPolicyQuery::updateRowPolicyFromQuery(*policy, *create_policy_query);
}
else if (auto * create_quota_query = query->as<ASTCreateQuotaQuery>())
{
if (res)
throw Exception("Two access entities attached in " + path, ErrorCodes::INCORRECT_ACCESS_ENTITY_DEFINITION);
throw Exception("Two access entities attached in the same file", ErrorCodes::INCORRECT_ACCESS_ENTITY_DEFINITION);
res = quota = std::make_unique<Quota>();
InterpreterCreateQuotaQuery::updateQuotaFromQuery(*quota, *create_quota_query);
}
else if (auto * create_profile_query = query->as<ASTCreateSettingsProfileQuery>())
{
if (res)
throw Exception("Two access entities attached in " + path, ErrorCodes::INCORRECT_ACCESS_ENTITY_DEFINITION);
throw Exception("Two access entities attached in the same file", ErrorCodes::INCORRECT_ACCESS_ENTITY_DEFINITION);
res = profile = std::make_unique<SettingsProfile>();
InterpreterCreateSettingsProfileQuery::updateSettingsProfileFromQuery(*profile, *create_profile_query);
}
else if (auto * grant_query = query->as<ASTGrantQuery>())
{
if (!user && !role)
throw Exception(
"A user or role should be attached before grant in " + path, ErrorCodes::INCORRECT_ACCESS_ENTITY_DEFINITION);
throw Exception("A user or role should be attached before grant", ErrorCodes::INCORRECT_ACCESS_ENTITY_DEFINITION);
if (user)
InterpreterGrantQuery::updateUserFromQuery(*user, *grant_query);
else
@ -165,9 +164,27 @@ AccessEntityPtr deserializeAccessEntity(const String & definition, const String
}
if (!res)
throw Exception("No access entities attached in " + path, ErrorCodes::INCORRECT_ACCESS_ENTITY_DEFINITION);
throw Exception("No access entities attached", ErrorCodes::INCORRECT_ACCESS_ENTITY_DEFINITION);
return res;
}
AccessEntityPtr deserializeAccessEntity(const String & definition, const String & file_path)
{
if (file_path.empty())
return deserializeAccessEntityImpl(definition);
try
{
return deserializeAccessEntityImpl(definition);
}
catch (Exception & e)
{
e.addMessage("Could not parse " + file_path);
e.rethrow();
__builtin_unreachable();
}
}
}
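
The refactoring keeps the parsing logic path-agnostic and attaches the file name as error context only when a path is known. A hedged sketch of the same pattern using plain standard exceptions (hypothetical names; ClickHouse's `Exception::addMessage` appends to the existing message in place rather than throwing a new object):

```cpp
#include <iostream>
#include <stdexcept>
#include <string>

struct Entity { }; /// hypothetical stand-in for AccessEntityPtr

Entity deserializeImpl(const std::string & definition)
{
    if (definition.empty())
        throw std::runtime_error("No access entities attached");
    return Entity{};
}

Entity deserialize(const std::string & definition, const std::string & file_path = "")
{
    if (file_path.empty())
        return deserializeImpl(definition);
    try
    {
        return deserializeImpl(definition);
    }
    catch (const std::runtime_error & e)
    {
        /// Attach the file path as context before propagating the error.
        throw std::runtime_error(std::string(e.what()) + ": could not parse " + file_path);
    }
}

int main()
{
    try
    {
        deserialize("", "/etc/clickhouse-server/users.d/broken.sql");
    }
    catch (const std::exception & e)
    {
        std::cout << e.what() << '\n';
    }
    return 0;
}
```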

View File

@ -10,6 +10,6 @@ using AccessEntityPtr = std::shared_ptr<const IAccessEntity>;
String serializeAccessEntity(const IAccessEntity & entity);
AccessEntityPtr deserializeAccessEntity(const String & definition, const String & path);
AccessEntityPtr deserializeAccessEntity(const String & definition, const String & file_path = "");
}

View File

@ -48,7 +48,7 @@ namespace
}
catch (...)
{
tryLogCurrentException(&log, "Could not parse " + file_path);
tryLogCurrentException(&log);
return nullptr;
}
}

View File

@ -1872,6 +1872,8 @@ void ClientBase::readArguments(
prev_port_arg = port_arg;
}
}
else if (arg == "--allow_repeated_settings"sv)
allow_repeated_settings = true;
else
common_arguments.emplace_back(arg);
}
@ -1884,7 +1886,10 @@ void ClientBase::readArguments(
void ClientBase::parseAndCheckOptions(OptionsDescription & options_description, po::variables_map & options, Arguments & arguments)
{
cmd_settings.addProgramOptions(options_description.main_description.value());
if (allow_repeated_settings)
cmd_settings.addProgramOptionsAsMultitokens(options_description.main_description.value());
else
cmd_settings.addProgramOptions(options_description.main_description.value());
/// Parse main commandline options.
auto parser = po::command_line_parser(arguments).options(options_description.main_description.value()).allow_unregistered();
po::parsed_options parsed = parser.run();

View File

@ -260,6 +260,8 @@ protected:
std::vector<HostAndPort> hosts_and_ports{};
bool allow_repeated_settings = false;
bool cancelled = false;
};

View File

@ -36,8 +36,8 @@ public:
static Ptr create(const ColumnPtr & column) { return ColumnMap::create(column->assumeMutable()); }
static Ptr create(ColumnPtr && arg) { return create(arg); }
template <typename Arg, typename = typename std::enable_if<std::is_rvalue_reference<Arg &&>::value>::type>
static MutablePtr create(Arg && arg) { return Base::create(std::forward<Arg>(arg)); }
template <typename ... Args, typename = typename std::enable_if<IsMutableColumns<Args ...>::value>::type>
static MutablePtr create(Args &&... args) { return Base::create(std::forward<Args>(args)...); }
std::string getName() const override;
const char * getFamilyName() const override { return "Map"; }
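
The old overload accepted exactly one rvalue column; the variadic replacement lets `ColumnMap::create` take any number of mutable columns (for example keys, values and offsets) in one call. A toy model of the SFINAE constraint, with hypothetical stub types rather than ClickHouse's real COW column classes:

```cpp
#include <memory>
#include <type_traits>
#include <utility>

struct IColumnStub { virtual ~IColumnStub() = default; }; /// hypothetical stand-in
using MutableColumnPtr = std::unique_ptr<IColumnStub>;

/// True only when every argument of the pack is a mutable column pointer,
/// in the spirit of ClickHouse's IsMutableColumns trait.
template <typename... Args>
inline constexpr bool are_mutable_columns_v =
    (std::is_same_v<std::decay_t<Args>, MutableColumnPtr> && ...);

struct MapStub : IColumnStub
{
    template <typename... Args, typename = std::enable_if_t<are_mutable_columns_v<Args...>>>
    static std::unique_ptr<MapStub> create(Args &&... /*columns*/)
    {
        return std::make_unique<MapStub>();
    }
};

int main()
{
    /// Three mutable columns at once; the old single-argument overload could not do this.
    auto map = MapStub::create(MutableColumnPtr{}, MutableColumnPtr{}, MutableColumnPtr{});
    (void)map;
    return 0;
}
```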

View File

@ -142,7 +142,7 @@ void MySQLClient::setBinlogChecksum(const String & binlog_checksum)
replication.setChecksumSignatureLength(Poco::toUpper(binlog_checksum) == "NONE" ? 0 : 4);
}
void MySQLClient::startBinlogDumpGTID(UInt32 slave_id, String replicate_db, String gtid_str, const String & binlog_checksum)
void MySQLClient::startBinlogDumpGTID(UInt32 slave_id, String replicate_db, std::unordered_set<String> replicate_tables, String gtid_str, const String & binlog_checksum)
{
/// Maybe CRC32 or NONE. mysqlbinlog.cc uses NONE; see its comments below:
/// Make a notice to the server that this client is checksum-aware.
@ -165,6 +165,7 @@ void MySQLClient::startBinlogDumpGTID(UInt32 slave_id, String replicate_db, Stri
/// Set Filter rule to replication.
replication.setReplicateDatabase(replicate_db);
replication.setReplicateTables(replicate_tables);
BinlogDumpGTID binlog_dump(slave_id, gtid_sets.toPayload());
packet_endpoint->sendPacket<BinlogDumpGTID>(binlog_dump, true);

View File

@ -33,7 +33,7 @@ public:
/// Start replication stream by GTID.
/// replicate_db: replication database schema, events from other databases will be ignored.
/// gtid: executed gtid sets format like 'hhhhhhhh-hhhh-hhhh-hhhh-hhhhhhhhhhhh:x-y'.
void startBinlogDumpGTID(UInt32 slave_id, String replicate_db, String gtid, const String & binlog_checksum);
void startBinlogDumpGTID(UInt32 slave_id, String replicate_db, std::unordered_set<String> replicate_tables, String gtid, const String & binlog_checksum);
BinlogEventPtr readOneBinlogEvent(UInt64 milliseconds = 0);
Position getPosition() const { return replication.getPosition(); }

View File

@ -142,8 +142,7 @@ namespace MySQLReplication
out << "XID: " << this->xid << '\n';
}
/// https://dev.mysql.com/doc/internals/en/table-map-event.html
void TableMapEvent::parseImpl(ReadBuffer & payload)
void TableMapEventHeader::parse(ReadBuffer & payload)
{
payload.readStrict(reinterpret_cast<char *>(&table_id), 6);
payload.readStrict(reinterpret_cast<char *>(&flags), 2);
@ -157,7 +156,11 @@ namespace MySQLReplication
table.resize(table_len);
payload.readStrict(reinterpret_cast<char *>(table.data()), table_len);
payload.ignore(1);
}
/// https://dev.mysql.com/doc/internals/en/table-map-event.html
void TableMapEvent::parseImpl(ReadBuffer & payload)
{
column_count = readLengthEncodedNumber(payload);
for (auto i = 0U; i < column_count; ++i)
{
@ -165,7 +168,6 @@ namespace MySQLReplication
payload.readStrict(reinterpret_cast<char *>(&v), 1);
column_type.emplace_back(v);
}
String meta;
readLengthEncodedString(meta, payload);
parseMeta(meta);
@ -957,10 +959,20 @@ namespace MySQLReplication
}
case TABLE_MAP_EVENT:
{
event = std::make_shared<TableMapEvent>(std::move(event_header));
event->parseEvent(event_payload);
auto table_map = std::static_pointer_cast<TableMapEvent>(event);
table_maps[table_map->table_id] = table_map;
TableMapEventHeader map_event_header;
map_event_header.parse(event_payload);
if (doReplicate(map_event_header.schema, map_event_header.table))
{
event = std::make_shared<TableMapEvent>(std::move(event_header), map_event_header);
event->parseEvent(event_payload);
auto table_map = std::static_pointer_cast<TableMapEvent>(event);
table_maps[table_map->table_id] = table_map;
}
else
{
event = std::make_shared<DryRunEvent>(std::move(event_header));
event->parseEvent(event_payload);
}
break;
}
case WRITE_ROWS_EVENT_V1:
@ -1030,8 +1042,21 @@ namespace MySQLReplication
// Special "dummy event"
return false;
}
auto table_map = table_maps.at(table_id);
return table_map->schema == replicate_do_db;
if (table_maps.contains(table_id))
{
auto table_map = table_maps.at(table_id);
return (table_map->schema == replicate_do_db) && (replicate_tables.empty() || replicate_tables.contains(table_map->table));
}
return false;
}
bool MySQLFlavor::doReplicate(const String & db, const String & table_name)
{
if (replicate_do_db.empty())
return false;
if (replicate_do_db != db)
return false;
return replicate_tables.empty() || table_name.empty() || replicate_tables.contains(table_name);
}
}
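
A compact, runnable model of the filter rule implemented by the two `doReplicate` overloads above (assuming the semantics visible in the diff: an empty `replicate_tables` set means every table of `replicate_do_db` is replicated, and events carrying no table name pass through):

```cpp
#include <cassert>
#include <string>
#include <unordered_set>

/// Minimal stand-in for MySQLFlavor's filtering state.
struct Filter
{
    std::string replicate_do_db;
    std::unordered_set<std::string> replicate_tables;

    bool doReplicate(const std::string & db, const std::string & table) const
    {
        if (replicate_do_db.empty() || replicate_do_db != db)
            return false;
        /// Empty list means "all tables"; an empty table name (e.g. for DDL) is not filtered out.
        return replicate_tables.empty() || table.empty() || replicate_tables.count(table) > 0;
    }
};

int main()
{
    Filter filter{"db", {"t1", "t2"}};
    assert(filter.doReplicate("db", "t1"));
    assert(!filter.doReplicate("db", "t3"));
    assert(!filter.doReplicate("other_db", "t1"));
    assert(filter.doReplicate("db", ""));
    return 0;
}
```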

View File

@ -409,6 +409,20 @@ namespace MySQLReplication
void parseImpl(ReadBuffer & payload) override;
};
class TableMapEventHeader
{
public:
UInt64 table_id;
UInt16 flags;
UInt8 schema_len;
String schema;
UInt8 table_len;
String table;
TableMapEventHeader(): table_id(0), flags(0), schema_len(0), table_len(0) {}
void parse(ReadBuffer & payload);
};
class TableMapEvent : public EventBase
{
public:
@ -423,7 +437,15 @@ namespace MySQLReplication
std::vector<UInt16> column_meta;
Bitmap null_bitmap;
TableMapEvent(EventHeader && header_) : EventBase(std::move(header_)), table_id(0), flags(0), schema_len(0), table_len(0), column_count(0) {}
TableMapEvent(EventHeader && header_, const TableMapEventHeader & map_event_header) : EventBase(std::move(header_)), column_count(0)
{
table_id = map_event_header.table_id;
flags = map_event_header.flags;
schema_len = map_event_header.schema_len;
schema = map_event_header.schema;
table_len = map_event_header.table_len;
table = map_event_header.table;
}
void dump(WriteBuffer & out) const override;
protected:
@ -563,6 +585,7 @@ namespace MySQLReplication
Position getPosition() const override { return position; }
BinlogEventPtr readOneEvent() override { return event; }
void setReplicateDatabase(String db) override { replicate_do_db = std::move(db); }
void setReplicateTables(std::unordered_set<String> tables) { replicate_tables = std::move(tables); }
void setGTIDSets(GTIDSets sets) override { position.gtid_sets = std::move(sets); }
void setChecksumSignatureLength(size_t checksum_signature_length_) override { checksum_signature_length = checksum_signature_length_; }
@ -570,10 +593,13 @@ namespace MySQLReplication
Position position;
BinlogEventPtr event;
String replicate_do_db;
// Used only to filter data (row events); DDL events are not filtered by this set.
std::unordered_set<String> replicate_tables;
std::map<UInt64, std::shared_ptr<TableMapEvent> > table_maps;
size_t checksum_signature_length = 4;
bool doReplicate(UInt64 table_id);
bool doReplicate(const String & db, const String & table_name);
};
}

View File

@ -89,6 +89,14 @@ void Settings::addProgramOptions(boost::program_options::options_description & o
}
}
void Settings::addProgramOptionsAsMultitokens(boost::program_options::options_description & options)
{
for (const auto & field : all())
{
addProgramOptionAsMultitoken(options, field);
}
}
void Settings::addProgramOption(boost::program_options::options_description & options, const SettingFieldRef & field)
{
const std::string_view name = field.getName();
@ -97,6 +105,14 @@ void Settings::addProgramOption(boost::program_options::options_description & op
name.data(), boost::program_options::value<std::string>()->composing()->notifier(on_program_option), field.getDescription())));
}
void Settings::addProgramOptionAsMultitoken(boost::program_options::options_description & options, const SettingFieldRef & field)
{
const std::string_view name = field.getName();
auto on_program_option = boost::function1<void, const Strings &>([this, name](const Strings & values) { set(name, values.back()); });
options.add(boost::shared_ptr<boost::program_options::option_description>(new boost::program_options::option_description(
name.data(), boost::program_options::value<Strings>()->multitoken()->composing()->notifier(on_program_option), field.getDescription())));
}
void Settings::checkNoSettingNamesAtTopLevel(const Poco::Util::AbstractConfiguration & config, const String & config_path)
{
if (config.getBool("skip_check_for_incorrect_settings", false))
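
A minimal, self-contained sketch of the multitoken technique `addProgramOptionAsMultitoken` relies on, outside ClickHouse and using only `boost::program_options`: every occurrence of the option is collected, and the notifier keeps only the last value:

```cpp
#include <iostream>
#include <string>
#include <vector>

#include <boost/program_options.hpp>

namespace po = boost::program_options;

int main(int argc, char ** argv)
{
    po::options_description desc("options");
    /// multitoken() + composing() accumulates every occurrence of the option;
    /// the notifier then applies only the last one, as in the diff above.
    desc.add_options()("max_threads",
        po::value<std::vector<std::string>>()->multitoken()->composing()->notifier(
            [](const std::vector<std::string> & values) { std::cout << "effective value: " << values.back() << '\n'; }),
        "demo setting");

    po::variables_map options;
    po::store(po::parse_command_line(argc, argv, desc), options);
    po::notify(options); /// triggers the notifier

    return 0;
}
```

Combined with the `--allow_repeated_settings` flag wired up in `ClientBase::readArguments` earlier in this diff, a command line such as `clickhouse-client --allow_repeated_settings --max_threads 1 --max_threads 8` resolves the setting to `8` instead of rejecting the duplicate.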

View File

@ -537,7 +537,7 @@ class IColumn;
M(Int64, read_priority, 0, "Priority to read data from local filesystem. Only supported for 'pread_threadpool' method.", 0) \
M(UInt64, merge_tree_min_rows_for_concurrent_read_for_remote_filesystem, (20 * 8192), "If at least as many lines are read from one file, the reading can be parallelized, when reading from remote filesystem.", 0) \
M(UInt64, merge_tree_min_bytes_for_concurrent_read_for_remote_filesystem, (24 * 10 * 1024 * 1024), "If at least as many bytes are read from one file, the reading can be parallelized, when reading from remote filesystem.", 0) \
M(UInt64, remote_read_min_bytes_for_seek, DBMS_DEFAULT_BUFFER_SIZE, "Min bytes required for remote read (url, s3) to do seek, instead for read with ignore.", 0) \
M(UInt64, remote_read_min_bytes_for_seek, 4 * DBMS_DEFAULT_BUFFER_SIZE, "Min bytes required for remote read (url, s3) to do seek, instead for read with ignore.", 0) \
\
M(UInt64, async_insert_threads, 16, "Maximum number of threads to actually parse and insert data in background. Zero means asynchronous mode is disabled", 0) \
M(Bool, async_insert, false, "If true, data from INSERT query is stored in queue and later flushed to table in background. Makes sense only for inserts via HTTP protocol. If wait_for_async_insert is false, INSERT query is processed almost instantly, otherwise client will wait until data will be flushed to table", 0) \
@ -722,6 +722,11 @@ struct Settings : public BaseSettings<SettingsTraits>, public IHints<2, Settings
/// (Don't forget to call notify() on the `variables_map` after parsing it!)
void addProgramOptions(boost::program_options::options_description & options);
/// Adds program options to set the settings from a command line.
/// Allows specifying the same setting multiple times; the last value is used.
/// (Don't forget to call notify() on the `variables_map` after parsing it!)
void addProgramOptionsAsMultitokens(boost::program_options::options_description & options);
/// Check that there are no user-level settings at the top level of the config.
/// This is a common source of mistakes (users don't know where to write user-level settings).
static void checkNoSettingNamesAtTopLevel(const Poco::Util::AbstractConfiguration & config, const String & config_path);
@ -729,6 +734,8 @@ struct Settings : public BaseSettings<SettingsTraits>, public IHints<2, Settings
std::vector<String> getAllRegisteredNames() const override;
void addProgramOption(boost::program_options::options_description & options, const SettingFieldRef & field);
void addProgramOptionAsMultitoken(boost::program_options::options_description & options, const SettingFieldRef & field);
};
/*

View File

@ -30,11 +30,15 @@ namespace ErrorCodes
static std::unordered_map<String, String> fetchTablesCreateQuery(
const mysqlxx::PoolWithFailover::Entry & connection, const String & database_name,
const std::vector<String> & fetch_tables, const Settings & global_settings)
const std::vector<String> & fetch_tables, std::unordered_set<String> & materialized_tables_list,
const Settings & global_settings)
{
std::unordered_map<String, String> tables_create_query;
for (const auto & fetch_table_name : fetch_tables)
{
if (!materialized_tables_list.empty() && !materialized_tables_list.contains(fetch_table_name))
continue;
Block show_create_table_header{
{std::make_shared<DataTypeString>(), "Table"},
{std::make_shared<DataTypeString>(), "Create Table"},
@ -276,7 +280,8 @@ MaterializeMetadata::MaterializeMetadata(const String & path_, const Settings &
void MaterializeMetadata::startReplication(
mysqlxx::PoolWithFailover::Entry & connection, const String & database,
bool & opened_transaction, std::unordered_map<String, String> & need_dumping_tables)
bool & opened_transaction, std::unordered_map<String, String> & need_dumping_tables,
std::unordered_set<String> & materialized_tables_list)
{
checkSyncUserPriv(connection, settings);
@ -297,7 +302,7 @@ void MaterializeMetadata::startReplication(
connection->query("START TRANSACTION /*!40100 WITH CONSISTENT SNAPSHOT */;").execute();
opened_transaction = true;
need_dumping_tables = fetchTablesCreateQuery(connection, database, fetchTablesInDB(connection, database, settings), settings);
need_dumping_tables = fetchTablesCreateQuery(connection, database, fetchTablesInDB(connection, database, settings), materialized_tables_list, settings);
connection->query("UNLOCK TABLES;").execute();
}
catch (...)

View File

@ -48,7 +48,8 @@ struct MaterializeMetadata
mysqlxx::PoolWithFailover::Entry & connection,
const String & database,
bool & opened_transaction,
std::unordered_map<String, String> & need_dumping_tables);
std::unordered_map<String, String> & need_dumping_tables,
std::unordered_set<String> & materialized_tables_list);
MaterializeMetadata(const String & path_, const Settings & settings_);
};

View File

@ -16,6 +16,7 @@ class ASTStorage;
M(UInt64, max_flush_data_time, 1000, "Max milliseconds that data is allowed to be cached in memory (for the database, and for cached data that cannot be queried). When this time is exceeded, the data will be materialized", 0) \
M(Int64, max_wait_time_when_mysql_unavailable, 1000, "Retry interval when MySQL is not available (milliseconds). Negative value disables retry.", 0) \
M(Bool, allows_query_when_mysql_lost, false, "Allow querying a materialized table when MySQL is lost.", 0) \
M(String, materialized_mysql_tables_list, "", "A comma-separated list of MySQL database tables to be replicated by the MaterializedMySQL database engine. Default value: empty list, which means all tables will be replicated.", 0) \
DECLARE_SETTINGS_TRAITS(MaterializedMySQLSettingsTraits, LIST_OF_MATERIALIZE_MODE_SETTINGS)

View File

@ -25,6 +25,10 @@
#include <Common/setThreadName.h>
#include <base/sleep.h>
#include <base/bit_cast.h>
#include <boost/algorithm/string/split.hpp>
#include <boost/algorithm/string/trim.hpp>
#include <Parsers/CommonParsers.h>
#include <Parsers/ASTIdentifier.h>
namespace DB
{
@ -148,6 +152,61 @@ static void checkMySQLVariables(const mysqlxx::Pool::Entry & connection, const S
}
}
static std::tuple<String, String> tryExtractTableNameFromDDL(const String & ddl)
{
String table_name;
String database_name;
if (ddl.empty()) return std::make_tuple(database_name, table_name);
bool parse_failed = false;
Tokens tokens(ddl.data(), ddl.data() + ddl.size());
IParser::Pos pos(tokens, 0);
Expected expected;
ASTPtr res;
ASTPtr table;
if (ParserKeyword("CREATE TEMPORARY TABLE").ignore(pos, expected) || ParserKeyword("CREATE TABLE").ignore(pos, expected))
{
ParserKeyword("IF NOT EXISTS").ignore(pos, expected);
if (!ParserCompoundIdentifier(true).parse(pos, table, expected))
parse_failed = true;
}
else if (ParserKeyword("ALTER TABLE").ignore(pos, expected))
{
if (!ParserCompoundIdentifier(true).parse(pos, table, expected))
parse_failed = true;
}
else if (ParserKeyword("DROP TABLE").ignore(pos, expected) || ParserKeyword("DROP TEMPORARY TABLE").ignore(pos, expected))
{
ParserKeyword("IF EXISTS").ignore(pos, expected);
if (!ParserCompoundIdentifier(true).parse(pos, table, expected))
parse_failed = true;
}
else if (ParserKeyword("TRUNCATE").ignore(pos, expected))
{
ParserKeyword("TABLE").ignore(pos, expected);
if (!ParserCompoundIdentifier(true).parse(pos, table, expected))
parse_failed = true;
}
else if (ParserKeyword("RENAME TABLE").ignore(pos, expected))
{
if (!ParserCompoundIdentifier(true).parse(pos, table, expected))
parse_failed = true;
}
else
{
parse_failed = true;
}
if (!parse_failed)
{
if (auto table_id = table->as<ASTTableIdentifier>()->getTableId())
{
database_name = table_id.database_name;
table_name = table_id.table_name;
}
}
return std::make_tuple(database_name, table_name);
}
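
`tryExtractTableNameFromDDL` feeds the DDL filtering added below in `executeDDLAtomic`. As a rough illustration only, here is a toy stand-in built on `std::regex` rather than ClickHouse's tokenizer-based parser; it sketches the intended mapping from DDL text to a `(database, table)` pair, with the database part optional:

```cpp
#include <cassert>
#include <regex>
#include <string>
#include <utility>

/// Toy stand-in (regex, not a real SQL parser): extract the target of a DDL statement.
std::pair<std::string, std::string> extractTableFromDDL(const std::string & ddl)
{
    static const std::regex re(
        R"((?:CREATE(?: TEMPORARY)? TABLE(?: IF NOT EXISTS)?|ALTER TABLE|DROP(?: TEMPORARY)? TABLE(?: IF EXISTS)?|TRUNCATE(?: TABLE)?|RENAME TABLE)\s+(?:(\w+)\.)?(\w+))",
        std::regex::icase);
    std::smatch m;
    if (std::regex_search(ddl, m, re))
        return {m[1].str(), m[2].str()}; /// m[1] is empty when no database is qualified
    return {"", ""};
}

int main()
{
    assert(extractTableFromDDL("ALTER TABLE db.t ADD COLUMN x INT").first == "db");
    assert(extractTableFromDDL("DROP TABLE IF EXISTS t").second == "t");
    assert(extractTableFromDDL("SELECT 1").second.empty());
    return 0;
}
```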
MaterializedMySQLSyncThread::MaterializedMySQLSyncThread(
ContextPtr context_,
const String & database_name_,
@ -164,6 +223,17 @@ MaterializedMySQLSyncThread::MaterializedMySQLSyncThread(
, settings(settings_)
{
query_prefix = "EXTERNAL DDL FROM MySQL(" + backQuoteIfNeed(database_name) + ", " + backQuoteIfNeed(mysql_database_name) + ") ";
if (!settings->materialized_mysql_tables_list.value.empty())
{
Names tables_list;
boost::split(tables_list, settings->materialized_mysql_tables_list.value, [](char c){ return c == ','; });
for (String & table_name: tables_list)
{
boost::trim(table_name);
materialized_tables_list.insert(table_name);
}
}
}
void MaterializedMySQLSyncThread::synchronization()
@ -434,7 +504,7 @@ bool MaterializedMySQLSyncThread::prepareSynchronized(MaterializeMetadata & meta
checkMySQLVariables(connection, getContext()->getSettingsRef());
std::unordered_map<String, String> need_dumping_tables;
metadata.startReplication(connection, mysql_database_name, opened_transaction, need_dumping_tables);
metadata.startReplication(connection, mysql_database_name, opened_transaction, need_dumping_tables, materialized_tables_list);
if (!need_dumping_tables.empty())
{
@ -464,7 +534,7 @@ bool MaterializedMySQLSyncThread::prepareSynchronized(MaterializeMetadata & meta
connection->query("COMMIT").execute();
client.connect();
client.startBinlogDumpGTID(randomNumber(), mysql_database_name, metadata.executed_gtid_set, metadata.binlog_checksum);
client.startBinlogDumpGTID(randomNumber(), mysql_database_name, materialized_tables_list, metadata.executed_gtid_set, metadata.binlog_checksum);
setSynchronizationThreadException(nullptr);
return true;
@ -792,9 +862,24 @@ void MaterializedMySQLSyncThread::executeDDLAtomic(const QueryEvent & query_even
auto query_context = createQueryContext(getContext());
CurrentThread::QueryScope query_scope(query_context);
String query = query_event.query;
if (!materialized_tables_list.empty())
{
auto [ddl_database_name, ddl_table_name] = tryExtractTableNameFromDDL(query_event.query);
if (!ddl_table_name.empty())
{
ddl_database_name = ddl_database_name.empty() ? query_event.schema: ddl_database_name;
if (ddl_database_name != mysql_database_name || !materialized_tables_list.contains(ddl_table_name))
{
LOG_DEBUG(log, "Skip MySQL DDL: \n {}", query_event.query);
return;
}
}
}
String comment = "Materialize MySQL step 2: execute MySQL DDL for sync data";
String event_database = query_event.schema == mysql_database_name ? database_name : "";
tryToExecuteQuery(query_prefix + query_event.query, query_context, event_database, comment);
tryToExecuteQuery(query_prefix + query, query_context, event_database, comment);
}
catch (Exception & exception)
{

View File

@ -63,6 +63,7 @@ private:
mutable MySQLClient client;
MaterializedMySQLSettings * settings;
String query_prefix;
NameSet materialized_tables_list;
// USE MySQL ERROR CODE:
// https://dev.mysql.com/doc/mysql-errors/5.7/en/server-error-reference.html

View File

@ -144,6 +144,12 @@ DiskCacheWrapper::readFile(
}
}
/// Do not use RemoteFSReadMethod::threadpool for index and mark files.
/// Here it does not make sense since the files are small.
/// Note: enabling `threadpool` read requires calling setReadUntilEnd().
auto current_read_settings = settings;
current_read_settings.remote_fs_method = RemoteFSReadMethod::read;
if (metadata->status == DOWNLOADING)
{
FileDownloadStatus result_status = DOWNLOADED;
@ -158,7 +164,7 @@ DiskCacheWrapper::readFile(
auto tmp_path = path + ".tmp";
{
auto src_buffer = DiskDecorator::readFile(path, settings, read_hint, file_size);
auto src_buffer = DiskDecorator::readFile(path, current_read_settings, read_hint, file_size);
auto dst_buffer = cache_disk->writeFile(tmp_path, settings.local_fs_buffer_size, WriteMode::Rewrite);
copyData(*src_buffer, *dst_buffer);
}
@ -184,7 +190,7 @@ DiskCacheWrapper::readFile(
if (metadata->status == DOWNLOADED)
return cache_disk->readFile(path, settings, read_hint, file_size);
return DiskDecorator::readFile(path, settings, read_hint, file_size);
return DiskDecorator::readFile(path, current_read_settings, read_hint, file_size);
}
std::unique_ptr<WriteBufferFromFileBase>

View File

@ -1,21 +1,2 @@
if (TARGET ch_contrib::avrocpp)
set(USE_AVRO 1)
endif()
if (TARGET ch_contrib::parquet)
set(USE_PARQUET 1)
set(USE_ARROW 1)
set(USE_ORC 1)
endif()
if (TARGET ch_contrib::snappy)
set(USE_SNAPPY 1)
endif()
if (TARGET ch_contrib::protobuf)
set(USE_PROTOBUF 1)
endif()
if (TARGET ch_contrib::msgpack)
set(USE_MSGPACK 1)
endif()
if (TARGET ch_contrib::capnp)
set(USE_CAPNP 1)
endif()
include(configure_config.cmake)
configure_file(config_formats.h.in ${ConfigIncludePath}/config_formats.h)

View File

@ -0,0 +1,20 @@
if (TARGET ch_contrib::avrocpp)
set(USE_AVRO 1)
endif()
if (TARGET ch_contrib::parquet)
set(USE_PARQUET 1)
set(USE_ARROW 1)
set(USE_ORC 1)
endif()
if (TARGET ch_contrib::snappy)
set(USE_SNAPPY 1)
endif()
if (TARGET ch_contrib::protobuf)
set(USE_PROTOBUF 1)
endif()
if (TARGET ch_contrib::msgpack)
set(USE_MSGPACK 1)
endif()
if (TARGET ch_contrib::capnp)
set(USE_CAPNP 1)
endif()

View File

@ -90,6 +90,22 @@ public:
return getDictionary(dict_name_col->getValue<String>());
}
static const DictionaryAttribute & getDictionaryHierarchicalAttribute(const std::shared_ptr<const IDictionary> & dictionary)
{
const auto & dictionary_structure = dictionary->getStructure();
auto hierarchical_attribute_index_optional = dictionary_structure.hierarchical_attribute_index;
if (!dictionary->hasHierarchy() || !hierarchical_attribute_index_optional.has_value())
throw Exception(ErrorCodes::UNSUPPORTED_METHOD,
"Dictionary {} does not support hierarchy",
dictionary->getFullName());
size_t hierarchical_attribute_index = *hierarchical_attribute_index_optional;
const auto & hierarchical_attribute = dictionary_structure.attributes[hierarchical_attribute_index];
return hierarchical_attribute;
}
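
A small model of what this new helper centralizes: validating the optional hierarchical-attribute index once, so that every `dictGetHierarchy`-style function raises the same error. Stand-in types below, not the real `DictionaryStructure`:

```cpp
#include <cstddef>
#include <iostream>
#include <optional>
#include <stdexcept>
#include <string>
#include <vector>

struct Attribute { std::string name; }; /// hypothetical stand-in

const Attribute & getHierarchicalAttribute(
    const std::vector<Attribute> & attributes,
    const std::optional<size_t> & hierarchical_index,
    const std::string & dictionary_name)
{
    /// Validate once; callers can then rely on a valid attribute reference.
    if (!hierarchical_index.has_value())
        throw std::runtime_error("Dictionary " + dictionary_name + " does not support hierarchy");
    return attributes[*hierarchical_index];
}

int main()
{
    std::vector<Attribute> attributes{{"id"}, {"parent_id"}};
    std::cout << getHierarchicalAttribute(attributes, 1, "regions").name << '\n'; /// parent_id
    return 0;
}
```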
bool isDictGetFunctionInjective(const Block & sample_columns)
{
/// Assume non-injective by default
@ -939,39 +955,38 @@ private:
bool useDefaultImplementationForConstants() const final { return true; }
ColumnNumbers getArgumentsThatAreAlwaysConstant() const final { return {0}; }
DataTypePtr getReturnTypeImpl(const DataTypes & arguments) const override
{
if (!isString(arguments[0]))
throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT,
"Illegal type of first argument of function {}. Expected String. Actual type {}",
getName(),
arguments[0]->getName());
if (!WhichDataType(arguments[1]).isUInt64())
throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT,
"Illegal type of second argument of function {}. Expected UInt64. Actual type {}",
getName(),
arguments[1]->getName());
return std::make_shared<DataTypeArray>(std::make_shared<DataTypeUInt64>());
}
bool isDeterministic() const override { return false; }
DataTypePtr getReturnTypeImpl(const ColumnsWithTypeAndName & arguments) const override
{
String dictionary_name;
if (const auto * name_col = checkAndGetColumnConst<ColumnString>(arguments[0].column.get()))
dictionary_name = name_col->getValue<String>();
else
throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT,
"Illegal type {} of first argument of function {}, expected a const string.",
arguments[0].type->getName(),
getName());
auto dictionary = helper.getDictionary(arguments[0].column);
const auto & hierarchical_attribute = helper.getDictionaryHierarchicalAttribute(dictionary);
return std::make_shared<DataTypeArray>(hierarchical_attribute.type);
}
ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr & result_type, size_t input_rows_count) const override
{
if (input_rows_count == 0)
return result_type->createColumn();
auto dictionary = helper.getDictionary(arguments[0].column);
const auto & hierarchical_attribute = helper.getDictionaryHierarchicalAttribute(dictionary);
if (!dictionary->hasHierarchy())
throw Exception(ErrorCodes::UNSUPPORTED_METHOD,
"Dictionary {} does not support hierarchy",
dictionary->getFullName());
auto key_column = ColumnWithTypeAndName{arguments[1].column, arguments[1].type, arguments[1].name};
auto key_column_casted = castColumnAccurate(key_column, hierarchical_attribute.type);
ColumnPtr result = dictionary->getHierarchy(key_column_casted, hierarchical_attribute.type);
ColumnPtr result = dictionary->getHierarchy(arguments[1].column, std::make_shared<DataTypeUInt64>());
return result;
}
@ -1009,18 +1024,6 @@ private:
getName(),
arguments[0]->getName());
if (!WhichDataType(arguments[1]).isUInt64())
throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT,
"Illegal type of second argument of function {}. Expected UInt64. Actual type {}",
getName(),
arguments[1]->getName());
if (!WhichDataType(arguments[2]).isUInt64())
throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT,
"Illegal type of third argument of function {}. Expected UInt64. Actual type {}",
getName(),
arguments[2]->getName());
return std::make_shared<DataTypeUInt8>();
}
@ -1031,16 +1034,18 @@ private:
if (input_rows_count == 0)
return result_type->createColumn();
auto dict = helper.getDictionary(arguments[0].column);
auto dictionary = helper.getDictionary(arguments[0].column);
const auto & hierarchical_attribute = helper.getDictionaryHierarchicalAttribute(dictionary);
if (!dict->hasHierarchy())
throw Exception(ErrorCodes::UNSUPPORTED_METHOD,
"Dictionary {} does not support hierarchy",
dict->getFullName());
auto key_column = ColumnWithTypeAndName{arguments[1].column->convertToFullColumnIfConst(), arguments[1].type, arguments[2].name};
auto in_key_column = ColumnWithTypeAndName{arguments[2].column->convertToFullColumnIfConst(), arguments[2].type, arguments[2].name};
ColumnPtr res = dict->isInHierarchy(arguments[1].column, arguments[2].column, std::make_shared<DataTypeUInt64>());
auto key_column_casted = castColumnAccurate(key_column, hierarchical_attribute.type);
auto in_key_column_casted = castColumnAccurate(in_key_column, hierarchical_attribute.type);
return res;
ColumnPtr result = dictionary->isInHierarchy(key_column_casted, in_key_column_casted, hierarchical_attribute.type);
return result;
}
mutable FunctionDictHelper helper;
@ -1069,21 +1074,18 @@ private:
bool isDeterministic() const override { return false; }
bool isSuitableForShortCircuitArgumentsExecution(const DataTypesWithConstInfo & /*arguments*/) const override { return true; }
DataTypePtr getReturnTypeImpl(const DataTypes & arguments) const override
DataTypePtr getReturnTypeImpl(const ColumnsWithTypeAndName & arguments) const override
{
if (!isString(arguments[0]))
if (!isString(arguments[0].type))
throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT,
"Illegal type of first argument of function {}. Expected String. Actual type {}",
getName(),
arguments[0]->getName());
arguments[0].type->getName());
if (!WhichDataType(arguments[1]).isUInt64())
throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT,
"Illegal type of second argument of function {}. Expected UInt64. Actual type {}",
getName(),
arguments[1]->getName());
auto dictionary = helper.getDictionary(arguments[0].column);
const auto & hierarchical_attribute = helper.getDictionaryHierarchicalAttribute(dictionary);
return std::make_shared<DataTypeArray>(std::make_shared<DataTypeUInt64>());
return std::make_shared<DataTypeArray>(hierarchical_attribute.type);
}
ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr & result_type, size_t input_rows_count) const override
@ -1092,13 +1094,12 @@ private:
return result_type->createColumn();
auto dictionary = helper.getDictionary(arguments[0].column);
const auto & hierarchical_attribute = helper.getDictionaryHierarchicalAttribute(dictionary);
if (!dictionary->hasHierarchy())
throw Exception(ErrorCodes::UNSUPPORTED_METHOD,
"Dictionary {} does not support hierarchy",
dictionary->getFullName());
auto key_column = ColumnWithTypeAndName{arguments[1].column->convertToFullColumnIfConst(), arguments[1].type, arguments[1].name};
auto key_column_casted = castColumnAccurate(key_column, hierarchical_attribute.type);
ColumnPtr result = dictionary->getDescendants(arguments[1].column, std::make_shared<DataTypeUInt64>(), 1);
ColumnPtr result = dictionary->getDescendants(key_column_casted, hierarchical_attribute.type, 1);
return result;
}
@ -1126,12 +1127,11 @@ private:
bool isVariadic() const override { return true; }
bool useDefaultImplementationForConstants() const final { return true; }
ColumnNumbers getArgumentsThatAreAlwaysConstant() const final { return {0}; }
ColumnNumbers getArgumentsThatAreAlwaysConstant() const final { return {0, 2}; }
bool isDeterministic() const override { return false; }
bool isSuitableForShortCircuitArgumentsExecution(const DataTypesWithConstInfo & /*arguments*/) const override { return true; }
DataTypePtr getReturnTypeImpl(const DataTypes & arguments) const override
DataTypePtr getReturnTypeImpl(const ColumnsWithTypeAndName & arguments) const override
{
size_t arguments_size = arguments.size();
if (arguments_size < 2 || arguments_size > 3)
@ -1142,27 +1142,24 @@ private:
arguments_size);
}
if (!isString(arguments[0]))
if (!isString(arguments[0].type))
throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT,
"Illegal type of first argument of function {}. Expected const String. Actual type {}",
getName(),
arguments[0]->getName());
arguments[0].type->getName());
if (!WhichDataType(arguments[1]).isUInt64())
throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT,
"Illegal type of second argument of function {}. Expected UInt64. Actual type {}",
getName(),
arguments[1]->getName());
if (arguments.size() == 3 && !isUnsignedInteger(arguments[2]))
if (arguments.size() == 3 && !isInteger(arguments[2].type))
{
throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT,
"Illegal type of third argument of function {}. Expected const unsigned integer. Actual type {}",
getName(),
arguments[2]->getName());
arguments[2].type->getName());
}
return std::make_shared<DataTypeArray>(std::make_shared<DataTypeUInt64>());
auto dictionary = helper.getDictionary(arguments[0].column);
const auto & hierarchical_attribute = helper.getDictionaryHierarchicalAttribute(dictionary);
return std::make_shared<DataTypeArray>(hierarchical_attribute.type);
}
ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr & result_type, size_t input_rows_count) const override
@ -1171,6 +1168,7 @@ private:
return result_type->createColumn();
auto dictionary = helper.getDictionary(arguments[0].column);
const auto & hierarchical_attribute = helper.getDictionaryHierarchicalAttribute(dictionary);
size_t level = 0;
@ -1181,17 +1179,21 @@ private:
"Illegal type of third argument of function {}. Expected const unsigned integer.",
getName());
level = static_cast<size_t>(arguments[2].column->get64(0));
auto value = static_cast<Int64>(arguments[2].column->getInt(0));
if (value < 0)
throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT,
"Illegal type of third argument of function {}. Expected const unsigned integer.",
getName());
level = static_cast<size_t>(value);
}
if (!dictionary->hasHierarchy())
throw Exception(ErrorCodes::UNSUPPORTED_METHOD,
"Dictionary {} does not support hierarchy",
dictionary->getFullName());
auto key_column = ColumnWithTypeAndName{arguments[1].column->convertToFullColumnIfConst(), arguments[1].type, arguments[1].name};
auto key_column_casted = castColumnAccurate(key_column, hierarchical_attribute.type);
ColumnPtr res = dictionary->getDescendants(arguments[1].column, std::make_shared<DataTypeUInt64>(), level);
ColumnPtr result = dictionary->getDescendants(key_column_casted, hierarchical_attribute.type, level);
return res;
return result;
}
mutable FunctionDictHelper helper;

View File

@ -1,18 +1,29 @@
#pragma once
#include <type_traits>
#include <Columns/ColumnArray.h>
#include <Columns/ColumnConst.h>
#include <Columns/ColumnFunction.h>
#include <Columns/ColumnMap.h>
#include <Columns/IColumn.h>
#include <Common/Exception.h>
#include <Common/assert_cast.h>
#include <Common/typeid_cast.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeFunction.h>
#include <DataTypes/DataTypeLowCardinality.h>
#include <Columns/ColumnArray.h>
#include <Columns/ColumnConst.h>
#include <Columns/ColumnFunction.h>
#include <Common/typeid_cast.h>
#include <Common/assert_cast.h>
#include <Functions/IFunction.h>
#include <DataTypes/DataTypeMap.h>
#include <Functions/FunctionHelpers.h>
#include <IO/WriteHelpers.h>
#include <Functions/IFunction.h>
#include <Interpreters/Context_fwd.h>
#include <IO/WriteHelpers.h>
namespace DB
{
@ -21,11 +32,38 @@ namespace ErrorCodes
{
extern const int ILLEGAL_COLUMN;
extern const int ILLEGAL_TYPE_OF_ARGUMENT;
extern const int LOGICAL_ERROR;
extern const int SIZES_OF_ARRAYS_DOESNT_MATCH;
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
}
template <typename T>
ColumnPtr getOffsetsPtr(const T & column)
{
if constexpr (std::is_same_v<T, ColumnArray>)
{
return column.getOffsetsPtr();
}
else // ColumnMap
{
return column.getNestedColumn().getOffsetsPtr();
}
}
template <typename T>
const IColumn::Offsets & getOffsets(const T & column)
{
if constexpr (std::is_same_v<T, ColumnArray>)
{
return column.getOffsets();
}
else // ColumnMap
{
return column.getNestedColumn().getOffsets();
}
}
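
These two helpers carry the generalization of `FunctionArrayMapped` from arrays to maps: a `ColumnMap` keeps its offsets inside its nested array column, so the right accessor is chosen at compile time with `if constexpr`. A self-contained model with hypothetical stand-in types:

```cpp
#include <cstddef>
#include <iostream>
#include <type_traits>
#include <vector>

/// Stand-ins, not the real ColumnArray/ColumnMap.
struct ArrayLike
{
    std::vector<size_t> offsets;
    const std::vector<size_t> & getOffsets() const { return offsets; }
};

struct MapLike
{
    ArrayLike nested;
    const ArrayLike & getNestedColumn() const { return nested; }
};

template <typename T>
const std::vector<size_t> & getOffsets(const T & column)
{
    if constexpr (std::is_same_v<T, ArrayLike>)
        return column.getOffsets();
    else /// MapLike stores its offsets in the nested array column
        return column.getNestedColumn().getOffsets();
}

int main()
{
    ArrayLike a{{2, 5}};
    MapLike m{{{3, 4}}};
    std::cout << getOffsets(a).size() << ' ' << getOffsets(m).size() << '\n'; /// prints "2 2"
    return 0;
}
```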
/** Higher-order functions for arrays.
* These functions optionally apply a map (transform) to array (or multiple arrays of identical size) by lambda function,
* and return some result based on that transformation.
@ -60,29 +98,42 @@ public:
void getLambdaArgumentTypes(DataTypes & arguments) const override
{
if (arguments.empty())
throw Exception("Function " + getName() + " needs at least one argument; passed "
+ toString(arguments.size()) + ".",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH,
"Function {} needs at least one argument, passed {}", getName(), arguments.size());
if (arguments.size() == 1)
throw Exception("Function " + getName() + " needs at least one array argument.",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH,
"Function {} needs at least one argument with data", getName());
DataTypes nested_types(arguments.size() - 1);
for (size_t i = 0; i < nested_types.size(); ++i)
if (arguments.size() > 2 && Impl::needOneArray())
throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH,
"Function {} needs one argument with data", getName());
size_t nested_types_count = std::is_same_v<typename Impl::data_type, DataTypeMap> ? (arguments.size() - 1) * 2 : (arguments.size() - 1);
DataTypes nested_types(nested_types_count);
for (size_t i = 0; i < arguments.size() - 1; ++i)
{
const DataTypeArray * array_type = checkAndGetDataType<DataTypeArray>(&*arguments[i + 1]);
const auto * array_type = checkAndGetDataType<typename Impl::data_type>(&*arguments[i + 1]);
if (!array_type)
throw Exception("Argument " + toString(i + 2) + " of function " + getName() + " must be array. Found "
+ arguments[i + 1]->getName() + " instead.", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
nested_types[i] = recursiveRemoveLowCardinality(array_type->getNestedType());
if constexpr (std::is_same_v<typename Impl::data_type, DataTypeMap>)
{
nested_types[2 * i] = recursiveRemoveLowCardinality(array_type->getKeyType());
nested_types[2 * i + 1] = recursiveRemoveLowCardinality(array_type->getValueType());
}
else if constexpr (std::is_same_v<typename Impl::data_type, DataTypeArray>)
{
nested_types[i] = recursiveRemoveLowCardinality(array_type->getNestedType());
}
}
const DataTypeFunction * function_type = checkAndGetDataType<DataTypeFunction>(arguments[0].get());
if (!function_type || function_type->getArgumentTypes().size() != nested_types.size())
throw Exception("First argument for this overload of " + getName() + " must be a function with "
+ toString(nested_types.size()) + " arguments. Found "
+ arguments[0]->getName() + " instead.", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
throw Exception(
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT,
"First argument for this overload of {} must be a function with {} arguments, found {} instead",
getName(), nested_types.size(), arguments[0]->getName());
arguments[0] = std::make_shared<DataTypeFunction>(nested_types);
}
@ -91,37 +142,39 @@ public:
{
size_t min_args = Impl::needExpression() ? 2 : 1;
if (arguments.size() < min_args)
throw Exception("Function " + getName() + " needs at least "
+ toString(min_args) + " argument; passed "
+ toString(arguments.size()) + ".",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH,
"Function {} needs at least {} argument, passed {}",
getName(), min_args, arguments.size());
if (arguments.size() == 1)
if ((arguments.size() == 1) && std::is_same_v<typename Impl::data_type, DataTypeArray>)
{
const auto * array_type = checkAndGetDataType<DataTypeArray>(arguments[0].type.get());
const auto * data_type = checkAndGetDataType<typename Impl::data_type>(arguments[0].type.get());
if (!array_type)
if (!data_type)
throw Exception("The only argument for function " + getName() + " must be array. Found "
+ arguments[0].type->getName() + " instead.", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
+ arguments[0].type->getName() + " instead", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
DataTypePtr nested_type = array_type->getNestedType();
DataTypePtr nested_type = data_type->getNestedType();
if (Impl::needBoolean() && !WhichDataType(nested_type).isUInt8())
throw Exception("The only argument for function " + getName() + " must be array of UInt8. Found "
+ arguments[0].type->getName() + " instead.", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
+ arguments[0].type->getName() + " instead", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
return Impl::getReturnType(nested_type, nested_type);
if constexpr (std::is_same_v<typename Impl::data_type, DataTypeArray>)
return Impl::getReturnType(nested_type, nested_type);
else
throw DB::Exception(ErrorCodes::LOGICAL_ERROR, "Unreachable code reached");
}
else
{
if (arguments.size() > 2 && Impl::needOneArray())
throw Exception("Function " + getName() + " needs one array argument.",
throw Exception("Function " + getName() + " needs one argument with data",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
const auto * data_type_function = checkAndGetDataType<DataTypeFunction>(arguments[0].type.get());
if (!data_type_function)
throw Exception("First argument for function " + getName() + " must be a function.",
throw Exception("First argument for function " + getName() + " must be a function",
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
/// The types of the remaining arguments are already checked in getLambdaArgumentTypes.
@ -131,9 +184,28 @@ public:
throw Exception("Expression for function " + getName() + " must return UInt8, found "
+ return_type->getName(), ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
const auto * first_array_type = checkAndGetDataType<DataTypeArray>(arguments[1].type.get());
static_assert(
std::is_same_v<typename Impl::data_type, DataTypeMap> ||
std::is_same_v<typename Impl::data_type, DataTypeArray>,
"unsupported type");
return Impl::getReturnType(return_type, first_array_type->getNestedType());
if (arguments.size() < 2)
{
throw DB::Exception(ErrorCodes::LOGICAL_ERROR, "{}", arguments.size());
}
const auto * first_array_type = checkAndGetDataType<typename Impl::data_type>(arguments[1].type.get());
if (!first_array_type)
throw DB::Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, "Unsupported type {}", arguments[1].type->getName());
if constexpr (std::is_same_v<typename Impl::data_type, DataTypeArray>)
return Impl::getReturnType(return_type, first_array_type->getNestedType());
if constexpr (std::is_same_v<typename Impl::data_type, DataTypeMap>)
return Impl::getReturnType(return_type, first_array_type->getKeyValueTypes());
throw DB::Exception(ErrorCodes::LOGICAL_ERROR, "Unreachable code reached");
}
}
@ -142,18 +214,25 @@ public:
if (arguments.size() == 1)
{
ColumnPtr column_array_ptr = arguments[0].column;
const auto * column_array = checkAndGetColumn<ColumnArray>(column_array_ptr.get());
const auto * column_array = checkAndGetColumn<typename Impl::column_type>(column_array_ptr.get());
if (!column_array)
{
const ColumnConst * column_const_array = checkAndGetColumnConst<ColumnArray>(column_array_ptr.get());
const ColumnConst * column_const_array = checkAndGetColumnConst<typename Impl::column_type>(column_array_ptr.get());
if (!column_const_array)
throw Exception("Expected array column, found " + column_array_ptr->getName(), ErrorCodes::ILLEGAL_COLUMN);
column_array_ptr = column_const_array->convertToFullColumn();
column_array = assert_cast<const ColumnArray *>(column_array_ptr.get());
column_array = assert_cast<const typename Impl::column_type *>(column_array_ptr.get());
}
return Impl::execute(*column_array, column_array->getDataPtr());
if constexpr (std::is_same_v<typename Impl::column_type, ColumnMap>)
{
return Impl::execute(*column_array, column_array->getNestedColumn().getDataPtr());
}
else
{
return Impl::execute(*column_array, column_array->getDataPtr());
}
}
else
{
@ -172,7 +251,7 @@ public:
ColumnPtr offsets_column;
ColumnPtr column_first_array_ptr;
const ColumnArray * column_first_array = nullptr;
const typename Impl::column_type * column_first_array = nullptr;
ColumnsWithTypeAndName arrays;
arrays.reserve(arguments.size() - 1);
@ -182,18 +261,18 @@ public:
const auto & array_with_type_and_name = arguments[i];
ColumnPtr column_array_ptr = array_with_type_and_name.column;
const auto * column_array = checkAndGetColumn<ColumnArray>(column_array_ptr.get());
const auto * column_array = checkAndGetColumn<typename Impl::column_type>(column_array_ptr.get());
const DataTypePtr & array_type_ptr = array_with_type_and_name.type;
const auto * array_type = checkAndGetDataType<DataTypeArray>(array_type_ptr.get());
const auto * array_type = checkAndGetDataType<typename Impl::data_type>(array_type_ptr.get());
if (!column_array)
{
const ColumnConst * column_const_array = checkAndGetColumnConst<ColumnArray>(column_array_ptr.get());
const ColumnConst * column_const_array = checkAndGetColumnConst<typename Impl::column_type>(column_array_ptr.get());
if (!column_const_array)
throw Exception("Expected array column, found " + column_array_ptr->getName(), ErrorCodes::ILLEGAL_COLUMN);
column_array_ptr = recursiveRemoveLowCardinality(column_const_array->convertToFullColumn());
column_array = checkAndGetColumn<ColumnArray>(column_array_ptr.get());
column_array = checkAndGetColumn<typename Impl::column_type>(column_array_ptr.get());
}
if (!array_type)
@ -201,13 +280,13 @@ public:
if (!offsets_column)
{
offsets_column = column_array->getOffsetsPtr();
offsets_column = getOffsetsPtr(*column_array);
}
else
{
/// The first condition is optimization: do not compare data if the pointers are equal.
if (column_array->getOffsetsPtr() != offsets_column
&& column_array->getOffsets() != typeid_cast<const ColumnArray::ColumnOffsets &>(*offsets_column).getData())
if (getOffsetsPtr(*column_array) != offsets_column
&& getOffsets(*column_array) != typeid_cast<const ColumnArray::ColumnOffsets &>(*offsets_column).getData())
throw Exception("Arrays passed to " + getName() + " must have equal size", ErrorCodes::SIZES_OF_ARRAYS_DOESNT_MATCH);
}
@ -217,13 +296,23 @@ public:
column_first_array = column_array;
}
arrays.emplace_back(ColumnWithTypeAndName(column_array->getDataPtr(),
recursiveRemoveLowCardinality(array_type->getNestedType()),
array_with_type_and_name.name));
if constexpr (std::is_same_v<DataTypeMap, typename Impl::data_type>)
{
arrays.emplace_back(ColumnWithTypeAndName(
column_array->getNestedData().getColumnPtr(0), recursiveRemoveLowCardinality(array_type->getKeyType()), array_with_type_and_name.name+".key"));
arrays.emplace_back(ColumnWithTypeAndName(
column_array->getNestedData().getColumnPtr(1), recursiveRemoveLowCardinality(array_type->getValueType()), array_with_type_and_name.name+".value"));
}
else
{
arrays.emplace_back(ColumnWithTypeAndName(column_array->getDataPtr(),
recursiveRemoveLowCardinality(array_type->getNestedType()),
array_with_type_and_name.name));
}
}
/// Put all the necessary columns multiplied by the sizes of arrays into the columns.
auto replicated_column_function_ptr = IColumn::mutate(column_function->replicate(column_first_array->getOffsets()));
auto replicated_column_function_ptr = IColumn::mutate(column_function->replicate(getOffsets(*column_first_array)));
auto * replicated_column_function = typeid_cast<ColumnFunction *>(replicated_column_function_ptr.get());
replicated_column_function->appendArguments(arrays);

View File

@ -1,12 +1,18 @@
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypesDecimal.h>
#include <DataTypes/DataTypeDateTime64.h>
#include <Columns/ColumnsNumber.h>
#include <Columns/ColumnDecimal.h>
#include "FunctionArrayMapped.h"
#include <Functions/FunctionFactory.h>
#include <base/defines.h>
#include <Columns/ColumnArray.h>
#include <Columns/ColumnDecimal.h>
#include <Columns/ColumnsNumber.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeDateTime64.h>
#include <DataTypes/DataTypesDecimal.h>
#include <DataTypes/DataTypesNumber.h>
#include <Functions/FunctionFactory.h>
#include "FunctionArrayMapped.h"
namespace DB
{
@ -83,6 +89,9 @@ using ArrayAggregateResult = typename ArrayAggregateResultImpl<ArrayElement, ope
template<AggregateOperation aggregate_operation>
struct ArrayAggregateImpl
{
using column_type = ColumnArray;
using data_type = DataTypeArray;
static bool needBoolean() { return false; }
static bool needExpression() { return false; }
static bool needOneArray() { return false; }

View File

@ -1,8 +1,8 @@
#include <DataTypes/DataTypesNumber.h>
#include <Columns/ColumnsNumber.h>
#include "FunctionArrayMapped.h"
#include <DataTypes/DataTypesNumber.h>
#include <Functions/FunctionFactory.h>
#include "FunctionArrayMapped.h"
namespace DB
{
@ -16,6 +16,9 @@ namespace ErrorCodes
*/
struct ArrayAllImpl
{
using column_type = ColumnArray;
using data_type = DataTypeArray;
static bool needBoolean() { return true; }
static bool needExpression() { return false; }
static bool needOneArray() { return false; }

View File

@ -1,10 +1,13 @@
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypesDecimal.h>
#include <Columns/ColumnsNumber.h>
#include <Columns/ColumnDecimal.h>
#include <Columns/ColumnsNumber.h>
#include <Common/HashTable/HashTable.h>
#include <Functions/array/FunctionArrayMapped.h>
#include <DataTypes/DataTypesDecimal.h>
#include <DataTypes/DataTypesNumber.h>
#include <Functions/FunctionFactory.h>
#include <Functions/array/FunctionArrayMapped.h>
namespace DB
@ -16,13 +19,16 @@ namespace ErrorCodes
struct ArrayCompactImpl
{
using column_type = ColumnArray;
using data_type = DataTypeArray;
static bool needBoolean() { return false; }
static bool needExpression() { return false; }
static bool needOneArray() { return false; }
static DataTypePtr getReturnType(const DataTypePtr & nested_type, const DataTypePtr &)
static DataTypePtr getReturnType(const DataTypePtr &, const DataTypePtr & array_element)
{
return std::make_shared<DataTypeArray>(nested_type);
return std::make_shared<DataTypeArray>(array_element);
}
template <typename T>
@ -30,14 +36,16 @@ struct ArrayCompactImpl
{
using ColVecType = ColumnVectorOrDecimal<T>;
const ColVecType * src_values_column = checkAndGetColumn<ColVecType>(mapped.get());
const ColVecType * check_values_column = checkAndGetColumn<ColVecType>(mapped.get());
const ColVecType * src_values_column = checkAndGetColumn<ColVecType>(array.getData());
if (!src_values_column)
if (!src_values_column || !check_values_column)
return false;
const IColumn::Offsets & src_offsets = array.getOffsets();
const typename ColVecType::Container & src_values = src_values_column->getData();
const auto & src_values = src_values_column->getData();
const auto & check_values = check_values_column->getData();
typename ColVecType::MutablePtr res_values_column;
if constexpr (is_decimal<T>)
res_values_column = ColVecType::create(src_values.size(), src_values_column->getScale());
@ -45,6 +53,7 @@ struct ArrayCompactImpl
res_values_column = ColVecType::create(src_values.size());
typename ColVecType::Container & res_values = res_values_column->getData();
size_t src_offsets_size = src_offsets.size();
auto res_offsets_column = ColumnArray::ColumnOffsets::create(src_offsets_size);
IColumn::Offsets & res_offsets = res_offsets_column->getData();
@ -67,7 +76,7 @@ struct ArrayCompactImpl
++res_pos;
for (; src_pos < src_offset; ++src_pos)
{
if (!bitEquals(src_values[src_pos], src_values[src_pos - 1]))
if (!bitEquals(check_values[src_pos], check_values[src_pos - 1]))
{
res_values[res_pos] = src_values[src_pos];
++res_pos;
@ -86,8 +95,9 @@ struct ArrayCompactImpl
{
const IColumn::Offsets & src_offsets = array.getOffsets();
auto res_values_column = mapped->cloneEmpty();
res_values_column->reserve(mapped->size());
const auto & src_values = array.getData();
auto res_values_column = src_values.cloneEmpty();
res_values_column->reserve(src_values.size());
size_t src_offsets_size = src_offsets.size();
auto res_offsets_column = ColumnArray::ColumnOffsets::create(src_offsets_size);
@ -104,7 +114,7 @@ struct ArrayCompactImpl
if (src_pos < src_offset)
{
/// Insert first element unconditionally.
res_values_column->insertFrom(*mapped, src_pos);
res_values_column->insertFrom(src_values, src_pos);
/// For the rest of elements, insert if the element is different from the previous.
++src_pos;
@ -113,7 +123,7 @@ struct ArrayCompactImpl
{
if (mapped->compareAt(src_pos - 1, src_pos, *mapped, 1))
{
res_values_column->insertFrom(*mapped, src_pos);
res_values_column->insertFrom(src_values, src_pos);
++res_pos;
}
}

View File

@ -1,8 +1,9 @@
#include <DataTypes/DataTypesNumber.h>
#include <Columns/ColumnsNumber.h>
#include "FunctionArrayMapped.h"
#include <DataTypes/DataTypesNumber.h>
#include <Functions/FunctionFactory.h>
#include "FunctionArrayMapped.h"
namespace DB
{
@ -16,6 +17,9 @@ namespace ErrorCodes
*/
struct ArrayCountImpl
{
using column_type = ColumnArray;
using data_type = DataTypeArray;
static bool needBoolean() { return true; }
static bool needExpression() { return false; }
static bool needOneArray() { return false; }

View File

@ -1,10 +1,11 @@
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypesDecimal.h>
#include <Columns/ColumnsNumber.h>
#include <Columns/ColumnDecimal.h>
#include "FunctionArrayMapped.h"
#include <Columns/ColumnsNumber.h>
#include <DataTypes/DataTypesDecimal.h>
#include <DataTypes/DataTypesNumber.h>
#include <Functions/FunctionFactory.h>
#include "FunctionArrayMapped.h"
namespace DB
{
@ -17,6 +18,9 @@ namespace ErrorCodes
struct ArrayCumSumImpl
{
using column_type = ColumnArray;
using data_type = DataTypeArray;
static bool needBoolean() { return false; }
static bool needExpression() { return false; }
static bool needOneArray() { return false; }

View File

@ -1,10 +1,10 @@
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypesDecimal.h>
#include <Columns/ColumnsNumber.h>
#include <Columns/ColumnDecimal.h>
#include "FunctionArrayMapped.h"
#include <Columns/ColumnsNumber.h>
#include <DataTypes/DataTypesDecimal.h>
#include <DataTypes/DataTypesNumber.h>
#include <Functions/FunctionFactory.h>
#include "FunctionArrayMapped.h"
namespace DB
{
@ -19,6 +19,9 @@ namespace ErrorCodes
*/
struct ArrayCumSumNonNegativeImpl
{
using column_type = ColumnArray;
using data_type = DataTypeArray;
static bool needBoolean() { return false; }
static bool needExpression() { return false; }
static bool needOneArray() { return false; }

View File

@ -1,10 +1,11 @@
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypesDecimal.h>
#include <Columns/ColumnsNumber.h>
#include <Columns/ColumnDecimal.h>
#include "FunctionArrayMapped.h"
#include <Columns/ColumnsNumber.h>
#include <DataTypes/DataTypesDecimal.h>
#include <DataTypes/DataTypesNumber.h>
#include <Functions/FunctionFactory.h>
#include "FunctionArrayMapped.h"
namespace DB
{
@ -20,6 +21,9 @@ namespace ErrorCodes
*/
struct ArrayDifferenceImpl
{
using column_type = ColumnArray;
using data_type = DataTypeArray;
static bool needBoolean() { return false; }
static bool needExpression() { return false; }
static bool needOneArray() { return false; }

View File

@ -1,8 +1,9 @@
#include <DataTypes/DataTypesNumber.h>
#include <Columns/ColumnsNumber.h>
#include "FunctionArrayMapped.h"
#include <DataTypes/DataTypesNumber.h>
#include <Functions/FunctionFactory.h>
#include "FunctionArrayMapped.h"
namespace DB
{
@ -16,6 +17,9 @@ namespace ErrorCodes
*/
struct ArrayExistsImpl
{
using column_type = ColumnArray;
using data_type = DataTypeArray;
static bool needBoolean() { return true; }
static bool needExpression() { return false; }
static bool needOneArray() { return false; }

View File

@ -1,8 +1,9 @@
#include <DataTypes/DataTypesNumber.h>
#include <Columns/ColumnsNumber.h>
#include "FunctionArrayMapped.h"
#include <DataTypes/DataTypesNumber.h>
#include <Functions/FunctionFactory.h>
#include "FunctionArrayMapped.h"
namespace DB
{
@ -19,6 +20,9 @@ namespace ErrorCodes
template <bool reverse>
struct ArrayFillImpl
{
using column_type = ColumnArray;
using data_type = DataTypeArray;
static bool needBoolean() { return true; }
static bool needExpression() { return true; }
static bool needOneArray() { return false; }

View File

@ -1,8 +1,9 @@
#include <DataTypes/DataTypesNumber.h>
#include <Columns/ColumnsNumber.h>
#include "FunctionArrayMapped.h"
#include <DataTypes/DataTypesNumber.h>
#include <Functions/FunctionFactory.h>
#include "FunctionArrayMapped.h"
namespace DB
{
@ -15,6 +16,9 @@ namespace ErrorCodes
*/
struct ArrayFilterImpl
{
using column_type = ColumnArray;
using data_type = DataTypeArray;
static bool needBoolean() { return true; }
static bool needExpression() { return true; }
static bool needOneArray() { return false; }

View File

@ -1,8 +1,9 @@
#include <DataTypes/DataTypesNumber.h>
#include <Columns/ColumnsNumber.h>
#include "FunctionArrayMapped.h"
#include <DataTypes/DataTypesNumber.h>
#include <Functions/FunctionFactory.h>
#include "FunctionArrayMapped.h"
namespace DB
{
@ -20,6 +21,9 @@ enum class ArrayFirstLastStrategy
template <ArrayFirstLastStrategy strategy>
struct ArrayFirstLastImpl
{
using column_type = ColumnArray;
using data_type = DataTypeArray;
static bool needBoolean() { return false; }
static bool needExpression() { return true; }
static bool needOneArray() { return false; }

View File

@ -1,8 +1,9 @@
#include <DataTypes/DataTypesNumber.h>
#include <Columns/ColumnsNumber.h>
#include "FunctionArrayMapped.h"
#include <Functions/FunctionFactory.h>
#include "FunctionArrayMapped.h"
namespace DB
{
@ -20,6 +21,9 @@ enum class ArrayFirstLastIndexStrategy
template <ArrayFirstLastIndexStrategy strategy>
struct ArrayFirstLastIndexImpl
{
using column_type = ColumnArray;
using data_type = DataTypeArray;
static bool needBoolean() { return false; }
static bool needExpression() { return true; }
static bool needOneArray() { return false; }

View File

@ -1,14 +1,18 @@
#include "FunctionArrayMapped.h"
#include <Functions/FunctionFactory.h>
#include "FunctionArrayMapped.h"
namespace DB
{
/** arrayMap(x1,...,xn -> expression, array1,...,arrayn) - apply the expression to each element of the array (or set of parallel arrays).
/** arrayMap(x1, ..., xn -> expression, array1, ..., arrayn) - apply the expression to each element of the array (or set of parallel arrays).
*/
struct ArrayMapImpl
{
using column_type = ColumnArray;
using data_type = DataTypeArray;
/// true if the expression (for an overload of f(expression, arrays)) or an array (for f(array)) should be boolean.
static bool needBoolean() { return false; }
/// true if the f(array) overload is unavailable.

View File

@ -1,8 +1,8 @@
#include "FunctionArrayMapped.h"
#include <base/sort.h>
#include <Functions/FunctionFactory.h>
namespace DB
{
@ -11,6 +11,9 @@ namespace DB
template <bool positive>
struct ArraySortImpl
{
using column_type = ColumnArray;
using data_type = DataTypeArray;
static bool needBoolean() { return false; }
static bool needExpression() { return false; }
static bool needOneArray() { return false; }

View File

@ -1,8 +1,9 @@
#include <DataTypes/DataTypesNumber.h>
#include <Columns/ColumnsNumber.h>
#include "FunctionArrayMapped.h"
#include <DataTypes/DataTypesNumber.h>
#include <Functions/FunctionFactory.h>
#include "FunctionArrayMapped.h"
namespace DB
{
@ -14,6 +15,9 @@ namespace ErrorCodes
template <bool reverse>
struct ArraySplitImpl
{
using column_type = ColumnArray;
using data_type = DataTypeArray;
static bool needBoolean() { return true; }
static bool needExpression() { return true; }
static bool needOneArray() { return false; }

View File

@ -518,6 +518,115 @@ public:
}
};
class FunctionMapUpdate : public IFunction
{
public:
static constexpr auto name = "mapUpdate";
static FunctionPtr create(ContextPtr) { return std::make_shared<FunctionMapUpdate>(); }
String getName() const override
{
return name;
}
size_t getNumberOfArguments() const override { return 2; }
bool isSuitableForShortCircuitArgumentsExecution(const DataTypesWithConstInfo & /*arguments*/) const override { return true; }
DataTypePtr getReturnTypeImpl(const ColumnsWithTypeAndName & arguments) const override
{
if (arguments.size() != 2)
throw Exception("Number of arguments for function " + getName() + " doesn't match: passed "
+ toString(arguments.size()) + ", should be 2",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
const DataTypeMap * left = checkAndGetDataType<DataTypeMap>(arguments[0].type.get());
const DataTypeMap * right = checkAndGetDataType<DataTypeMap>(arguments[1].type.get());
if (!left || !right)
throw Exception{"The two arguments for function " + getName() + " must be both Map type",
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT};
if (!left->getKeyType()->equals(*right->getKeyType()) || !left->getValueType()->equals(*right->getValueType()))
throw Exception{"The Key And Value type of Map for function " + getName() + " must be the same",
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT};
return std::make_shared<DataTypeMap>(left->getKeyType(), left->getValueType());
}
bool useDefaultImplementationForConstants() const override { return true; }
ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr & result_type, size_t input_rows_count) const override
{
const ColumnMap * col_map_left = typeid_cast<const ColumnMap *>(arguments[0].column.get());
const auto * col_const_map_left = checkAndGetColumnConst<ColumnMap>(arguments[0].column.get());
if (col_const_map_left)
col_map_left = typeid_cast<const ColumnMap *>(&col_const_map_left->getDataColumn());
if (!col_map_left)
return nullptr;
const ColumnMap * col_map_right = typeid_cast<const ColumnMap *>(arguments[1].column.get());
const auto * col_const_map_right = checkAndGetColumnConst<ColumnMap>(arguments[1].column.get());
if (col_const_map_right)
col_map_right = typeid_cast<const ColumnMap *>(&col_const_map_right->getDataColumn());
if (!col_map_right)
return nullptr;
const auto & nested_column_left = col_map_left->getNestedColumn();
const auto & keys_data_left = col_map_left->getNestedData().getColumn(0);
const auto & values_data_left = col_map_left->getNestedData().getColumn(1);
const auto & offsets_left = nested_column_left.getOffsets();
const auto & nested_column_right = col_map_right->getNestedColumn();
const auto & keys_data_right = col_map_right->getNestedData().getColumn(0);
const auto & values_data_right = col_map_right->getNestedData().getColumn(1);
const auto & offsets_right = nested_column_right.getOffsets();
const auto & result_type_map = static_cast<const DataTypeMap &>(*result_type);
const DataTypePtr & key_type = result_type_map.getKeyType();
const DataTypePtr & value_type = result_type_map.getValueType();
MutableColumnPtr keys_data = key_type->createColumn();
MutableColumnPtr values_data = value_type->createColumn();
MutableColumnPtr offsets = DataTypeNumber<IColumn::Offset>().createColumn();
IColumn::Offset current_offset = 0;
for (size_t idx = 0; idx < input_rows_count; ++idx)
{
for (size_t i = offsets_left[idx - 1]; i < offsets_left[idx]; ++i)
{
bool matched = false;
auto key = keys_data_left.getDataAt(i);
for (size_t j = offsets_right[idx - 1]; j < offsets_right[idx]; ++j)
{
if (keys_data_right.getDataAt(j).toString() == key.toString())
{
matched = true;
break;
}
}
if (!matched)
{
keys_data->insertFrom(keys_data_left, i);
values_data->insertFrom(values_data_left, i);
++current_offset;
}
}
for (size_t j = offsets_right[idx - 1]; j < offsets_right[idx]; ++j)
{
keys_data->insertFrom(keys_data_right, j);
values_data->insertFrom(values_data_right, j);
++current_offset;
}
offsets->insert(current_offset);
}
auto nested_column = ColumnArray::create(
ColumnTuple::create(Columns{std::move(keys_data), std::move(values_data)}),
std::move(offsets));
return ColumnMap::create(nested_column);
}
};
}
void registerFunctionsMap(FunctionFactory & factory)
@ -528,6 +637,7 @@ void registerFunctionsMap(FunctionFactory & factory)
factory.registerFunction<FunctionMapValues>();
factory.registerFunction<FunctionMapContainsKeyLike>();
factory.registerFunction<FunctionExtractKeyLike>();
factory.registerFunction<FunctionMapUpdate>();
}
}
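
A minimal usage sketch of the new function, with hypothetical values; the result follows from the merge loop above, where a left-hand entry is kept only if its key is absent on the right, and all right-hand pairs are then appended:

```sql
-- Right-hand pairs win on duplicate keys.
SELECT mapUpdate(map('k1', 1, 'k2', 2), map('k2', 20, 'k3', 3)) AS m;
-- {'k1':1,'k2':20,'k3':3}
```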

144
src/Functions/mapFilter.cpp Normal file
View File

@ -0,0 +1,144 @@
#include <Columns/ColumnMap.h>
#include <Columns/ColumnTuple.h>
#include <Columns/ColumnsNumber.h>
#include <DataTypes/DataTypeTuple.h>
#include <Functions/FunctionFactory.h>
#include <Functions/FunctionHelpers.h>
#include <Functions/array/FunctionArrayMapped.h>
namespace DB
{
namespace ErrorCodes
{
extern const int ILLEGAL_COLUMN;
extern const int ILLEGAL_TYPE_OF_ARGUMENT;
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
}
/** Higher-order functions for map.
* These functions apply a lambda function to a map,
* and return some result based on that transformation.
*/
/** mapFilter((k, v) -> predicate, map) - keep in the map only the key/value pairs for which the expression is true.
*/
struct MapFilterImpl
{
using data_type = DataTypeMap;
using column_type = ColumnMap;
static constexpr auto name = "mapFilter";
static bool needBoolean() { return true; }
static bool needExpression() { return true; }
static bool needOneArray() { return true; }
static DataTypePtr getReturnType(const DataTypePtr & /*expression_return*/, const DataTypes & elems)
{
return std::make_shared<DataTypeMap>(elems);
}
/// If there are several arrays, the first one is passed here.
static ColumnPtr execute(const ColumnMap & map_column, ColumnPtr mapped)
{
const ColumnUInt8 * column_filter = typeid_cast<const ColumnUInt8 *>(&*mapped);
if (!column_filter)
{
const auto * column_filter_const = checkAndGetColumnConst<ColumnUInt8>(&*mapped);
if (!column_filter_const)
throw Exception("Unexpected type of filter column", ErrorCodes::ILLEGAL_COLUMN);
if (column_filter_const->getValue<UInt8>())
return map_column.clone();
else
{
const auto * column_array = typeid_cast<const ColumnArray *>(map_column.getNestedColumnPtr().get());
const auto * column_tuple = typeid_cast<const ColumnTuple *>(column_array->getDataPtr().get());
ColumnPtr keys = column_tuple->getColumnPtr(0)->cloneEmpty();
ColumnPtr values = column_tuple->getColumnPtr(1)->cloneEmpty();
return ColumnMap::create(keys, values, ColumnArray::ColumnOffsets::create(map_column.size(), 0));
}
}
const IColumn::Filter & filter = column_filter->getData();
ColumnPtr filtered = map_column.getNestedColumn().getData().filter(filter, -1);
const IColumn::Offsets & in_offsets = map_column.getNestedColumn().getOffsets();
auto column_offsets = ColumnArray::ColumnOffsets::create(in_offsets.size());
IColumn::Offsets & out_offsets = column_offsets->getData();
size_t in_pos = 0;
size_t out_pos = 0;
for (size_t i = 0; i < in_offsets.size(); ++i)
{
for (; in_pos < in_offsets[i]; ++in_pos)
{
if (filter[in_pos])
++out_pos;
}
out_offsets[i] = out_pos;
}
return ColumnMap::create(ColumnArray::create(filtered, std::move(column_offsets)));
}
};
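
A usage sketch under the semantics implemented above (only entries for which the lambda returns true survive); the values are hypothetical:

```sql
SELECT mapFilter((k, v) -> v > 1, map('a', 1, 'b', 2, 'c', 3)) AS m;
-- {'b':2,'c':3}
```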
/** mapApply((k, v) -> expression, map) - apply the expression to each entry of the map.
*/
struct MapApplyImpl
{
using data_type = DataTypeMap;
using column_type = ColumnMap;
static constexpr auto name = "mapApply";
/// true if the expression (for an overload of f(expression, maps)) or a map (for f(map)) should be boolean.
static bool needBoolean() { return false; }
static bool needExpression() { return true; }
static bool needOneArray() { return true; }
static DataTypePtr getReturnType(const DataTypePtr & expression_return, const DataTypes & /*elems*/)
{
const auto * tuple_types = typeid_cast<const DataTypeTuple *>(expression_return.get());
if (!tuple_types)
throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT,
"Expected return type is tuple, got {}", expression_return->getName());
if (tuple_types->getElements().size() != 2)
throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH,
"Expected 2 columns as map's key and value, but found {}", tuple_types->getElements().size());
return std::make_shared<DataTypeMap>(tuple_types->getElements());
}
static ColumnPtr execute(const ColumnMap & map, ColumnPtr mapped)
{
const auto * column_tuple = checkAndGetColumn<ColumnTuple>(mapped.get());
if (!column_tuple)
{
const ColumnConst * column_const_tuple = checkAndGetColumnConst<ColumnTuple>(mapped.get());
if (!column_const_tuple)
throw Exception(ErrorCodes::ILLEGAL_COLUMN, "Expected tuple column, found {}", mapped->getName());
auto cols = convertConstTupleToConstantElements(*column_const_tuple);
return ColumnMap::create(cols[0]->convertToFullColumnIfConst(), cols[1]->convertToFullColumnIfConst(), map.getNestedColumn().getOffsetsPtr());
}
return ColumnMap::create(column_tuple->getColumnPtr(0), column_tuple->getColumnPtr(1),
map.getNestedColumn().getOffsetsPtr());
}
};
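
A usage sketch with hypothetical values, assuming the lambda returns a (key, value) tuple as required by getReturnType above:

```sql
SELECT mapApply((k, v) -> (k, v * 2), map('a', 1, 'b', 2)) AS m;
-- {'a':2,'b':4}
```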
void registerFunctionMapApply(FunctionFactory & factory)
{
factory.registerFunction<FunctionArrayMapped<MapFilterImpl, MapFilterImpl>>();
factory.registerFunction<FunctionArrayMapped<MapApplyImpl, MapApplyImpl>>();
}
}

View File

@ -18,6 +18,7 @@ void registerFunctionsArraySort(FunctionFactory & factory);
void registerFunctionArrayCumSum(FunctionFactory & factory);
void registerFunctionArrayCumSumNonNegative(FunctionFactory & factory);
void registerFunctionArrayDifference(FunctionFactory & factory);
void registerFunctionMapApply(FunctionFactory & factory);
void registerFunctionsHigherOrder(FunctionFactory & factory)
{
@ -36,6 +37,7 @@ void registerFunctionsHigherOrder(FunctionFactory & factory)
registerFunctionArrayCumSum(factory);
registerFunctionArrayCumSumNonNegative(factory);
registerFunctionArrayDifference(factory);
registerFunctionMapApply(factory);
}
}

View File

@ -116,7 +116,14 @@ void WriteBufferFromS3::allocateBuffer()
WriteBufferFromS3::~WriteBufferFromS3()
{
finalize();
try
{
finalize();
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
void WriteBufferFromS3::preFinalize()

View File

@ -138,10 +138,10 @@ private:
static void finishWithException(const ASTPtr & query, const std::list<InsertData::EntryPtr> & entries, const E & exception);
public:
Queue getQueue() const
auto getQueueLocked() const
{
std::shared_lock lock(rwlock);
return queue;
return std::make_pair(std::ref(queue), std::move(lock));
}
};

View File

@ -623,8 +623,6 @@ BlockIO InterpreterSelectQuery::execute()
Block InterpreterSelectQuery::getSampleBlockImpl()
{
OpenTelemetrySpanHolder span(__PRETTY_FUNCTION__);
query_info.query = query_ptr;
query_info.has_window = query_analyzer->hasWindow();
if (storage && !options.only_analyze)

View File

@ -150,6 +150,42 @@ OpenTelemetrySpanHolder::~OpenTelemetrySpanHolder()
}
}
void OpenTelemetrySpanHolder::addAttribute(const std::string& name, UInt64 value)
{
if (trace_id == UUID())
return;
this->attribute_names.push_back(name);
this->attribute_values.push_back(std::to_string(value));
}
void OpenTelemetrySpanHolder::addAttribute(const std::string& name, const std::string& value)
{
if (trace_id == UUID())
return;
this->attribute_names.push_back(name);
this->attribute_values.push_back(value);
}
void OpenTelemetrySpanHolder::addAttribute(const Exception & e)
{
if (trace_id == UUID())
return;
this->attribute_names.push_back("clickhouse.exception");
this->attribute_values.push_back(getExceptionMessage(e, false));
}
void OpenTelemetrySpanHolder::addAttribute(std::exception_ptr e)
{
if (trace_id == UUID() || e == nullptr)
return;
this->attribute_names.push_back("clickhouse.exception");
this->attribute_values.push_back(getExceptionMessage(e, false));
}
bool OpenTelemetryTraceContext::parseTraceparentHeader(const std::string & traceparent,
std::string & error)
{

View File

@ -45,6 +45,11 @@ public:
struct OpenTelemetrySpanHolder : public OpenTelemetrySpan
{
OpenTelemetrySpanHolder(const std::string & _operation_name);
void addAttribute(const std::string& name, UInt64 value);
void addAttribute(const std::string& name, const std::string& value);
void addAttribute(const Exception & e);
void addAttribute(std::exception_ptr e);
~OpenTelemetrySpanHolder();
};
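
These attribute pairs accumulate into the span's name/value arrays and land in system.opentelemetry_span_log; a hypothetical inspection query, assuming the Nested attribute layout implied by the attribute_names/attribute_values members used above:

```sql
SELECT operation_name, attribute.names, attribute.values
FROM system.opentelemetry_span_log
WHERE has(attribute.names, 'clickhouse.written_rows');
```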

View File

@ -47,6 +47,7 @@ public:
bool useDefaultImplementationForConstants() const override { return true; }
bool useDefaultImplementationForNulls() const override { return true; }
bool isDeterministic() const override { return false; }
bool isDeterministicInScopeOfQuery() const override { return false; }
DataTypePtr getReturnTypeImpl(const DataTypes &) const override
{

View File

@ -156,7 +156,7 @@ namespace
}
void eraseNonGrantable(AccessRightsElements & elements)
void throwIfNotGrantable(AccessRightsElements & elements)
{
boost::range::remove_erase_if(elements, [](AccessRightsElement & element)
{
@ -303,7 +303,12 @@ bool ParserGrantQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
}
if (!is_revoke)
eraseNonGrantable(elements);
{
if (attach_mode)
elements.eraseNonGrantable();
else
throwIfNotGrantable(elements);
}
auto query = std::make_shared<ASTGrantQuery>();
node = query;

View File

@ -64,7 +64,7 @@ IProcessor::Status DelayedSource::prepare()
continue;
}
if (!output->isNeeded())
if (!output->canPush())
return Status::PortFull;
if (input->isFinished())

View File

@ -32,8 +32,19 @@ RemoteInserter::RemoteInserter(
modified_client_info.query_kind = ClientInfo::QueryKind::SECONDARY_QUERY;
if (CurrentThread::isInitialized())
{
modified_client_info.client_trace_context
= CurrentThread::get().thread_trace_context;
auto& thread_trace_context = CurrentThread::get().thread_trace_context;
if (thread_trace_context.trace_id != UUID())
{
// Overwrite the trace context only if the current thread's trace context is available.
modified_client_info.client_trace_context = thread_trace_context;
}
else
{
// If tracing is not enabled on the thread-local context (for example, when running in a background thread),
// we should not clear the trace context on the client info, because the client info may already hold a trace context
// that should be propagated to the remote server so that the tracing of a distributed table insert is complete.
}
}
/** Send query and receive "header", that describes table structure.

View File

@ -20,6 +20,7 @@
#include <Interpreters/InterpreterInsertQuery.h>
#include <Interpreters/ExpressionActions.h>
#include <Interpreters/Context.h>
#include <Interpreters/OpenTelemetrySpanLog.h>
#include <DataTypes/DataTypeLowCardinality.h>
#include <Common/setThreadName.h>
@ -331,9 +332,14 @@ DistributedSink::runWritingJob(JobReplica & job, const Block & current_block, si
const Settings & settings = context->getSettingsRef();
/// Do not initiate INSERT for empty block.
if (shard_block.rows() == 0)
size_t rows = shard_block.rows();
if (rows == 0)
return;
OpenTelemetrySpanHolder span(__PRETTY_FUNCTION__);
span.addAttribute("clickhouse.shard_num", shard_info.shard_num);
span.addAttribute("clickhouse.written_rows", rows);
if (!job.is_local_job || !settings.prefer_localhost_replica)
{
if (!job.executor)
@ -406,13 +412,15 @@ DistributedSink::runWritingJob(JobReplica & job, const Block & current_block, si
}
job.blocks_written += 1;
job.rows_written += shard_block.rows();
job.rows_written += rows;
};
}
void DistributedSink::writeSync(const Block & block)
{
OpenTelemetrySpanHolder span(__PRETTY_FUNCTION__);
const Settings & settings = context->getSettingsRef();
const auto & shards_info = cluster->getShardsInfo();
Block block_to_send = removeSuperfluousColumns(block);
@ -456,6 +464,10 @@ void DistributedSink::writeSync(const Block & block)
size_t num_shards = end - start;
span.addAttribute("clickhouse.start_shard", start);
span.addAttribute("clickhouse.end_shard", end);
span.addAttribute("db.statement", this->query_string);
if (num_shards > 1)
{
auto current_selector = createSelector(block);
@ -489,6 +501,7 @@ void DistributedSink::writeSync(const Block & block)
catch (Exception & exception)
{
exception.addMessage(getCurrentStateDescription());
span.addAttribute(exception);
throw;
}
@ -597,10 +610,15 @@ void DistributedSink::writeSplitAsync(const Block & block)
void DistributedSink::writeAsyncImpl(const Block & block, size_t shard_id)
{
OpenTelemetrySpanHolder span("DistributedBlockOutputStream::writeAsyncImpl()");
const auto & shard_info = cluster->getShardsInfo()[shard_id];
const auto & settings = context->getSettingsRef();
Block block_to_send = removeSuperfluousColumns(block);
span.addAttribute("clickhouse.shard_num", shard_info.shard_num);
span.addAttribute("clickhouse.written_rows", block.rows());
if (shard_info.hasInternalReplication())
{
if (shard_info.isLocal() && settings.prefer_localhost_replica)
@ -634,6 +652,9 @@ void DistributedSink::writeAsyncImpl(const Block & block, size_t shard_id)
void DistributedSink::writeToLocal(const Block & block, size_t repeats)
{
OpenTelemetrySpanHolder span(__PRETTY_FUNCTION__);
span.addAttribute("db.statement", this->query_string);
InterpreterInsertQuery interp(query_ast, context, allow_materialized);
auto block_io = interp.execute();
@ -647,6 +668,8 @@ void DistributedSink::writeToLocal(const Block & block, size_t repeats)
void DistributedSink::writeToShard(const Block & block, const std::vector<std::string> & dir_names)
{
OpenTelemetrySpanHolder span(__PRETTY_FUNCTION__);
const auto & settings = context->getSettingsRef();
const auto & distributed_settings = storage.getDistributedSettingsRef();
@ -713,7 +736,19 @@ void DistributedSink::writeToShard(const Block & block, const std::vector<std::s
writeVarUInt(DBMS_TCP_PROTOCOL_VERSION, header_buf);
writeStringBinary(query_string, header_buf);
context->getSettingsRef().write(header_buf);
context->getClientInfo().write(header_buf, DBMS_TCP_PROTOCOL_VERSION);
if (context->getClientInfo().client_trace_context.trace_id != UUID() && CurrentThread::isInitialized())
{
// If distributed tracing is enabled, use the trace context of the current thread as the parent of the next span.
auto client_info = context->getClientInfo();
client_info.client_trace_context = CurrentThread::get().thread_trace_context;
client_info.write(header_buf, DBMS_TCP_PROTOCOL_VERSION);
}
else
{
context->getClientInfo().write(header_buf, DBMS_TCP_PROTOCOL_VERSION);
}
writeVarUInt(block.rows(), header_buf);
writeVarUInt(block.bytes(), header_buf);
writeStringBinary(block.cloneEmpty().dumpStructure(), header_buf); /// obsolete

View File

@ -818,6 +818,9 @@ void registerStorageFileLog(StorageFactory & factory)
bool StorageFileLog::updateFileInfos()
{
if (file_infos.file_names.empty())
return false;
if (!directory_watch)
{
/// For a table that watches just one file, we cannot use the directory monitor to watch it

View File

@ -47,7 +47,7 @@ public:
/// It is useful because column-oriented formats can effectively skip unknown columns,
/// so we can create a header of only the required columns in the read method and ask
/// the format to read only them. Note: this hack cannot be done with ordinary formats like TSV.
bool isColumnOriented() const;
bool isColumnOriented() const override;
static ColumnsDescription getTableStructureFromData(
const String & format,

View File

@ -562,6 +562,8 @@ public:
/// Returns true if all disks of storage are read-only.
virtual bool isStaticStorage() const;
virtual bool isColumnOriented() const { return false; }
/// If it is possible to quickly determine exact number of rows in the table at this moment of time, then return it.
/// Used for:
/// - Simple count() optimization

View File

@ -710,9 +710,14 @@ void bloomFilterIndexValidator(const IndexDescription & index, bool /*attach*/)
const auto & array_type = assert_cast<const DataTypeArray &>(*index_data_type);
data_type = WhichDataType(array_type.getNestedType());
}
else if (data_type.isLowCarnality())
{
const auto & low_cardinality = assert_cast<const DataTypeLowCardinality &>(*index_data_type);
data_type = WhichDataType(low_cardinality.getDictionaryType());
}
if (!data_type.isString() && !data_type.isFixedString())
throw Exception("Bloom filter index can be used only with `String`, `FixedString` column or Array with `String` or `FixedString` values column.", ErrorCodes::INCORRECT_QUERY);
throw Exception("Bloom filter index can be used only with `String`, `FixedString`, `LowCardinality(String)`, `LowCardinality(FixedString)` column or Array with `String` or `FixedString` values column.", ErrorCodes::INCORRECT_QUERY);
}
if (index.type == NgramTokenExtractor::getName())
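
With this branch the token/ngram bloom filter validator now also accepts LowCardinality over string types; a hypothetical table sketch:

```sql
CREATE TABLE bf_example
(
    id UInt64,
    tag LowCardinality(String),
    INDEX tag_bf tag TYPE tokenbf_v1(512, 3, 0) GRANULARITY 4
)
ENGINE = MergeTree
ORDER BY id;
```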

View File

@ -64,7 +64,7 @@ public:
/// It is useful because column-oriented formats can effectively skip unknown columns,
/// so we can create a header of only the required columns in the read method and ask
/// the format to read only them. Note: this hack cannot be done with ordinary formats like TSV.
bool isColumnOriented() const;
bool isColumnOriented() const override;
bool supportsPartitionBy() const override { return true; }

View File

@ -615,6 +615,11 @@ std::shared_ptr<StorageS3Source::IteratorWrapper> StorageS3::createFileIterator(
}
}
bool StorageS3::isColumnOriented() const
{
return FormatFactory::instance().checkIfFormatIsColumnOriented(format_name);
}
Pipe StorageS3::read(
const Names & column_names,
const StorageMetadataPtr & metadata_snapshot,
@ -639,6 +644,20 @@ Pipe StorageS3::read(
std::shared_ptr<StorageS3Source::IteratorWrapper> iterator_wrapper = createFileIterator(client_auth, keys, is_key_with_globs, distributed_processing, local_context);
ColumnsDescription columns_description;
Block block_for_format;
if (isColumnOriented())
{
columns_description = ColumnsDescription{
metadata_snapshot->getSampleBlockForColumns(column_names, getVirtuals(), getStorageID()).getNamesAndTypesList()};
block_for_format = metadata_snapshot->getSampleBlockForColumns(columns_description.getNamesOfPhysical());
}
else
{
columns_description = metadata_snapshot->getColumns();
block_for_format = metadata_snapshot->getSampleBlock();
}
for (size_t i = 0; i < num_streams; ++i)
{
pipes.emplace_back(std::make_shared<StorageS3Source>(
@ -646,10 +665,10 @@ Pipe StorageS3::read(
need_file_column,
format_name,
getName(),
metadata_snapshot->getSampleBlock(),
block_for_format,
local_context,
format_settings,
metadata_snapshot->getColumns(),
columns_description,
max_block_size,
max_single_read_retries,
compression_method,

View File

@ -218,6 +218,8 @@ private:
bool is_key_with_globs,
const std::optional<FormatSettings> & format_settings,
ContextPtr ctx);
bool isColumnOriented() const override;
};
}
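
A sketch of what this enables, with a hypothetical URL and schema: for a column-oriented format such as Parquet, read() now builds the header from only the requested columns, so a narrow projection avoids fetching the remaining columns (see the test_select_columns test later in this diff):

```sql
SELECT value2
FROM s3('https://storage.example.com/bucket/data.parquet', 'Parquet',
        'id UInt32, value1 Int32, value2 Int32');
```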

View File

@ -405,7 +405,7 @@ std::vector<std::pair<std::string, std::string>> IStorageURLBase::getReadURIPara
std::function<void(std::ostream &)> IStorageURLBase::getReadPOSTDataCallback(
const Names & /*column_names*/,
const StorageMetadataPtr & /*metadata_snapshot*/,
const ColumnsDescription & /* columns_description */,
const SelectQueryInfo & /*query_info*/,
ContextPtr /*context*/,
QueryProcessingStage::Enum & /*processed_stage*/,
@ -482,6 +482,11 @@ ColumnsDescription IStorageURLBase::getTableStructureFromData(
throw Exception(ErrorCodes::CANNOT_EXTRACT_TABLE_STRUCTURE, "All attempts to extract table structure from urls failed. Errors:\n{}", exception_messages);
}
bool IStorageURLBase::isColumnOriented() const
{
return FormatFactory::instance().checkIfFormatIsColumnOriented(format_name);
}
Pipe IStorageURLBase::read(
const Names & column_names,
const StorageMetadataPtr & metadata_snapshot,
@ -493,6 +498,20 @@ Pipe IStorageURLBase::read(
{
auto params = getReadURIParams(column_names, metadata_snapshot, query_info, local_context, processed_stage, max_block_size);
ColumnsDescription columns_description;
Block block_for_format;
if (isColumnOriented())
{
columns_description = ColumnsDescription{
metadata_snapshot->getSampleBlockForColumns(column_names, getVirtuals(), getStorageID()).getNamesAndTypesList()};
block_for_format = metadata_snapshot->getSampleBlockForColumns(columns_description.getNamesOfPhysical());
}
else
{
columns_description = metadata_snapshot->getColumns();
block_for_format = metadata_snapshot->getSampleBlock();
}
if (urlWithGlobs(uri))
{
size_t max_addresses = local_context->getSettingsRef().glob_expansion_max_elements;
@ -515,14 +534,14 @@ Pipe IStorageURLBase::read(
uri_info,
getReadMethod(),
getReadPOSTDataCallback(
column_names, metadata_snapshot, query_info,
column_names, columns_description, query_info,
local_context, processed_stage, max_block_size),
format_name,
format_settings,
getName(),
getHeaderBlock(column_names, metadata_snapshot),
block_for_format,
local_context,
metadata_snapshot->getColumns(),
columns_description,
max_block_size,
ConnectionTimeouts::getHTTPTimeouts(local_context),
compression_method, headers, params, /* glob_url */true));
@ -537,14 +556,14 @@ Pipe IStorageURLBase::read(
uri_info,
getReadMethod(),
getReadPOSTDataCallback(
column_names, metadata_snapshot, query_info,
column_names, columns_description, query_info,
local_context, processed_stage, max_block_size),
format_name,
format_settings,
getName(),
getHeaderBlock(column_names, metadata_snapshot),
block_for_format,
local_context,
metadata_snapshot->getColumns(),
columns_description,
max_block_size,
ConnectionTimeouts::getHTTPTimeouts(local_context),
compression_method, headers, params));
@ -561,6 +580,20 @@ Pipe StorageURLWithFailover::read(
size_t max_block_size,
unsigned /*num_streams*/)
{
ColumnsDescription columns_description;
Block block_for_format;
if (isColumnOriented())
{
columns_description = ColumnsDescription{
metadata_snapshot->getSampleBlockForColumns(column_names, getVirtuals(), getStorageID()).getNamesAndTypesList()};
block_for_format = metadata_snapshot->getSampleBlockForColumns(columns_description.getNamesOfPhysical());
}
else
{
columns_description = metadata_snapshot->getColumns();
block_for_format = metadata_snapshot->getSampleBlock();
}
auto params = getReadURIParams(column_names, metadata_snapshot, query_info, local_context, processed_stage, max_block_size);
auto uri_info = std::make_shared<StorageURLSource::URIInfo>();
@ -569,14 +602,14 @@ Pipe StorageURLWithFailover::read(
uri_info,
getReadMethod(),
getReadPOSTDataCallback(
column_names, metadata_snapshot, query_info,
column_names, columns_description, query_info,
local_context, processed_stage, max_block_size),
format_name,
format_settings,
getName(),
getHeaderBlock(column_names, metadata_snapshot),
block_for_format,
local_context,
metadata_snapshot->getColumns(),
columns_description,
max_block_size,
ConnectionTimeouts::getHTTPTimeouts(local_context),
compression_method, headers, params));

View File

@ -88,12 +88,14 @@ protected:
virtual std::function<void(std::ostream &)> getReadPOSTDataCallback(
const Names & column_names,
const StorageMetadataPtr & /*metadata_snapshot*/,
const ColumnsDescription & columns_description,
const SelectQueryInfo & query_info,
ContextPtr context,
QueryProcessingStage::Enum & processed_stage,
size_t max_block_size) const;
bool isColumnOriented() const override;
private:
virtual Block getHeaderBlock(const Names & column_names, const StorageMetadataPtr & metadata_snapshot) const = 0;
};

View File

@ -68,14 +68,14 @@ std::vector<std::pair<std::string, std::string>> StorageXDBC::getReadURIParams(
std::function<void(std::ostream &)> StorageXDBC::getReadPOSTDataCallback(
const Names & column_names,
const StorageMetadataPtr & metadata_snapshot,
const ColumnsDescription & columns_description,
const SelectQueryInfo & query_info,
ContextPtr local_context,
QueryProcessingStage::Enum & /*processed_stage*/,
size_t /*max_block_size*/) const
{
String query = transformQueryForExternalDatabase(query_info,
metadata_snapshot->getColumns().getOrdinary(),
columns_description.getOrdinary(),
bridge_helper->getIdentifierQuotingStyle(),
remote_database_name,
remote_table_name,
@ -85,7 +85,7 @@ std::function<void(std::ostream &)> StorageXDBC::getReadPOSTDataCallback(
NamesAndTypesList cols;
for (const String & name : column_names)
{
auto column_data = metadata_snapshot->getColumns().getPhysical(name);
auto column_data = columns_description.getPhysical(name);
cols.emplace_back(column_data.name, column_data.type);
}
@ -114,7 +114,7 @@ Pipe StorageXDBC::read(
return IStorageURLBase::read(column_names, metadata_snapshot, query_info, local_context, processed_stage, max_block_size, num_streams);
}
SinkToStoragePtr StorageXDBC::write(const ASTPtr & /*query*/, const StorageMetadataPtr & metadata_snapshot, ContextPtr local_context)
SinkToStoragePtr StorageXDBC::write(const ASTPtr & /* query */, const StorageMetadataPtr & metadata_snapshot, ContextPtr local_context)
{
bridge_helper->startBridgeSync();
@ -140,6 +140,11 @@ SinkToStoragePtr StorageXDBC::write(const ASTPtr & /*query*/, const StorageMetad
chooseCompressionMethod(uri, compression_method));
}
bool StorageXDBC::isColumnOriented() const
{
return true;
}
Block StorageXDBC::getHeaderBlock(const Names & column_names, const StorageMetadataPtr & metadata_snapshot) const
{
return metadata_snapshot->getSampleBlockForColumns(column_names, getVirtuals(), getStorageID());

View File

@ -59,13 +59,15 @@ private:
std::function<void(std::ostream &)> getReadPOSTDataCallback(
const Names & column_names,
const StorageMetadataPtr & metadata_snapshot,
const ColumnsDescription & columns_description,
const SelectQueryInfo & query_info,
ContextPtr context,
QueryProcessingStage::Enum & processed_stage,
size_t max_block_size) const override;
Block getHeaderBlock(const Names & column_names, const StorageMetadataPtr & metadata_snapshot) const override;
bool isColumnOriented() const override;
};
}

View File

@ -39,7 +39,13 @@ if(Git_FOUND)
ERROR_QUIET OUTPUT_STRIP_TRAILING_WHITESPACE)
endif()
configure_file (StorageSystemBuildOptions.generated.cpp.in ${CONFIG_BUILD})
function(generate_system_build_options)
include(${ClickHouse_SOURCE_DIR}/src/configure_config.cmake)
include(${ClickHouse_SOURCE_DIR}/src/Functions/configure_config.cmake)
include(${ClickHouse_SOURCE_DIR}/src/Formats/configure_config.cmake)
configure_file(StorageSystemBuildOptions.generated.cpp.in ${CONFIG_BUILD})
endfunction()
generate_system_build_options()
include("${ClickHouse_SOURCE_DIR}/cmake/dbms_glob_sources.cmake")
add_headers_and_sources(storages_system .)

View File

@ -41,10 +41,10 @@ void StorageSystemAsynchronousInserts::fillData(MutableColumns & res_columns, Co
if (!insert_queue)
return;
auto queue = insert_queue->getQueue();
auto [queue, queue_lock] = insert_queue->getQueueLocked();
for (const auto & [key, elem] : queue)
{
std::lock_guard lock(elem->mutex);
std::lock_guard elem_lock(elem->mutex);
if (!elem->data)
continue;

View File

@ -11,7 +11,6 @@ const char * auto_config_build[]
"VERSION_DATE", "@VERSION_DATE@",
"BUILD_TYPE", "@CMAKE_BUILD_TYPE@",
"SYSTEM_PROCESSOR", "@CMAKE_SYSTEM_PROCESSOR@",
"LIBRARY_ARCHITECTURE", "@CMAKE_LIBRARY_ARCHITECTURE@",
"CMAKE_VERSION", "@CMAKE_VERSION@",
"C_COMPILER", "@CMAKE_C_COMPILER@",
"C_COMPILER_VERSION", "@CMAKE_C_COMPILER_VERSION@",
@ -19,7 +18,7 @@ const char * auto_config_build[]
"CXX_COMPILER_VERSION", "@CMAKE_CXX_COMPILER_VERSION@",
"C_FLAGS", "@FULL_C_FLAGS_NORMALIZED@",
"CXX_FLAGS", "@FULL_CXX_FLAGS_NORMALIZED@",
"LINK_FLAGS", "@CMAKE_EXE_LINKER_FLAGS_NORMALIZED@",
"LINK_FLAGS", "@FULL_EXE_LINKER_FLAGS_NORMALIZED@",
"BUILD_COMPILE_DEFINITIONS", "@BUILD_COMPILE_DEFINITIONS@",
"STATIC", "@USE_STATIC_LIBRARIES@",
"SPLIT_BINARY", "@CLICKHOUSE_SPLIT_BINARY@",

View File

@ -337,6 +337,26 @@ class FailureReason(enum.Enum):
INTERNAL_ERROR = "Test internal error: "
class SettingsRandomizer:
settings = {
"max_insert_threads": lambda: 0 if random.random() < 0.5 else random.randint(1, 16),
"group_by_two_level_threshold": lambda: 1 if random.random() < 0.1 else 2 ** 60 if random.random() < 0.11 else 100000,
"group_by_two_level_threshold_bytes": lambda: 1 if random.random() < 0.1 else 2 ** 60 if random.random() < 0.11 else 50000000,
"distributed_aggregation_memory_efficient": lambda: random.randint(0, 1),
"fsync_metadata": lambda: random.randint(0, 1),
"priority": lambda: int(abs(random.gauss(0, 2))),
"output_format_parallel_formatting": lambda: random.randint(0, 1),
"input_format_parallel_parsing": lambda: random.randint(0, 1),
}
@staticmethod
def get_random_settings():
random_settings = []
for setting, generator in SettingsRandomizer.settings.items():
random_settings.append(setting + "=" + str(generator()) + "")
return random_settings
class TestResult:
def __init__(self, case_name: str, status: TestStatus, reason: Optional[FailureReason], total_time: float, description: str):
self.case_name: str = case_name
@ -417,6 +437,29 @@ class TestCase:
return testcase_args
def add_random_settings(self, client_options):
if self.tags and 'no-random-settings' in self.tags:
return client_options
if len(self.base_url_params) == 0:
os.environ['CLICKHOUSE_URL_PARAMS'] = '&'.join(self.random_settings)
else:
os.environ['CLICKHOUSE_URL_PARAMS'] = self.base_url_params + '&' + '&'.join(self.random_settings)
new_options = "--allow_repeated_settings --" + " --".join(self.random_settings)
os.environ['CLICKHOUSE_CLIENT_OPT'] = self.base_client_options + new_options + ' '
return client_options + new_options
def remove_random_settings_from_env(self):
os.environ['CLICKHOUSE_URL_PARAMS'] = self.base_url_params
os.environ['CLICKHOUSE_CLIENT_OPT'] = self.base_client_options
def add_info_about_settings(self, description):
if self.tags and 'no-random-settings' in self.tags:
return description
return description + "\n" + "Settings used in the test: " + "--" + " --".join(self.random_settings) + "\n"
def __init__(self, suite, case: str, args, is_concurrent: bool):
self.case: str = case # case file name
self.tags: Set[str] = suite.all_tags[case] if case in suite.all_tags else set()
@ -432,6 +475,10 @@ class TestCase:
self.testcase_args = None
self.runs_count = 0
self.random_settings = SettingsRandomizer.get_random_settings()
self.base_url_params = os.environ['CLICKHOUSE_URL_PARAMS'] if 'CLICKHOUSE_URL_PARAMS' in os.environ else ''
self.base_client_options = os.environ['CLICKHOUSE_CLIENT_OPT'] if 'CLICKHOUSE_CLIENT_OPT' in os.environ else ''
# should skip test, should increment skipped_total, skip reason
def should_skip_test(self, suite) -> Optional[FailureReason]:
tags = self.tags
@ -673,10 +720,13 @@ class TestCase:
self.runs_count += 1
self.testcase_args = self.configure_testcase_args(args, self.case_file, suite.suite_tmp_path)
client_options = self.add_random_settings(client_options)
proc, stdout, stderr, total_time = self.run_single_test(server_logs_level, client_options)
result = self.process_result_impl(proc, stdout, stderr, total_time)
result.check_if_need_retry(args, stdout, stderr, self.runs_count)
if result.status == TestStatus.FAIL:
result.description = self.add_info_about_settings(result.description)
return result
except KeyboardInterrupt as e:
raise e
@ -684,17 +734,20 @@ class TestCase:
return TestResult(self.name, TestStatus.FAIL,
FailureReason.INTERNAL_QUERY_FAIL,
0.,
self.get_description_from_exception_info(sys.exc_info()))
self.add_info_about_settings(self.get_description_from_exception_info(sys.exc_info())))
except (ConnectionRefusedError, ConnectionResetError):
return TestResult(self.name, TestStatus.FAIL,
FailureReason.SERVER_DIED,
0.,
self.get_description_from_exception_info(sys.exc_info()))
self.add_info_about_settings(self.get_description_from_exception_info(sys.exc_info())))
except:
return TestResult(self.name, TestStatus.UNKNOWN,
FailureReason.INTERNAL_ERROR,
0.,
self.get_description_from_exception_info(sys.exc_info()))
finally:
self.remove_random_settings_from_env()
class TestSuite:
@staticmethod
@ -1078,11 +1131,15 @@ def collect_build_flags(args):
if value == 0:
result.append(BuildFlags.POLYMORPHIC_PARTS)
use_flags = clickhouse_execute(args, "SELECT name FROM system.build_options WHERE name like 'USE_%' AND value in ('ON', '1');")
use_flags = clickhouse_execute(args, "SELECT name FROM system.build_options WHERE name like 'USE_%' AND value in ('ON', '1')")
for use_flag in use_flags.strip().splitlines():
use_flag = use_flag.decode().lower()
result.append(use_flag)
system_processor = clickhouse_execute(args, "SELECT value FROM system.build_options WHERE name = 'SYSTEM_PROCESSOR' LIMIT 1").strip()
if system_processor:
result.append(f'cpu-{system_processor.decode().lower()}')
return result

View File

@ -1,8 +1,9 @@
import pytest
import uuid
from helpers.cluster import ClickHouseCluster
cluster = ClickHouseCluster(__file__)
instance = cluster.add_instance('instance')
instance = cluster.add_instance('instance', stay_alive=True)
@pytest.fixture(scope="module", autouse=True)
@ -14,7 +15,8 @@ def started_cluster():
finally:
cluster.shutdown()
def test_access_rights_for_funtion():
def test_access_rights_for_function():
create_function_query = "CREATE FUNCTION MySum AS (a, b) -> a + b"
instance.query("CREATE USER A")
@ -37,3 +39,19 @@ def test_access_rights_for_funtion():
instance.query("DROP USER IF EXISTS A")
instance.query("DROP USER IF EXISTS B")
def test_ignore_obsolete_grant_on_database():
instance.stop_clickhouse()
user_id = uuid.uuid4()
instance.exec_in_container(["bash", "-c" , f"""
cat > /var/lib/clickhouse/access/{user_id}.sql << EOF
ATTACH USER X;
ATTACH GRANT CREATE FUNCTION, SELECT ON mydb.* TO X;
EOF"""])
instance.exec_in_container(["bash", "-c" , "touch /var/lib/clickhouse/access/need_rebuild_lists.mark"])
instance.start_clickhouse()
assert instance.query("SHOW GRANTS FOR X") == "GRANT SELECT ON mydb.* TO X\n"

View File

@ -1183,3 +1183,39 @@ def materialized_database_support_all_kinds_of_mysql_datatype(clickhouse_node, m
"\t2021\t3020399000000\t3020399000000\t00000000010100000000000000000000000000000000000000\t10\t1\t11\tvarbinary\tRED\n" +
"2\t2\t22\t9223372036854775807\t-2\t2\t22\t18446744073709551615\t-2.2\t2.2\t-2.22\t2.222\t2.2222\t2021-10-07\ttext\tvarchar\tBLOB\t2021-10-07 18:32:57\t2021-10-07 18:32:57.482786\t2021-10-07 18:32:57\t2021-10-07 18:32:57.482786" +
"\t2021\t-3020399000000\t-46798000001\t000000000101000000D55C6E30D4095E40DCF0BBE996493E40\t11\t3\t22\tvarbinary\tGREEN\n")
def materialized_database_settings_materialized_mysql_tables_list(clickhouse_node, mysql_node, service_name):
mysql_node.query("DROP DATABASE IF EXISTS test_database")
clickhouse_node.query("DROP DATABASE IF EXISTS test_database")
mysql_node.query("CREATE DATABASE test_database")
mysql_node.query("CREATE TABLE test_database.a (id INT(11) NOT NULL PRIMARY KEY, value VARCHAR(255))")
mysql_node.query("INSERT INTO test_database.a VALUES(1, 'foo')")
mysql_node.query("INSERT INTO test_database.a VALUES(2, 'bar')")
# table b (includes a JSON column and is not in materialized_mysql_tables_list) should be skipped
mysql_node.query("CREATE TABLE test_database.b (id INT(11) NOT NULL PRIMARY KEY, value JSON)")
clickhouse_node.query("CREATE DATABASE test_database ENGINE = MaterializedMySQL('{}:3306', 'test_database', 'root', 'clickhouse') SETTINGS materialized_mysql_tables_list = ' a,c,d'".format(service_name))
check_query(clickhouse_node, "SELECT name from system.tables where database = 'test_database' FORMAT TSV", "a\n")
check_query(clickhouse_node, "SELECT COUNT() FROM test_database.a FORMAT TSV", "2\n")
# MySQL binlog data for skipped tables should be ignored
mysql_node.query("INSERT INTO test_database.b VALUES(1, '{\"name\":\"testjson\"}')")
mysql_node.query("INSERT INTO test_database.b VALUES(2, '{\"name\":\"testjson\"}')")
# an unrelated database should be skipped as well
mysql_node.query("DROP DATABASE IF EXISTS other_database")
mysql_node.query("CREATE DATABASE other_database")
mysql_node.query("CREATE TABLE other_database.d (id INT(11) NOT NULL PRIMARY KEY, value json)")
mysql_node.query("INSERT INTO other_database.d VALUES(1, '{\"name\":\"testjson\"}')")
mysql_node.query("CREATE TABLE test_database.c (id INT(11) NOT NULL PRIMARY KEY, value VARCHAR(255))")
mysql_node.query("INSERT INTO test_database.c VALUES(1, 'foo')")
mysql_node.query("INSERT INTO test_database.c VALUES(2, 'bar')")
check_query(clickhouse_node, "SELECT name from system.tables where database = 'test_database' FORMAT TSV", "a\nc\n")
check_query(clickhouse_node, "SELECT COUNT() FROM test_database.c FORMAT TSV", "2\n")
clickhouse_node.query("DROP DATABASE test_database")
mysql_node.query("DROP DATABASE test_database")

View File

@ -257,3 +257,7 @@ def test_table_overrides(started_cluster, started_mysql_8_0, started_mysql_5_7,
def test_materialized_database_support_all_kinds_of_mysql_datatype(started_cluster, started_mysql_8_0, started_mysql_5_7, clickhouse_node):
materialize_with_ddl.materialized_database_support_all_kinds_of_mysql_datatype(clickhouse_node, started_mysql_8_0, "mysql80")
materialize_with_ddl.materialized_database_support_all_kinds_of_mysql_datatype(clickhouse_node, started_mysql_5_7, "mysql57")
def test_materialized_database_settings_materialized_mysql_tables_list(started_cluster, started_mysql_8_0, started_mysql_5_7, clickhouse_node):
materialize_with_ddl.materialized_database_settings_materialized_mysql_tables_list(clickhouse_node, started_mysql_8_0, "mysql80")
materialize_with_ddl.materialized_database_settings_materialized_mysql_tables_list(clickhouse_node, started_mysql_5_7, "mysql57")

View File

@ -818,8 +818,9 @@ def test_seekable_formats(started_cluster):
instance.query("SYSTEM FLUSH LOGS")
result = instance.query(f"SELECT formatReadableSize(memory_usage) FROM system.query_log WHERE startsWith(query, 'SELECT count() FROM s3') AND memory_usage > 0 ORDER BY event_time desc")
print(result[:3])
assert(int(result[:3]) < 200)
result = result[:result.index('.')]
assert(int(result) < 200)
def test_seekable_formats_url(started_cluster):
@ -842,8 +843,9 @@ def test_seekable_formats_url(started_cluster):
instance.query("SYSTEM FLUSH LOGS")
result = instance.query(f"SELECT formatReadableSize(memory_usage) FROM system.query_log WHERE startsWith(query, 'SELECT count() FROM url') AND memory_usage > 0 ORDER BY event_time desc")
print(result[:3])
assert(int(result[:3]) < 200)
result = result[:result.index('.')]
assert(int(result) < 200)
def test_empty_file(started_cluster):
@ -886,7 +888,7 @@ def test_s3_schema_inference(started_cluster):
result = instance.query(f"select count(*) from schema_inference")
assert(int(result) == 5000000)
table_function = f"url('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_native', 'Native')"
result = instance.query(f"desc {table_function}")
assert result == "a\tInt32\t\t\t\t\t\nb\tString\t\t\t\t\t\n"
@ -949,7 +951,7 @@ def test_create_new_files_on_insert(started_cluster):
instance.query(f"insert into test_multiple_inserts select number, randomString(100) from numbers(10) settings s3_truncate_on_insert=1")
instance.query(f"insert into test_multiple_inserts select number, randomString(100) from numbers(20) settings s3_create_new_file_on_insert=1")
instance.query(f"insert into test_multiple_inserts select number, randomString(100) from numbers(30) settings s3_create_new_file_on_insert=1")
result = instance.query(f"select count() from test_multiple_inserts")
assert(int(result) == 60)
@ -961,11 +963,11 @@ def test_create_new_files_on_insert(started_cluster):
instance.query(f"insert into test_multiple_inserts select number, randomString(100) from numbers(10) settings s3_truncate_on_insert=1")
instance.query(f"insert into test_multiple_inserts select number, randomString(100) from numbers(20) settings s3_create_new_file_on_insert=1")
instance.query(f"insert into test_multiple_inserts select number, randomString(100) from numbers(30) settings s3_create_new_file_on_insert=1")
result = instance.query(f"select count() from test_multiple_inserts")
assert(int(result) == 60)
def test_format_detection(started_cluster):
bucket = started_cluster.minio_bucket
instance = started_cluster.instances["dummy"]
@ -1039,6 +1041,29 @@ def test_signatures(started_cluster):
assert(int(result) == 1)
def test_select_columns(started_cluster):
bucket = started_cluster.minio_bucket
instance = started_cluster.instances["dummy"]
name = "test_table2"
structure = "id UInt32, value1 Int32, value2 Int32"
instance.query(f"drop table if exists {name}")
instance.query(f"CREATE TABLE {name} ({structure}) ENGINE = S3(s3_conf1, format='Parquet')")
limit = 10000000
instance.query(f"INSERT INTO {name} SELECT * FROM generateRandom('{structure}') LIMIT {limit} SETTINGS s3_truncate_on_insert=1")
instance.query(f"SELECT value2 FROM {name}")
instance.query("SYSTEM FLUSH LOGS")
result1 = instance.query(f"SELECT read_bytes FROM system.query_log WHERE type='QueryFinish' and query LIKE 'SELECT value2 FROM {name}'")
instance.query(f"SELECT * FROM {name}")
instance.query("SYSTEM FLUSH LOGS")
result2 = instance.query(f"SELECT read_bytes FROM system.query_log WHERE type='QueryFinish' and query LIKE 'SELECT * FROM {name}'")
assert(int(result1) * 3 <= int(result2))
def test_insert_select_schema_inference(started_cluster):
bucket = started_cluster.minio_bucket
instance = started_cluster.instances["dummy"]
@ -1049,4 +1074,3 @@ def test_insert_select_schema_inference(started_cluster):
result = instance.query(f"select * from s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test.arrow')")
assert(int(result) == 1)

View File

@ -37,6 +37,24 @@
<database>db1</database>
</allow_databases>
</has_access>
<env_user_with_password>
<password>clickhouse</password>
<networks incl="networks" replace="replace">
<ip>::/0</ip>
</networks>
<profile>default</profile>
<quota>default</quota>
</env_user_with_password>
<env_user_not_with_password>
<password></password>
<networks incl="networks" replace="replace">
<ip>::/0</ip>
</networks>
<profile>default</profile>
<quota>default</quota>
</env_user_not_with_password>
</users>
<quotas>

View File

@ -70,3 +70,17 @@ def test_user_zero_database_access(start_cluster):
["bash", "-c", "/usr/bin/clickhouse client --user 'default' --query 'DROP DATABASE test2'"], user='root')
except Exception as ex:
assert False, "user with full access rights can't drop database test2"
try:
name = node.exec_in_container(
["bash", "-c", "export CLICKHOUSE_USER=env_user_not_with_password && /usr/bin/clickhouse client --query 'SELECT currentUser()'"], user='root')
assert name.strip() == "env_user_not_with_password"
except Exception as ex:
assert False, "set env CLICKHOUSE_USER can not connect server"
try:
name = node.exec_in_container(
["bash", "-c", "export CLICKHOUSE_USER=env_user_with_password && export CLICKHOUSE_PASSWORD=clickhouse && /usr/bin/clickhouse client --query 'SELECT currentUser()'"], user='root')
assert name.strip() == "env_user_with_password"
except Exception as ex:
assert False, "set env CLICKHOUSE_USER CLICKHOUSE_PASSWORD can not connect server"

View File

@ -1,5 +1,6 @@
-- Tags: replica, distributed
SET allow_experimental_parallel_reading_from_replicas = 0;
SET max_parallel_replicas = 2;
DROP TABLE IF EXISTS report;

View File

@ -2,13 +2,11 @@
"value": 4611686018427387904
"name": "value",
"value": "4611686018427387904"
value
value
Cannot modify 'output_format_json_quote_64bit_integers' setting in readonly mode
OK
OK
"name": "value",
"value": "9223372036854775808"
"name": "value",
"value": 9223372036854775808
value
value
Cannot modify 'output_format_json_quote_64bit_integers' setting in readonly mode
OK
OK

View File

@ -9,13 +9,15 @@ CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
$CLICKHOUSE_CLIENT --query="select toUInt64(pow(2, 62)) as value format JSON" --output_format_json_quote_64bit_integers=0 | grep value
$CLICKHOUSE_CLIENT --query="select toUInt64(pow(2, 62)) as value format JSON" --output_format_json_quote_64bit_integers=1 | grep value
$CLICKHOUSE_CLIENT --readonly=1 --multiquery --query="set output_format_json_quote_64bit_integers=1 ; select toUInt64(pow(2, 63)) as value format JSON" --server_logs_file=/dev/null 2>&1 | grep -o 'value\|Cannot modify .* setting in readonly mode'
$CLICKHOUSE_CLIENT --readonly=1 --multiquery --query="set output_format_json_quote_64bit_integers=0 ; select toUInt64(pow(2, 63)) as value format JSON" --server_logs_file=/dev/null 2>&1 | grep -o 'value\|Cannot modify .* setting in readonly mode'
$CLICKHOUSE_CLIENT --readonly=1 --multiquery --query="set output_format_json_quote_64bit_integers=1 ; select toUInt64(pow(2, 63)) as value format JSON" --server_logs_file=/dev/null 2>&1 | grep -o -q 'value\|Cannot modify .* setting in readonly mode' && echo "OK" || echo "FAIL"
$CLICKHOUSE_CLIENT --readonly=1 --multiquery --query="set output_format_json_quote_64bit_integers=0 ; select toUInt64(pow(2, 63)) as value format JSON" --server_logs_file=/dev/null 2>&1 | grep -o -q 'value\|Cannot modify .* setting in readonly mode' && echo "OK" || echo "FAIL"
${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}&query=SELECT+toUInt64(pow(2,+63))+as+value+format+JSON&output_format_json_quote_64bit_integers=1" | grep value
${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}&query=SELECT+toUInt64(pow(2,+63))+as+value+format+JSON&output_format_json_quote_64bit_integers=0" | grep value
${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}&session_id=readonly&session_timeout=3600" -d 'SET readonly = 1'
#${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}&session_id=readonly&session_timeout=3600" -d 'SET readonly = 1'
${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}&session_id=readonly&query=SELECT+toUInt64(pow(2,+63))+as+value+format+JSON&output_format_json_quote_64bit_integers=1" 2>&1 | grep -o -q 'value\|Cannot modify .* setting in readonly mode.' && echo "OK" || echo "FAIL"
${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}&session_id=readonly&query=SELECT+toUInt64(pow(2,+63))+as+value+format+JSON&output_format_json_quote_64bit_integers=0" 2>&1 | grep -o -q 'value\|Cannot modify .* setting in readonly mode' && echo "OK" || echo "FAIL"
${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}&session_id=readonly&query=SELECT+toUInt64(pow(2,+63))+as+value+format+JSON&output_format_json_quote_64bit_integers=1" 2>&1 | grep -o 'value\|Cannot modify .* setting in readonly mode.'
${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}&session_id=readonly&query=SELECT+toUInt64(pow(2,+63))+as+value+format+JSON&output_format_json_quote_64bit_integers=0" 2>&1 | grep -o 'value\|Cannot modify .* setting in readonly mode'

View File

@ -1,4 +1,5 @@
SET send_logs_level = 'fatal';
SET convert_query_to_cnf = 0;
DROP TABLE IF EXISTS test_00808;
CREATE TABLE test_00808(date Date, id Int8, name String, value Int64, sign Int8) ENGINE = CollapsingMergeTree(sign) ORDER BY (id, date);

View File

@ -1,4 +1,6 @@
SET enable_optimize_predicate_expression = 0;
SET optimize_move_to_prewhere = 1;
SET convert_query_to_cnf = 0;
select * from system.one l cross join system.one r;

View File

@ -1,4 +1,5 @@
SET enable_optimize_predicate_expression = 0;
SET convert_query_to_cnf = 0;
DROP TABLE IF EXISTS t1;
DROP TABLE IF EXISTS t2;

View File

@ -1,5 +1,7 @@
-- Tags: shard
SET prefer_localhost_replica = 1;
SELECT count() FROM remote('127.0.0.1,localhos', system.one); -- { serverError 198 }
SELECT count() FROM remote('127.0.0.1|localhos', system.one);

View File

@ -1,4 +1,4 @@
-- Tags: no-tsan, no-asan, no-ubsan, no-msan, no-debug, no-fasttest
-- Tags: no-tsan, no-asan, no-ubsan, no-msan, no-debug, no-fasttest, no-cpu-aarch64
-- Tag no-fasttest: Not sure why it fails even in sequential mode. Disabled for now to make some progress.
SET allow_introspection_functions = 1;

View File

@ -1,4 +1,4 @@
-- Tags: no-parallel, no-fasttest
-- Tags: no-parallel, no-fasttest, no-random-settings
SET max_memory_usage = 32000000;
SET join_on_disk_max_files_to_merge = 4;

View File

@ -1,4 +1,4 @@
-- Tags: no-tsan, no-asan, no-msan, no-replicated-database
-- Tags: no-tsan, no-asan, no-msan, no-replicated-database, no-random-settings
-- Tag no-tsan: Fine thresholds on memory usage
-- Tag no-asan: Fine thresholds on memory usage
-- Tag no-msan: Fine thresholds on memory usage
@ -7,6 +7,8 @@
-- sizeof(HLL) is (2^K * 6 / 8)
-- hence max_memory_usage for 100 rows = (96<<10)*100 = 9830400
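-- With the default K = 17: (1 << 17) * 6 / 8 = 98304 bytes = 96 KiB per state, and 98304 * 100 = 9830400.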
SET use_uncompressed_cache = 0;
-- HashTable for UInt32 (used until (1<<13) elements), hence 8192 elements
SELECT 'UInt32';
SET max_memory_usage = 4000000;
@ -19,6 +21,8 @@ SELECT 'UInt64';
SET max_memory_usage = 4000000;
SELECT sum(u) FROM (SELECT intDiv(number, 4096) AS k, uniqCombined(reinterpretAsString(number % 4096)) u FROM numbers(4096 * 100) GROUP BY k); -- { serverError 241 }
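-- Error 241 is MEMORY_LIMIT_EXCEEDED: 4 MB is deliberately too small for 100 uniqCombined states.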
SET max_memory_usage = 9830400;
SELECT sum(u) FROM (SELECT intDiv(number, 4096) AS k, uniqCombined(reinterpretAsString(number % 4096)) u FROM numbers(4096 * 100) GROUP BY k);
SELECT 'K=16';

View File

@ -7,5 +7,5 @@ select arrayCompact([1,1,2]);
select arrayCompact([1,2,1]);
select arrayCompact([2,1,1]);
select arrayCompact([1,2,2,3,3,3,4,4,4,4,5,5,5,5,5]);
SELECT arrayCompact(x->0, [NULL]);
SELECT toString(arrayCompact(x->0, [NULL]));
SELECT arrayCompact(arrayMap(x->0, [NULL]));
SELECT toString(arrayCompact(arrayMap(x->0, [NULL])));

View File

@ -15,3 +15,6 @@
['0','1','2']
['0','1','2']
['0','1','2']
[(0,0),(3,1),(6,2),(9,0)]
[('0','0'),('3','1'),('6','2'),('9','0')]
[('0',0),('3',1),('6',2),('9',0)]

View File

@ -5,4 +5,7 @@ SELECT arrayCompact([1, 1, NULL, NULL, 2, 2, 2]);
SELECT arrayCompact([1, 1, NULL, NULL, nan, nan, 2, 2, 2]);
SELECT arrayCompact(['hello', '', '', '', 'world', 'world']);
SELECT arrayCompact([[[]], [[], []], [[], []], [[]]]);
SELECT arrayCompact(x -> toString(intDiv(x, 3)), range(number)) FROM numbers(10);
SELECT arrayCompact(arrayMap(x -> toString(intDiv(x, 3)), range(number))) FROM numbers(10);
SELECT arrayCompact(x -> x.2, groupArray((number, intDiv(number, 3) % 3))) FROM numbers(10);
SELECT arrayCompact(x -> x.2, groupArray((toString(number), toString(intDiv(number, 3) % 3)))) FROM numbers(10);
SELECT arrayCompact(x -> x.2, groupArray((toString(number), intDiv(number, 3) % 3))) FROM numbers(10);

View File

@ -1,5 +1,7 @@
-- Tags: replica, distributed
set allow_experimental_parallel_reading_from_replicas=0;
drop table if exists test_max_parallel_replicas_lr;
-- If you wonder why the table is named with the "_lr" suffix in this test.

View File

@ -1,5 +1,7 @@
-- Tags: distributed
set allow_experimental_parallel_reading_from_replicas = 0;
drop table if exists sample_final;
create table sample_final (CounterID UInt32, EventDate Date, EventTime DateTime, UserID UInt64, Sign Int8) engine = CollapsingMergeTree(Sign) order by (CounterID, EventDate, intHash32(UserID), EventTime) sample by intHash32(UserID);
insert into sample_final select number / (8192 * 4), toDate('2019-01-01'), toDateTime('2019-01-01 00:00:01') + number, number / (8192 * 2), number % 3 = 1 ? -1 : 1 from numbers(1000000);

View File

@ -1,5 +1,6 @@
SET enable_optimize_predicate_expression = 1;
SET joined_subquery_requires_alias = 0;
SET convert_query_to_cnf = 0;
-- https://github.com/ClickHouse/ClickHouse/issues/3885
-- https://github.com/ClickHouse/ClickHouse/issues/5485

View File

@ -1,3 +1,5 @@
SET convert_query_to_cnf = 0;
DROP TABLE IF EXISTS n;
DROP TABLE IF EXISTS r;

View File

@ -1,4 +1,4 @@
-- Tags: no-tsan, no-asan, no-ubsan, no-msan, no-debug, no-parallel, no-fasttest
-- Tags: no-tsan, no-asan, no-ubsan, no-msan, no-debug, no-parallel, no-fasttest, no-cpu-aarch64
SET allow_introspection_functions = 1;

View File

@ -2,6 +2,8 @@
-- set insert_distributed_sync = 1; -- see https://github.com/ClickHouse/ClickHouse/issues/18971
SET allow_experimental_parallel_reading_from_replicas = 0; -- see https://github.com/ClickHouse/ClickHouse/issues/34525
DROP TABLE IF EXISTS local_01099_a;
DROP TABLE IF EXISTS local_01099_b;
DROP TABLE IF EXISTS distributed_01099_a;

Some files were not shown because too many files have changed in this diff.