Merge branch 'ClickHouse:master' into zvonand-globs-small-fix

commit 7dd5dbb2ac
Andrey Zvonov, 2023-09-14 13:49:59 +03:00, committed by GitHub
51 changed files with 2044 additions and 602 deletions

View File

@ -1,21 +1,15 @@
# docker build -t clickhouse/mysql-java-client .
# MySQL Java client docker container
FROM ubuntu:18.04
FROM openjdk:8-jdk-alpine
RUN apt-get update && \
apt-get install -y software-properties-common build-essential openjdk-8-jdk libmysql-java curl
RUN apk --no-cache add curl
RUN rm -rf \
/var/lib/apt/lists/* \
/var/cache/debconf \
/tmp/* \
RUN apt-get clean
ARG ver=5.1.46
RUN curl -L -o /mysql-connector-java-${ver}.jar https://repo1.maven.org/maven2/mysql/mysql-connector-java/${ver}/mysql-connector-java-${ver}.jar
ENV CLASSPATH=$CLASSPATH:/mysql-connector-java-${ver}.jar
ARG ver=8.1.0
RUN curl -L -o /mysql-connector-j-${ver}.jar https://repo1.maven.org/maven2/com/mysql/mysql-connector-j/${ver}/mysql-connector-j-${ver}.jar
ENV CLASSPATH=$CLASSPATH:/mysql-connector-j-${ver}.jar
WORKDIR /jdbc
COPY Test.java Test.java
RUN javac Test.java
COPY PreparedStatementsTest.java PreparedStatementsTest.java
RUN javac Test.java PreparedStatementsTest.java

View File

@ -0,0 +1,193 @@
import com.mysql.cj.MysqlType;
import java.sql.*;
public class PreparedStatementsTest {
public static void main(String[] args) {
int i = 0;
String host = "127.0.0.1";
String port = "9004";
String user = "default";
String password = "";
String database = "default";
while (i < args.length) {
switch (args[i]) {
case "--host":
host = args[++i];
break;
case "--port":
port = args[++i];
break;
case "--user":
user = args[++i];
break;
case "--password":
password = args[++i];
break;
case "--database":
database = args[++i];
break;
default:
i++;
break;
}
}
// useServerPrepStmts uses COM_STMT_PREPARE and COM_STMT_EXECUTE
// instead of COM_QUERY, which allows us to test the binary protocol
String jdbcUrl = String.format("jdbc:mysql://%s:%s/%s?useSSL=false&useServerPrepStmts=true",
host, port, database);
try {
Class.forName("com.mysql.cj.jdbc.Driver");
Connection conn = DriverManager.getConnection(jdbcUrl, user, password);
testSimpleDataTypes(conn);
testStringTypes(conn);
testLowCardinalityAndNullableTypes(conn);
testDecimalTypes(conn);
testMiscTypes(conn);
testDateTypes(conn);
testUnusualDateTime64Scales(conn);
testDateTimeTimezones(conn);
conn.close();
} catch (Exception e) {
e.printStackTrace();
System.exit(1);
}
}
private static void testSimpleDataTypes(Connection conn) throws SQLException {
System.out.println("### testSimpleDataTypes");
ResultSet rs = conn.prepareStatement("SELECT * FROM ps_simple_data_types").executeQuery();
int rowNum = 1;
while (rs.next()) {
System.out.printf("Row #%d\n", rowNum++);
System.out.printf("%s, value: %d\n", getMysqlType(rs, "i8"), rs.getInt("i8"));
System.out.printf("%s, value: %d\n", getMysqlType(rs, "i16"), rs.getInt("i16"));
System.out.printf("%s, value: %d\n", getMysqlType(rs, "i32"), rs.getInt("i32"));
System.out.printf("%s, value: %d\n", getMysqlType(rs, "i64"), rs.getLong("i64"));
System.out.printf("%s, value: %s\n", getMysqlType(rs, "i128"), rs.getString("i128"));
System.out.printf("%s, value: %s\n", getMysqlType(rs, "i256"), rs.getString("i256"));
System.out.printf("%s, value: %d\n", getMysqlType(rs, "ui8"), rs.getInt("ui8"));
System.out.printf("%s, value: %d\n", getMysqlType(rs, "ui16"), rs.getInt("ui16"));
System.out.printf("%s, value: %d\n", getMysqlType(rs, "ui32"), rs.getLong("ui32"));
System.out.printf("%s, value: %s\n", getMysqlType(rs, "ui64"), rs.getString("ui64"));
System.out.printf("%s, value: %s\n", getMysqlType(rs, "ui128"), rs.getString("ui128"));
System.out.printf("%s, value: %s\n", getMysqlType(rs, "ui256"), rs.getString("ui256"));
System.out.printf("%s, value: %f\n", getMysqlType(rs, "f32"), rs.getFloat("f32"));
System.out.printf("%s, value: %f\n", getMysqlType(rs, "f64"), rs.getFloat("f64"));
System.out.printf("%s, value: %b\n", getMysqlType(rs, "b"), rs.getBoolean("b"));
}
System.out.println();
}
private static void testStringTypes(Connection conn) throws SQLException {
System.out.println("### testStringTypes");
ResultSet rs = conn.prepareStatement("SELECT * FROM ps_string_types").executeQuery();
int rowNum = 1;
while (rs.next()) {
System.out.printf("Row #%d\n", rowNum++);
System.out.printf("%s, value: %s\n", getMysqlType(rs, "s"), rs.getString("s"));
System.out.printf("%s, value: %s\n", getMysqlType(rs, "sn"), rs.getString("sn"));
System.out.printf("%s, value: %s\n", getMysqlType(rs, "lc"), rs.getString("lc"));
System.out.printf("%s, value: %s\n", getMysqlType(rs, "nlc"), rs.getString("nlc"));
}
System.out.println();
}
private static void testLowCardinalityAndNullableTypes(Connection conn) throws SQLException {
System.out.println("### testLowCardinalityAndNullableTypes");
ResultSet rs = conn.prepareStatement("SELECT * FROM ps_low_cardinality_and_nullable_types").executeQuery();
int rowNum = 1;
while (rs.next()) {
System.out.printf("Row #%d\n", rowNum++);
System.out.printf("%s, value: %s\n", getMysqlType(rs, "ilc"), rs.getInt("ilc"));
System.out.printf("%s, value: %s\n", getMysqlType(rs, "dlc"), rs.getDate("dlc"));
// NULL int is represented as zero
System.out.printf("%s, value: %s\n", getMysqlType(rs, "ni"), rs.getInt("ni"));
}
System.out.println();
}
private static void testDecimalTypes(Connection conn) throws SQLException {
System.out.println("### testDecimalTypes");
ResultSet rs = conn.prepareStatement("SELECT * FROM ps_decimal_types").executeQuery();
int rowNum = 1;
while (rs.next()) {
System.out.printf("Row #%d\n", rowNum++);
System.out.printf("%s, value: %s\n", getMysqlType(rs, "d32"), rs.getBigDecimal("d32").toPlainString());
System.out.printf("%s, value: %s\n", getMysqlType(rs, "d64"), rs.getBigDecimal("d64").toPlainString());
System.out.printf("%s, value: %s\n", getMysqlType(rs, "d128_native"),
rs.getBigDecimal("d128_native").toPlainString());
System.out.printf("%s, value: %s\n", getMysqlType(rs, "d128_text"), rs.getString("d128_text"));
System.out.printf("%s, value: %s\n", getMysqlType(rs, "d256"), rs.getString("d256"));
}
System.out.println();
}
private static void testDateTypes(Connection conn) throws SQLException {
System.out.println("### testDateTypes");
ResultSet rs = conn.prepareStatement("SELECT * FROM ps_date_types").executeQuery();
int rowNum = 1;
while (rs.next()) {
System.out.printf("Row #%d\n", rowNum++);
System.out.printf("%s, value: %s\n", getMysqlType(rs, "d"), rs.getDate("d"));
System.out.printf("%s, value: %s\n", getMysqlType(rs, "d32"), rs.getDate("d32"));
System.out.printf("%s, value: %s\n", getMysqlType(rs, "dt"), rs.getTimestamp("dt"));
System.out.printf("%s, value: %s\n", getMysqlType(rs, "dt64_3"), rs.getTimestamp("dt64_3"));
System.out.printf("%s, value: %s\n", getMysqlType(rs, "dt64_6"), rs.getTimestamp("dt64_6"));
System.out.printf("%s, value: %s\n", getMysqlType(rs, "dt64_9"), rs.getTimestamp("dt64_9"));
}
System.out.println();
}
private static void testUnusualDateTime64Scales(Connection conn) throws SQLException {
System.out.println("### testUnusualDateTime64Scales");
ResultSet rs = conn.prepareStatement("SELECT * FROM ps_unusual_datetime64_scales").executeQuery();
int rowNum = 1;
while (rs.next()) {
System.out.printf("Row #%d\n", rowNum++);
System.out.printf("%s, value: %s\n", getMysqlType(rs, "dt64_0"), rs.getTimestamp("dt64_0"));
System.out.printf("%s, value: %s\n", getMysqlType(rs, "dt64_1"), rs.getTimestamp("dt64_1"));
System.out.printf("%s, value: %s\n", getMysqlType(rs, "dt64_2"), rs.getTimestamp("dt64_2"));
System.out.printf("%s, value: %s\n", getMysqlType(rs, "dt64_4"), rs.getTimestamp("dt64_4"));
System.out.printf("%s, value: %s\n", getMysqlType(rs, "dt64_5"), rs.getTimestamp("dt64_5"));
System.out.printf("%s, value: %s\n", getMysqlType(rs, "dt64_7"), rs.getTimestamp("dt64_7"));
System.out.printf("%s, value: %s\n", getMysqlType(rs, "dt64_8"), rs.getTimestamp("dt64_8"));
}
System.out.println();
}
private static void testDateTimeTimezones(Connection conn) throws SQLException {
System.out.println("### testDateTimeTimezones");
ResultSet rs = conn.prepareStatement("SELECT * FROM ps_datetime_timezones").executeQuery();
int rowNum = 1;
while (rs.next()) {
System.out.printf("Row #%d\n", rowNum++);
System.out.printf("%s, value: %s\n", getMysqlType(rs, "dt"), rs.getTimestamp("dt"));
System.out.printf("%s, value: %s\n", getMysqlType(rs, "dt64_3"), rs.getTimestamp("dt64_3"));
}
System.out.println();
}
private static void testMiscTypes(Connection conn) throws SQLException {
System.out.println("### testMiscTypes");
ResultSet rs = conn.prepareStatement("SELECT * FROM ps_misc_types").executeQuery();
int rowNum = 1;
while (rs.next()) {
System.out.printf("Row #%d\n", rowNum++);
System.out.printf("%s, value: %s\n", getMysqlType(rs, "a"), rs.getString("a"));
System.out.printf("%s, value: %s\n", getMysqlType(rs, "u"), rs.getString("u"));
System.out.printf("%s, value: %s\n", getMysqlType(rs, "t"), rs.getString("t"));
System.out.printf("%s, value: %s\n", getMysqlType(rs, "m"), rs.getString("m"));
}
System.out.println();
}
private static String getMysqlType(ResultSet rs, String columnLabel) throws SQLException {
ResultSetMetaData meta = rs.getMetaData();
return String.format("%s type is %s", columnLabel,
MysqlType.getByJdbcType(meta.getColumnType(rs.findColumn(columnLabel))));
}
}

View File

@ -46,6 +46,7 @@ class JavaConnectorTest {
Connection conn = null;
Statement stmt = null;
try {
Class.forName("com.mysql.cj.jdbc.Driver");
conn = DriverManager.getConnection(jdbcUrl, user, password);
stmt = conn.createStatement();
stmt.executeUpdate(CREATE_TABLE_SQL);
@ -69,7 +70,7 @@ class JavaConnectorTest {
stmt.close();
conn.close();
} catch (SQLException e) {
} catch (Exception e) {
e.printStackTrace();
System.exit(1);
}

View File

@ -3,4 +3,4 @@ services:
java1:
image: clickhouse/mysql-java-client:${DOCKER_MYSQL_JAVA_CLIENT_TAG:-latest}
# to keep container running
command: sleep infinity
command: sleep 1d

View File

@ -60,6 +60,12 @@ install_packages previous_release_package_folder
# available for dump via clickhouse-local
configure
# async_replication setting doesn't exist on some older versions
sudo cat /etc/clickhouse-server/config.d/keeper_port.xml \
| sed "/<async_replication>1<\/async_replication>/d" \
> /etc/clickhouse-server/config.d/keeper_port.xml.tmp
sudo mv /etc/clickhouse-server/config.d/keeper_port.xml.tmp /etc/clickhouse-server/config.d/keeper_port.xml
# it contains some new settings, but we can safely remove it
rm /etc/clickhouse-server/config.d/merge_tree.xml
rm /etc/clickhouse-server/config.d/enable_wait_for_shutdown_replicated_tables.xml
@ -82,6 +88,12 @@ sudo cat /etc/clickhouse-server/config.d/keeper_port.xml \
> /etc/clickhouse-server/config.d/keeper_port.xml.tmp
sudo mv /etc/clickhouse-server/config.d/keeper_port.xml.tmp /etc/clickhouse-server/config.d/keeper_port.xml
# async_replication setting doesn't exist on some older versions
sudo cat /etc/clickhouse-server/config.d/keeper_port.xml \
| sed "/<async_replication>1<\/async_replication>/d" \
> /etc/clickhouse-server/config.d/keeper_port.xml.tmp
sudo mv /etc/clickhouse-server/config.d/keeper_port.xml.tmp /etc/clickhouse-server/config.d/keeper_port.xml
# But we still need the default disk because some tables are loaded only into it
sudo cat /etc/clickhouse-server/config.d/s3_storage_policy_by_default.xml \
| sed "s|<main><disk>s3</disk></main>|<main><disk>s3</disk></main><default><disk>default</disk></default>|" \

View File

@ -208,7 +208,7 @@ The optional keyword `FULL` causes the output to include the collation, comment
The statement produces a result table with the following structure:
- field - The name of the column (String)
- type - The column data type (String)
- null - If the column data type is Nullable (UInt8)
- null - `YES` if the column data type is Nullable, `NO` otherwise (String)
- key - `PRI` if the column is part of the primary key, `SOR` if the column is part of the sorting key, empty otherwise (String)
- default - Default expression of the column if it is of type `ALIAS`, `DEFAULT`, or `MATERIALIZED`, otherwise `NULL`. (Nullable(String))
- extra - Additional information, currently unused (String)

View File

@ -1393,7 +1393,7 @@ try
const auto interserver_listen_hosts = getInterserverListenHosts(config());
const auto listen_try = getListenTry(config());
if (config().has("keeper_server"))
if (config().has("keeper_server.server_id"))
{
#if USE_NURAFT
//// If we don't have a configured connection, probably someone is trying to use clickhouse-server instead

View File

@ -1466,7 +1466,7 @@ void validateZooKeeperConfig(const Poco::Util::AbstractConfiguration & config)
bool hasZooKeeperConfig(const Poco::Util::AbstractConfiguration & config)
{
return config.has("zookeeper") || config.has("keeper") || (config.has("keeper_server") && config.getBool("keeper_server.use_cluster", true));
return config.has("zookeeper") || config.has("keeper") || (config.has("keeper_server.raft_configuration") && config.getBool("keeper_server.use_cluster", true));
}
String getZooKeeperConfigName(const Poco::Util::AbstractConfiguration & config)
@ -1477,7 +1477,7 @@ String getZooKeeperConfigName(const Poco::Util::AbstractConfiguration & config)
if (config.has("keeper"))
return "keeper";
if (config.has("keeper_server") && config.getBool("keeper_server.use_cluster", true))
if (config.has("keeper_server.raft_configuration") && config.getBool("keeper_server.use_cluster", true))
return "keeper_server";
throw DB::Exception(DB::ErrorCodes::NO_ELEMENTS_IN_CONFIG, "There is no Zookeeper configuration in server config");

View File

@ -586,13 +586,15 @@ private:
std::unique_ptr<ReadBuffer> read_buf;
};
Changelog::Changelog(Poco::Logger * log_, LogFileSettings log_file_settings, KeeperContextPtr keeper_context_)
Changelog::Changelog(
Poco::Logger * log_, LogFileSettings log_file_settings, FlushSettings flush_settings_, KeeperContextPtr keeper_context_)
: changelogs_detached_dir("detached")
, rotate_interval(log_file_settings.rotate_interval)
, log(log_)
, write_operations(std::numeric_limits<size_t>::max())
, append_completion_queue(std::numeric_limits<size_t>::max())
, keeper_context(std::move(keeper_context_))
, flush_settings(flush_settings_)
{
if (auto latest_log_disk = getLatestLogDisk();
log_file_settings.force_sync && dynamic_cast<const DiskLocal *>(latest_log_disk.get()) == nullptr)
@ -1014,8 +1016,65 @@ void Changelog::writeThread()
{
WriteOperation write_operation;
bool batch_append_ok = true;
while (write_operations.pop(write_operation))
size_t pending_appends = 0;
bool try_batch_flush = false;
const auto flush_logs = [&](const auto & flush)
{
LOG_TEST(log, "Flushing {} logs", pending_appends);
{
std::lock_guard writer_lock(writer_mutex);
current_writer->flush();
}
{
std::lock_guard lock{durable_idx_mutex};
last_durable_idx = flush.index;
}
pending_appends = 0;
};
const auto notify_append_completion = [&]
{
durable_idx_cv.notify_all();
// we need to call the completion callback in another thread because it takes a global lock for the NuRaft server
// NuRaft will in some places wait for the flush to be done while holding the same global lock, leading to deadlock
// -> future write operations are blocked by a flush that cannot complete because it cannot take the NuRaft lock
// -> NuRaft won't release the lock until its flush is done
if (!append_completion_queue.push(batch_append_ok))
LOG_WARNING(log, "Changelog is shut down");
};
/// NuRaft writes a batch of requests by first calling multiple store requests, i.e. AppendLog,
/// finished by a flush request.
/// We assume that after some number of appends, we always get a flush request.
while (true)
{
if (try_batch_flush)
{
try_batch_flush = false;
/// we have a Flush request stored in write_operation,
/// but we try to get new append operations;
/// if there are none, we apply the currently stored Flush
chassert(std::holds_alternative<Flush>(write_operation));
if (!write_operations.tryPop(write_operation))
{
chassert(batch_append_ok);
const auto & flush = std::get<Flush>(write_operation);
flush_logs(flush);
notify_append_completion();
if (!write_operations.pop(write_operation))
break;
}
}
else if (!write_operations.pop(write_operation))
{
break;
}
assert(initialized);
if (auto * append_log = std::get_if<AppendLog>(&write_operation))
@ -1027,6 +1086,7 @@ void Changelog::writeThread()
assert(current_writer);
batch_append_ok = current_writer->appendRecord(buildRecord(append_log->index, append_log->log_entry));
++pending_appends;
}
else
{
@ -1034,30 +1094,21 @@ void Changelog::writeThread()
if (batch_append_ok)
{
/// we can try batching more logs for flush
if (pending_appends < flush_settings.max_flush_batch_size)
{
std::lock_guard writer_lock(writer_mutex);
current_writer->flush();
}
{
std::lock_guard lock{durable_idx_mutex};
last_durable_idx = flush.index;
try_batch_flush = true;
continue;
}
/// we need to flush because we reached the maximum allowed number of pending records
flush_logs(flush);
}
else
{
std::lock_guard lock{durable_idx_mutex};
*flush.failed = true;
}
durable_idx_cv.notify_all();
// we need to call the completion callback in another thread because it takes a global lock for the NuRaft server
// NuRaft will in some places wait for the flush to be done while holding the same global lock, leading to deadlock
// -> future write operations are blocked by a flush that cannot complete because it cannot take the NuRaft lock
// -> NuRaft won't release the lock until its flush is done
if (!append_completion_queue.push(batch_append_ok))
LOG_WARNING(log, "Changelog is shut down");
notify_append_completion();
batch_append_ok = true;
}
}
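
A note on the new batching logic in Changelog::writeThread above: appends are only counted, and a Flush is applied immediately only when the batch has reached max_flush_batch_size or no further operation is queued; otherwise the flush is deferred so that a later Flush covers a bigger batch with a single disk flush. Below is a minimal, self-contained sketch of that control flow, with std::deque standing in for the concurrent write_operations queue; all names are illustrative, not taken from the real class, and the real code additionally re-checks the queue and applies the stored Flush if nothing else arrived.

#include <cstdint>
#include <deque>
#include <iostream>
#include <variant>

struct AppendLog { uint64_t index; };
struct Flush { uint64_t index; };
using WriteOperation = std::variant<AppendLog, Flush>;

int main()
{
    std::deque<WriteOperation> ops{
        AppendLog{1}, AppendLog{2}, Flush{2}, AppendLog{3}, Flush{3}};

    const uint64_t max_flush_batch_size = 2;
    uint64_t pending_appends = 0;

    const auto flush_logs = [&](Flush flush)
    {
        // stands in for current_writer->flush() plus the last_durable_idx update
        std::cout << "flushed " << pending_appends
                  << " appends up to index " << flush.index << '\n';
        pending_appends = 0;
    };

    while (!ops.empty())
    {
        WriteOperation op = ops.front();
        ops.pop_front();

        if (std::holds_alternative<AppendLog>(op))
        {
            ++pending_appends; // current_writer->appendRecord(...) in the real code
            continue;
        }

        const auto flush = std::get<Flush>(op);
        if (pending_appends < max_flush_batch_size && !ops.empty())
            continue; // defer: a later Flush will cover these appends too
        flush_logs(flush); // batch is full or the queue is drained
    }
}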

View File

@ -82,6 +82,11 @@ struct LogFileSettings
uint64_t overallocate_size = 0;
};
struct FlushSettings
{
uint64_t max_flush_batch_size = 1000;
};
/// Simplest changelog with files rotation.
/// No compression, no metadata, just entries with headers one by one.
/// Able to read broken files/entries and discard them. Not thread safe.
@ -91,6 +96,7 @@ public:
Changelog(
Poco::Logger * log_,
LogFileSettings log_file_settings,
FlushSettings flush_settings,
KeeperContextPtr keeper_context_);
Changelog(Changelog &&) = delete;
@ -229,6 +235,8 @@ private:
KeeperContextPtr keeper_context;
const FlushSettings flush_settings;
bool initialized = false;
};

View File

@ -134,6 +134,8 @@ void KeeperConfigurationAndSettings::dump(WriteBufferFromOwnString & buf) const
write_int(coordination_settings->max_requests_batch_size);
writeText("max_requests_batch_bytes_size=", buf);
write_int(coordination_settings->max_requests_batch_bytes_size);
writeText("max_flush_batch_size=", buf);
write_int(coordination_settings->max_flush_batch_size);
writeText("max_request_queue_size=", buf);
write_int(coordination_settings->max_request_queue_size);
writeText("max_requests_quick_batch_size=", buf);
@ -152,6 +154,9 @@ void KeeperConfigurationAndSettings::dump(WriteBufferFromOwnString & buf) const
writeText("raft_limits_reconnect_limit=", buf);
write_int(static_cast<uint64_t>(coordination_settings->raft_limits_reconnect_limit));
writeText("async_replication=", buf);
write_bool(coordination_settings->async_replication);
}
KeeperConfigurationAndSettingsPtr

View File

@ -38,8 +38,9 @@ struct Settings;
M(UInt64, stale_log_gap, 10000, "When node became stale and should receive snapshots from leader", 0) \
M(UInt64, fresh_log_gap, 200, "When node became fresh", 0) \
M(UInt64, max_request_queue_size, 100000, "Maximum number of request that can be in queue for processing", 0) \
M(UInt64, max_requests_batch_size, 100, "Max size of batch of requests that can be sent to RAFT", 0) \
M(UInt64, max_requests_batch_size, 1000, "Max size of batch of requests that can be sent to RAFT", 0) \
M(UInt64, max_requests_batch_bytes_size, 100*1024, "Max size in bytes of batch of requests that can be sent to RAFT", 0) \
M(UInt64, max_flush_batch_size, 1000, "Max size of batch of requests that can be flushed together", 0) \
M(UInt64, max_requests_quick_batch_size, 100, "Max size of batch of requests to try to get before proceeding with RAFT. Keeper will not wait for requests but take only requests that are already in queue" , 0) \
M(Bool, quorum_reads, false, "Execute read requests as writes through whole RAFT consensus with similar speed", 0) \
M(Bool, force_sync, true, "Call fsync on each change in RAFT changelog", 0) \
@ -49,7 +50,8 @@ struct Settings;
M(UInt64, max_log_file_size, 50 * 1024 * 1024, "Max size of the Raft log file. If possible, each created log file will preallocate this amount of bytes on disk. Set to 0 to disable the limit", 0) \
M(UInt64, log_file_overallocate_size, 50 * 1024 * 1024, "If max_log_file_size is not set to 0, this value will be added to it for preallocating bytes on disk. If a log record is larger than this value, it could lead to uncaught out-of-space issues so a larger value is preferred", 0) \
M(UInt64, min_request_size_for_cache, 50 * 1024, "Minimal size of the request to cache the deserialization result. Caching can have negative effect on latency for smaller requests, set to 0 to disable", 0) \
M(UInt64, raft_limits_reconnect_limit, 50, "If connection to a peer is silent longer than this limit (multiplied by the heartbeat interval), we re-establish the connection.", 0)
M(UInt64, raft_limits_reconnect_limit, 50, "If connection to a peer is silent longer than this limit (multiplied by the heartbeat interval), we re-establish the connection.", 0) \
M(Bool, async_replication, false, "Enable async replication. All write and read guarantees are preserved while better performance is achieved. Setting is disabled by default to not break backwards compatibility.", 0)
DECLARE_SETTINGS_TRAITS(CoordinationSettingsTraits, LIST_OF_COORDINATION_SETTINGS)
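
The M(...) rows above are part of ClickHouse's settings X-macro pattern: the same list is expanded several times to generate members, defaults, and documentation. A toy illustration of how such a list can expand into a struct (illustrative names only; the real DECLARE_SETTINGS_TRAITS machinery generates far more than this):

#include <cstdint>

// The list is written once...
#define LIST_OF_EXAMPLE_SETTINGS(M) \
    M(uint64_t, max_requests_batch_size, 1000) \
    M(bool, async_replication, false)

// ...and expanded into struct fields with defaults.
struct ExampleSettings
{
#define DECLARE_FIELD(TYPE, NAME, DEFAULT) TYPE NAME = DEFAULT;
    LIST_OF_EXAMPLE_SETTINGS(DECLARE_FIELD)
#undef DECLARE_FIELD
};

int main()
{
    ExampleSettings settings;
    return settings.async_replication ? 1 : 0; // fields exist with their defaults
}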

View File

@ -12,6 +12,7 @@
#include <Common/ProfileEvents.h>
#include <Common/logger_useful.h>
#include <atomic>
#include <future>
#include <chrono>
#include <filesystem>
@ -73,7 +74,6 @@ void KeeperDispatcher::requestThread()
auto coordination_settings = configuration_and_settings->coordination_settings;
uint64_t max_wait = coordination_settings->operation_timeout_ms.totalMilliseconds();
uint64_t max_batch_size = coordination_settings->max_requests_batch_size;
uint64_t max_batch_bytes_size = coordination_settings->max_requests_batch_bytes_size;
/// The code below does a very simple thing: batches all write (quorum) requests into a vector until
@ -136,12 +136,9 @@ void KeeperDispatcher::requestThread()
return false;
};
/// TODO: Deprecate max_requests_quick_batch_size and use only max_requests_batch_size and max_requests_batch_bytes_size
size_t max_quick_batch_size = coordination_settings->max_requests_quick_batch_size;
while (!shutdown_called && !has_read_request &&
!has_reconfig_request &&
current_batch.size() < max_quick_batch_size && current_batch_bytes_size < max_batch_bytes_size &&
try_get_request())
size_t max_batch_size = coordination_settings->max_requests_batch_size;
while (!shutdown_called && current_batch.size() < max_batch_size && !has_reconfig_request
&& current_batch_bytes_size < max_batch_bytes_size && try_get_request())
;
const auto prev_result_done = [&]
@ -152,10 +149,9 @@ void KeeperDispatcher::requestThread()
};
/// Wait until the previous append succeeds, or the batch is big enough
while (!shutdown_called && !has_read_request &&
!has_reconfig_request && !prev_result_done() &&
current_batch.size() <= max_batch_size
&& current_batch_bytes_size < max_batch_bytes_size)
while (!shutdown_called && !has_reconfig_request &&
!prev_result_done() && current_batch.size() <= max_batch_size
&& current_batch_bytes_size < max_batch_bytes_size)
{
try_get_request();
}
@ -166,9 +162,10 @@ void KeeperDispatcher::requestThread()
if (shutdown_called)
break;
nuraft::ptr<nuraft::buffer> result_buf = nullptr;
/// Forcefully process all previous pending requests
if (prev_result)
forceWaitAndProcessResult(prev_result, prev_batch);
result_buf = forceWaitAndProcessResult(prev_result, prev_batch);
/// Process collected write requests batch
if (!current_batch.empty())
@ -177,13 +174,7 @@ void KeeperDispatcher::requestThread()
auto result = server->putRequestBatch(current_batch);
if (result)
{
/// If we execute a read or reconfig request next, we have to process the result now
if (has_read_request || has_reconfig_request)
forceWaitAndProcessResult(result, current_batch);
}
else
if (!result)
{
addErrorResponses(current_batch, Coordination::Error::ZCONNECTIONLOSS);
current_batch.clear();
@ -194,6 +185,34 @@ void KeeperDispatcher::requestThread()
prev_result = result;
}
/// If we execute a read or reconfig request next, we have to process the result now
if (has_read_request || has_reconfig_request)
{
if (prev_result)
result_buf = forceWaitAndProcessResult(prev_result, current_batch);
/// In case of an older version or disabled async replication, result_buf will be set to the return value of the `commit` function,
/// which always returns nullptr.
/// In that case we don't have to wait manually, because we are already sure that the batch was committed when we got
/// the result back.
/// Otherwise, we need to manually wait until the batch is committed.
if (result_buf)
{
nuraft::buffer_serializer bs(result_buf);
auto log_idx = bs.get_u64();
/// this thread is woken up on each commit, so we need to loop until the last request of the batch is committed
while (true)
{
auto current_last_committed_idx = our_last_committed_log_idx.load(std::memory_order_relaxed);
if (current_last_committed_idx >= log_idx)
break;
our_last_committed_log_idx.wait(current_last_committed_idx);
}
}
}
if (has_reconfig_request)
server->getKeeperStateMachine()->reconfigure(request);
@ -360,28 +379,33 @@ void KeeperDispatcher::initialize(const Poco::Util::AbstractConfiguration & conf
snapshots_queue,
keeper_context,
snapshot_s3,
[this](const KeeperStorage::RequestForSession & request_for_session)
[this](uint64_t log_idx, const KeeperStorage::RequestForSession & request_for_session)
{
/// check if we have queue of read requests depending on this request to be committed
std::lock_guard lock(read_request_queue_mutex);
if (auto it = read_request_queue.find(request_for_session.session_id); it != read_request_queue.end())
{
auto & xid_to_request_queue = it->second;
if (auto request_queue_it = xid_to_request_queue.find(request_for_session.request->xid);
request_queue_it != xid_to_request_queue.end())
/// check if we have queue of read requests depending on this request to be committed
std::lock_guard lock(read_request_queue_mutex);
if (auto it = read_request_queue.find(request_for_session.session_id); it != read_request_queue.end())
{
for (const auto & read_request : request_queue_it->second)
{
if (server->isLeaderAlive())
server->putLocalReadRequest(read_request);
else
addErrorResponses({read_request}, Coordination::Error::ZCONNECTIONLOSS);
}
auto & xid_to_request_queue = it->second;
xid_to_request_queue.erase(request_queue_it);
if (auto request_queue_it = xid_to_request_queue.find(request_for_session.request->xid);
request_queue_it != xid_to_request_queue.end())
{
for (const auto & read_request : request_queue_it->second)
{
if (server->isLeaderAlive())
server->putLocalReadRequest(read_request);
else
addErrorResponses({read_request}, Coordination::Error::ZCONNECTIONLOSS);
}
xid_to_request_queue.erase(request_queue_it);
}
}
}
our_last_committed_log_idx.store(log_idx, std::memory_order_relaxed);
our_last_committed_log_idx.notify_all();
});
try
@ -638,7 +662,7 @@ void KeeperDispatcher::addErrorResponses(const KeeperStorage::RequestsForSession
}
}
void KeeperDispatcher::forceWaitAndProcessResult(RaftAppendResult & result, KeeperStorage::RequestsForSessions & requests_for_sessions)
nuraft::ptr<nuraft::buffer> KeeperDispatcher::forceWaitAndProcessResult(RaftAppendResult & result, KeeperStorage::RequestsForSessions & requests_for_sessions)
{
if (!result->has_result())
result->get();
@ -649,8 +673,11 @@ void KeeperDispatcher::forceWaitAndProcessResult(RaftAppendResult & result, Keep
else if (result->get_result_code() != nuraft::cmd_result_code::OK)
addErrorResponses(requests_for_sessions, Coordination::Error::ZCONNECTIONLOSS);
auto result_buf = result->get();
result = nullptr;
requests_for_sessions.clear();
return result_buf;
}
int64_t KeeperDispatcher::getSessionID(int64_t session_timeout_ms)
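
The commit-wait added to requestThread above relies on C++20 std::atomic wait/notify: the commit callback stores the committed log index and calls notify_all, while the request thread loops, re-waiting on each intermediate wakeup, until its target index is covered. A minimal sketch of that pattern (simplified, illustrative names):

#include <atomic>
#include <cstdint>
#include <iostream>
#include <thread>

std::atomic<uint64_t> last_committed_log_idx{0};

void wait_for_commit(uint64_t target_log_idx)
{
    while (true)
    {
        const uint64_t current = last_committed_log_idx.load(std::memory_order_relaxed);
        if (current >= target_log_idx)
            break;
        last_committed_log_idx.wait(current); // blocks until the value changes
    }
}

int main()
{
    std::thread committer([]
    {
        for (uint64_t idx = 1; idx <= 5; ++idx)
        {
            // what the commit callback does for each committed log entry
            last_committed_log_idx.store(idx, std::memory_order_relaxed);
            last_committed_log_idx.notify_all();
        }
    });

    wait_for_commit(5);
    std::cout << "batch committed up to index 5\n";
    committer.join();
}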

View File

@ -102,11 +102,13 @@ private:
/// Forcefully waits for the result and sets errors if something went wrong.
/// Clears both arguments
void forceWaitAndProcessResult(RaftAppendResult & result, KeeperStorage::RequestsForSessions & requests_for_sessions);
nuraft::ptr<nuraft::buffer> forceWaitAndProcessResult(RaftAppendResult & result, KeeperStorage::RequestsForSessions & requests_for_sessions);
public:
std::mutex read_request_queue_mutex;
std::atomic<uint64_t> our_last_committed_log_idx = 0;
/// queue of read requests that can be processed after a request with a specific session ID and XID is committed
std::unordered_map<int64_t, std::unordered_map<Coordination::XID, KeeperStorage::RequestsForSessions>> read_request_queue;

View File

@ -6,9 +6,8 @@
namespace DB
{
KeeperLogStore::KeeperLogStore(LogFileSettings log_file_settings, KeeperContextPtr keeper_context)
: log(&Poco::Logger::get("KeeperLogStore"))
, changelog(log, log_file_settings, keeper_context)
KeeperLogStore::KeeperLogStore(LogFileSettings log_file_settings, FlushSettings flush_settings, KeeperContextPtr keeper_context)
: log(&Poco::Logger::get("KeeperLogStore")), changelog(log, log_file_settings, flush_settings, keeper_context)
{
if (log_file_settings.force_sync)
LOG_INFO(log, "force_sync enabled");

View File

@ -14,7 +14,7 @@ namespace DB
class KeeperLogStore : public nuraft::log_store
{
public:
KeeperLogStore(LogFileSettings log_file_settings, KeeperContextPtr keeper_context);
KeeperLogStore(LogFileSettings log_file_settings, FlushSettings flush_settings, KeeperContextPtr keeper_context);
/// Read log storage from filesystem starting from last_commited_log_index
void init(uint64_t last_commited_log_index, uint64_t logs_to_keep);

View File

@ -208,28 +208,33 @@ void KeeperServer::loadLatestConfig()
{
auto latest_snapshot_config = state_machine->getClusterConfig();
auto latest_log_store_config = state_manager->getLatestConfigFromLogStore();
auto async_replication = coordination_settings->async_replication;
if (latest_snapshot_config && latest_log_store_config)
{
if (latest_snapshot_config->get_log_idx() > latest_log_store_config->get_log_idx())
{
LOG_INFO(log, "Will use config from snapshot with log index {}", latest_snapshot_config->get_log_idx());
latest_snapshot_config->set_async_replication(async_replication);
state_manager->save_config(*latest_snapshot_config);
}
else
{
LOG_INFO(log, "Will use config from log store with log index {}", latest_snapshot_config->get_log_idx());
LOG_INFO(log, "Will use config from log store with log index {}", latest_log_store_config->get_log_idx());
latest_log_store_config->set_async_replication(async_replication);
state_manager->save_config(*latest_log_store_config);
}
}
else if (latest_snapshot_config)
{
LOG_INFO(log, "No config in log store, will use config from snapshot with log index {}", latest_snapshot_config->get_log_idx());
latest_snapshot_config->set_async_replication(async_replication);
state_manager->save_config(*latest_snapshot_config);
}
else if (latest_log_store_config)
{
LOG_INFO(log, "No config in snapshot, will use config from log store with log index {}", latest_log_store_config->get_log_idx());
latest_log_store_config->set_async_replication(async_replication);
state_manager->save_config(*latest_log_store_config);
}
else
@ -417,7 +422,7 @@ void KeeperServer::startup(const Poco::Util::AbstractConfiguration & config, boo
loadLatestConfig();
last_local_config = state_manager->parseServersConfiguration(config, true).cluster_config;
last_local_config = state_manager->parseServersConfiguration(config, true, coordination_settings->async_replication).cluster_config;
launchRaftServer(config, enable_ipv6);
@ -841,12 +846,12 @@ bool KeeperServer::applyConfigUpdate(const ClusterUpdateAction & action)
ClusterUpdateActions KeeperServer::getRaftConfigurationDiff(const Poco::Util::AbstractConfiguration & config)
{
auto diff = state_manager->getRaftConfigurationDiff(config);
auto diff = state_manager->getRaftConfigurationDiff(config, coordination_settings);
if (!diff.empty())
{
std::lock_guard lock{server_write_mutex};
last_local_config = state_manager->parseServersConfiguration(config, true).cluster_config;
last_local_config = state_manager->parseServersConfiguration(config, true, coordination_settings->async_replication).cluster_config;
}
return diff;

View File

@ -167,7 +167,10 @@ nuraft::ptr<nuraft::buffer> KeeperStateMachine::pre_commit(uint64_t log_idx, nur
request_for_session->zxid = log_idx;
preprocess(*request_for_session);
return nullptr;
auto result = nuraft::buffer::alloc(sizeof(log_idx));
nuraft::buffer_serializer ss(result);
ss.put_u64(log_idx);
return result;
}
std::shared_ptr<KeeperStorage::RequestForSession> KeeperStateMachine::parseRequest(nuraft::buffer & data, bool final, ZooKeeperLogSerializationVersion * serialization_version)
@ -433,7 +436,7 @@ nuraft::ptr<nuraft::buffer> KeeperStateMachine::commit(const uint64_t log_idx, n
last_committed_idx = log_idx;
if (commit_callback)
commit_callback(*request_for_session);
commit_callback(log_idx, *request_for_session);
return nullptr;
}
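
pre_commit now returns the log index serialized into an 8-byte buffer instead of nullptr; KeeperDispatcher (see above) reads it back with get_u64 and uses it as the wait target. A self-contained illustration of that round trip without the NuRaft types (plain memcpy framing; the endianness handling that nuraft::buffer_serializer performs is omitted here):

#include <cstdint>
#include <cstring>
#include <iostream>
#include <vector>

// what pre_commit does: serialize log_idx into the result buffer (ss.put_u64)
std::vector<uint8_t> make_precommit_result(uint64_t log_idx)
{
    std::vector<uint8_t> buf(sizeof(log_idx));
    std::memcpy(buf.data(), &log_idx, sizeof(log_idx));
    return buf;
}

// what the dispatcher does: read the index back (bs.get_u64)
uint64_t read_log_idx(const std::vector<uint8_t> & buf)
{
    uint64_t log_idx = 0;
    std::memcpy(&log_idx, buf.data(), sizeof(log_idx));
    return log_idx;
}

int main()
{
    const auto buf = make_precommit_result(42);
    std::cout << "dispatcher will wait for commit of log index " << read_log_idx(buf) << '\n';
}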

View File

@ -20,7 +20,7 @@ using SnapshotsQueue = ConcurrentBoundedQueue<CreateSnapshotTask>;
class KeeperStateMachine : public nuraft::state_machine
{
public:
using CommitCallback = std::function<void(const KeeperStorage::RequestForSession &)>;
using CommitCallback = std::function<void(uint64_t, const KeeperStorage::RequestForSession &)>;
KeeperStateMachine(
ResponsesQueue & responses_queue_,

View File

@ -10,6 +10,7 @@
#include <Common/getMultipleKeysFromConfig.h>
#include <Disks/DiskLocal.h>
#include <Common/logger_useful.h>
#include "Coordination/CoordinationSettings.h"
namespace DB
{
@ -74,7 +75,7 @@ std::unordered_map<UInt64, std::string> getClientPorts(const Poco::Util::Abstrac
/// 4. No duplicate IDs
/// 5. Our ID present in hostnames list
KeeperStateManager::KeeperConfigurationWrapper
KeeperStateManager::parseServersConfiguration(const Poco::Util::AbstractConfiguration & config, bool allow_without_us) const
KeeperStateManager::parseServersConfiguration(const Poco::Util::AbstractConfiguration & config, bool allow_without_us, bool enable_async_replication) const
{
const bool hostname_checks_enabled = config.getBool(config_prefix + ".hostname_checks_enabled", true);
@ -184,6 +185,8 @@ KeeperStateManager::parseServersConfiguration(const Poco::Util::AbstractConfigur
total_servers++;
}
result.cluster_config->set_async_replication(enable_async_replication);
if (!result.config && !allow_without_us)
throw Exception(ErrorCodes::RAFT_ERROR, "Our server id {} not found in raft_configuration section", my_server_id);
@ -220,6 +223,7 @@ KeeperStateManager::KeeperStateManager(int server_id_, const std::string & host,
, secure(false)
, log_store(nuraft::cs_new<KeeperLogStore>(
LogFileSettings{.force_sync = false, .compress_logs = false, .rotate_interval = 5000},
FlushSettings{},
keeper_context_))
, server_state_file_name("state")
, keeper_context(keeper_context_)
@ -242,15 +246,18 @@ KeeperStateManager::KeeperStateManager(
: my_server_id(my_server_id_)
, secure(config.getBool(config_prefix_ + ".raft_configuration.secure", false))
, config_prefix(config_prefix_)
, configuration_wrapper(parseServersConfiguration(config, false))
, configuration_wrapper(parseServersConfiguration(config, false, coordination_settings->async_replication))
, log_store(nuraft::cs_new<KeeperLogStore>(
LogFileSettings
{
.force_sync = coordination_settings->force_sync,
.compress_logs = coordination_settings->compress_logs,
.rotate_interval = coordination_settings->rotate_log_storage_interval,
.max_size = coordination_settings->max_log_file_size,
.overallocate_size = coordination_settings->log_file_overallocate_size
.force_sync = coordination_settings->force_sync,
.compress_logs = coordination_settings->compress_logs,
.rotate_interval = coordination_settings->rotate_log_storage_interval,
.max_size = coordination_settings->max_log_file_size,
.overallocate_size = coordination_settings->log_file_overallocate_size},
FlushSettings
{
.max_flush_batch_size = coordination_settings->max_flush_batch_size,
},
keeper_context_))
, server_state_file_name(server_state_file_name_)
@ -451,9 +458,10 @@ nuraft::ptr<nuraft::srv_state> KeeperStateManager::read_state()
return nullptr;
}
ClusterUpdateActions KeeperStateManager::getRaftConfigurationDiff(const Poco::Util::AbstractConfiguration & config) const
ClusterUpdateActions KeeperStateManager::getRaftConfigurationDiff(
const Poco::Util::AbstractConfiguration & config, const CoordinationSettingsPtr & coordination_settings) const
{
auto new_configuration_wrapper = parseServersConfiguration(config, true);
auto new_configuration_wrapper = parseServersConfiguration(config, true, coordination_settings->async_replication);
std::unordered_map<int, KeeperServerConfigPtr> new_ids, old_ids;
for (const auto & new_server : new_configuration_wrapper.cluster_config->get_servers())

View File

@ -93,7 +93,7 @@ public:
ClusterConfigPtr getLatestConfigFromLogStore() const;
// TODO (myrrc) This should be removed once "reconfig" is stabilized
ClusterUpdateActions getRaftConfigurationDiff(const Poco::Util::AbstractConfiguration & config) const;
ClusterUpdateActions getRaftConfigurationDiff(const Poco::Util::AbstractConfiguration & config, const CoordinationSettingsPtr & coordination_settings) const;
private:
const String & getOldServerStatePath();
@ -131,7 +131,7 @@ private:
public:
/// Parse configuration from xml config.
KeeperConfigurationWrapper parseServersConfiguration(const Poco::Util::AbstractConfiguration & config, bool allow_without_us) const;
KeeperConfigurationWrapper parseServersConfiguration(const Poco::Util::AbstractConfiguration & config, bool allow_without_us, bool enable_async_replication) const;
};
}

View File

@ -298,7 +298,9 @@ TEST_P(CoordinationTest, ChangelogTestSimple)
setLogDirectory("./logs");
DB::KeeperLogStore changelog(
DB::LogFileSettings{.force_sync = true, .compress_logs = params.enable_compression, .rotate_interval = 5}, keeper_context);
DB::LogFileSettings{.force_sync = true, .compress_logs = params.enable_compression, .rotate_interval = 5},
DB::FlushSettings(),
keeper_context);
changelog.init(1, 0);
auto entry = getLogEntry("hello world", 77);
changelog.append(entry);
@ -328,7 +330,9 @@ TEST_P(CoordinationTest, ChangelogTestFile)
setLogDirectory("./logs");
DB::KeeperLogStore changelog(
DB::LogFileSettings{.force_sync = true, .compress_logs = params.enable_compression, .rotate_interval = 5}, keeper_context);
DB::LogFileSettings{.force_sync = true, .compress_logs = params.enable_compression, .rotate_interval = 5},
DB::FlushSettings(),
keeper_context);
changelog.init(1, 0);
auto entry = getLogEntry("hello world", 77);
changelog.append(entry);
@ -360,7 +364,9 @@ TEST_P(CoordinationTest, ChangelogReadWrite)
setLogDirectory("./logs");
DB::KeeperLogStore changelog(
DB::LogFileSettings{.force_sync = true, .compress_logs = params.enable_compression, .rotate_interval = 1000}, keeper_context);
DB::LogFileSettings{.force_sync = true, .compress_logs = params.enable_compression, .rotate_interval = 1000},
DB::FlushSettings(),
keeper_context);
changelog.init(1, 0);
for (size_t i = 0; i < 10; ++i)
@ -375,7 +381,9 @@ TEST_P(CoordinationTest, ChangelogReadWrite)
waitDurableLogs(changelog);
DB::KeeperLogStore changelog_reader(
DB::LogFileSettings{.force_sync = true, .compress_logs = params.enable_compression, .rotate_interval = 1000}, keeper_context);
DB::LogFileSettings{.force_sync = true, .compress_logs = params.enable_compression, .rotate_interval = 1000},
DB::FlushSettings(),
keeper_context);
changelog_reader.init(1, 0);
EXPECT_EQ(changelog_reader.size(), 10);
EXPECT_EQ(changelog_reader.last_entry()->get_term(), changelog.last_entry()->get_term());
@ -398,7 +406,9 @@ TEST_P(CoordinationTest, ChangelogWriteAt)
setLogDirectory("./logs");
DB::KeeperLogStore changelog(
DB::LogFileSettings{.force_sync = true, .compress_logs = params.enable_compression, .rotate_interval = 1000}, keeper_context);
DB::LogFileSettings{.force_sync = true, .compress_logs = params.enable_compression, .rotate_interval = 1000},
DB::FlushSettings(),
keeper_context);
changelog.init(1, 0);
for (size_t i = 0; i < 10; ++i)
{
@ -421,7 +431,9 @@ TEST_P(CoordinationTest, ChangelogWriteAt)
EXPECT_EQ(changelog.next_slot(), 8);
DB::KeeperLogStore changelog_reader(
DB::LogFileSettings{.force_sync = true, .compress_logs = params.enable_compression, .rotate_interval = 1000}, keeper_context);
DB::LogFileSettings{.force_sync = true, .compress_logs = params.enable_compression, .rotate_interval = 1000},
DB::FlushSettings(),
keeper_context);
changelog_reader.init(1, 0);
EXPECT_EQ(changelog_reader.size(), changelog.size());
@ -438,7 +450,9 @@ TEST_P(CoordinationTest, ChangelogTestAppendAfterRead)
setLogDirectory("./logs");
DB::KeeperLogStore changelog(
DB::LogFileSettings{.force_sync = true, .compress_logs = params.enable_compression, .rotate_interval = 5}, keeper_context);
DB::LogFileSettings{.force_sync = true, .compress_logs = params.enable_compression, .rotate_interval = 5},
DB::FlushSettings(),
keeper_context);
changelog.init(1, 0);
for (size_t i = 0; i < 7; ++i)
{
@ -455,7 +469,9 @@ TEST_P(CoordinationTest, ChangelogTestAppendAfterRead)
EXPECT_TRUE(fs::exists("./logs/changelog_6_10.bin" + params.extension));
DB::KeeperLogStore changelog_reader(
DB::LogFileSettings{.force_sync = true, .compress_logs = params.enable_compression, .rotate_interval = 5}, keeper_context);
DB::LogFileSettings{.force_sync = true, .compress_logs = params.enable_compression, .rotate_interval = 5},
DB::FlushSettings(),
keeper_context);
changelog_reader.init(1, 0);
EXPECT_EQ(changelog_reader.size(), 7);
@ -520,7 +536,9 @@ TEST_P(CoordinationTest, ChangelogTestCompaction)
setLogDirectory("./logs");
DB::KeeperLogStore changelog(
DB::LogFileSettings{.force_sync = true, .compress_logs = params.enable_compression, .rotate_interval = 5}, keeper_context);
DB::LogFileSettings{.force_sync = true, .compress_logs = params.enable_compression, .rotate_interval = 5},
DB::FlushSettings(),
keeper_context);
changelog.init(1, 0);
for (size_t i = 0; i < 3; ++i)
@ -570,7 +588,9 @@ TEST_P(CoordinationTest, ChangelogTestCompaction)
EXPECT_EQ(changelog.last_entry()->get_term(), 60);
/// And we are able to read it
DB::KeeperLogStore changelog_reader(
DB::LogFileSettings{.force_sync = true, .compress_logs = params.enable_compression, .rotate_interval = 5}, keeper_context);
DB::LogFileSettings{.force_sync = true, .compress_logs = params.enable_compression, .rotate_interval = 5},
DB::FlushSettings(),
keeper_context);
changelog_reader.init(7, 0);
EXPECT_EQ(changelog_reader.size(), 1);
@ -586,7 +606,9 @@ TEST_P(CoordinationTest, ChangelogTestBatchOperations)
setLogDirectory("./logs");
DB::KeeperLogStore changelog(
DB::LogFileSettings{.force_sync = true, .compress_logs = params.enable_compression, .rotate_interval = 100}, keeper_context);
DB::LogFileSettings{.force_sync = true, .compress_logs = params.enable_compression, .rotate_interval = 100},
DB::FlushSettings(),
keeper_context);
changelog.init(1, 0);
for (size_t i = 0; i < 10; ++i)
{
@ -602,7 +624,9 @@ TEST_P(CoordinationTest, ChangelogTestBatchOperations)
auto entries = changelog.pack(1, 5);
DB::KeeperLogStore apply_changelog(
DB::LogFileSettings{.force_sync = true, .compress_logs = params.enable_compression, .rotate_interval = 100}, keeper_context);
DB::LogFileSettings{.force_sync = true, .compress_logs = params.enable_compression, .rotate_interval = 100},
DB::FlushSettings(),
keeper_context);
apply_changelog.init(1, 0);
for (size_t i = 0; i < 10; ++i)
@ -639,7 +663,9 @@ TEST_P(CoordinationTest, ChangelogTestBatchOperationsEmpty)
nuraft::ptr<nuraft::buffer> entries;
{
DB::KeeperLogStore changelog(
DB::LogFileSettings{.force_sync = true, .compress_logs = params.enable_compression, .rotate_interval = 100}, keeper_context);
DB::LogFileSettings{.force_sync = true, .compress_logs = params.enable_compression, .rotate_interval = 100},
DB::FlushSettings(),
keeper_context);
changelog.init(1, 0);
for (size_t i = 0; i < 10; ++i)
{
@ -658,7 +684,9 @@ TEST_P(CoordinationTest, ChangelogTestBatchOperationsEmpty)
ChangelogDirTest test1("./logs1");
setLogDirectory("./logs1");
DB::KeeperLogStore changelog_new(
DB::LogFileSettings{.force_sync = true, .compress_logs = params.enable_compression, .rotate_interval = 100}, keeper_context);
DB::LogFileSettings{.force_sync = true, .compress_logs = params.enable_compression, .rotate_interval = 100},
DB::FlushSettings(),
keeper_context);
changelog_new.init(1, 0);
EXPECT_EQ(changelog_new.size(), 0);
@ -681,7 +709,9 @@ TEST_P(CoordinationTest, ChangelogTestBatchOperationsEmpty)
EXPECT_EQ(changelog_new.next_slot(), 11);
DB::KeeperLogStore changelog_reader(
DB::LogFileSettings{.force_sync = true, .compress_logs = params.enable_compression, .rotate_interval = 100}, keeper_context);
DB::LogFileSettings{.force_sync = true, .compress_logs = params.enable_compression, .rotate_interval = 100},
DB::FlushSettings(),
keeper_context);
changelog_reader.init(5, 0);
}
@ -693,7 +723,9 @@ TEST_P(CoordinationTest, ChangelogTestWriteAtPreviousFile)
setLogDirectory("./logs");
DB::KeeperLogStore changelog(
DB::LogFileSettings{.force_sync = true, .compress_logs = params.enable_compression, .rotate_interval = 5}, keeper_context);
DB::LogFileSettings{.force_sync = true, .compress_logs = params.enable_compression, .rotate_interval = 5},
DB::FlushSettings(),
keeper_context);
changelog.init(1, 0);
for (size_t i = 0; i < 33; ++i)
@ -735,7 +767,9 @@ TEST_P(CoordinationTest, ChangelogTestWriteAtPreviousFile)
EXPECT_FALSE(fs::exists("./logs/changelog_31_35.bin" + params.extension));
DB::KeeperLogStore changelog_read(
DB::LogFileSettings{.force_sync = true, .compress_logs = params.enable_compression, .rotate_interval = 5}, keeper_context);
DB::LogFileSettings{.force_sync = true, .compress_logs = params.enable_compression, .rotate_interval = 5},
DB::FlushSettings(),
keeper_context);
changelog_read.init(1, 0);
EXPECT_EQ(changelog_read.size(), 7);
EXPECT_EQ(changelog_read.start_index(), 1);
@ -750,7 +784,9 @@ TEST_P(CoordinationTest, ChangelogTestWriteAtFileBorder)
setLogDirectory("./logs");
DB::KeeperLogStore changelog(
DB::LogFileSettings{.force_sync = true, .compress_logs = params.enable_compression, .rotate_interval = 5}, keeper_context);
DB::LogFileSettings{.force_sync = true, .compress_logs = params.enable_compression, .rotate_interval = 5},
DB::FlushSettings(),
keeper_context);
changelog.init(1, 0);
for (size_t i = 0; i < 33; ++i)
@ -792,7 +828,9 @@ TEST_P(CoordinationTest, ChangelogTestWriteAtFileBorder)
EXPECT_FALSE(fs::exists("./logs/changelog_31_35.bin" + params.extension));
DB::KeeperLogStore changelog_read(
DB::LogFileSettings{.force_sync = true, .compress_logs = params.enable_compression, .rotate_interval = 5}, keeper_context);
DB::LogFileSettings{.force_sync = true, .compress_logs = params.enable_compression, .rotate_interval = 5},
DB::FlushSettings(),
keeper_context);
changelog_read.init(1, 0);
EXPECT_EQ(changelog_read.size(), 11);
EXPECT_EQ(changelog_read.start_index(), 1);
@ -807,7 +845,9 @@ TEST_P(CoordinationTest, ChangelogTestWriteAtAllFiles)
setLogDirectory("./logs");
DB::KeeperLogStore changelog(
DB::LogFileSettings{.force_sync = true, .compress_logs = params.enable_compression, .rotate_interval = 5}, keeper_context);
DB::LogFileSettings{.force_sync = true, .compress_logs = params.enable_compression, .rotate_interval = 5},
DB::FlushSettings(),
keeper_context);
changelog.init(1, 0);
for (size_t i = 0; i < 33; ++i)
{
@ -855,7 +895,9 @@ TEST_P(CoordinationTest, ChangelogTestStartNewLogAfterRead)
setLogDirectory("./logs");
DB::KeeperLogStore changelog(
DB::LogFileSettings{.force_sync = true, .compress_logs = params.enable_compression, .rotate_interval = 5}, keeper_context);
DB::LogFileSettings{.force_sync = true, .compress_logs = params.enable_compression, .rotate_interval = 5},
DB::FlushSettings(),
keeper_context);
changelog.init(1, 0);
for (size_t i = 0; i < 35; ++i)
@ -877,7 +919,9 @@ TEST_P(CoordinationTest, ChangelogTestStartNewLogAfterRead)
EXPECT_FALSE(fs::exists("./logs/changelog_36_40.bin" + params.extension));
DB::KeeperLogStore changelog_reader(
DB::LogFileSettings{.force_sync = true, .compress_logs = params.enable_compression, .rotate_interval = 5}, keeper_context);
DB::LogFileSettings{.force_sync = true, .compress_logs = params.enable_compression, .rotate_interval = 5},
DB::FlushSettings(),
keeper_context);
changelog_reader.init(1, 0);
auto entry = getLogEntry("36_hello_world", 360);
@ -924,7 +968,9 @@ TEST_P(CoordinationTest, ChangelogTestReadAfterBrokenTruncate)
setLogDirectory(log_folder);
DB::KeeperLogStore changelog(
DB::LogFileSettings{.force_sync = true, .compress_logs = params.enable_compression, .rotate_interval = 5}, keeper_context);
DB::LogFileSettings{.force_sync = true, .compress_logs = params.enable_compression, .rotate_interval = 5},
DB::FlushSettings(),
keeper_context);
changelog.init(1, 0);
for (size_t i = 0; i < 35; ++i)
@ -949,7 +995,9 @@ TEST_P(CoordinationTest, ChangelogTestReadAfterBrokenTruncate)
plain_buf.truncate(0);
DB::KeeperLogStore changelog_reader(
DB::LogFileSettings{.force_sync = true, .compress_logs = params.enable_compression, .rotate_interval = 5}, keeper_context);
DB::LogFileSettings{.force_sync = true, .compress_logs = params.enable_compression, .rotate_interval = 5},
DB::FlushSettings(),
keeper_context);
changelog_reader.init(1, 0);
changelog_reader.end_of_append_batch(0, 0);
@ -983,7 +1031,9 @@ TEST_P(CoordinationTest, ChangelogTestReadAfterBrokenTruncate)
assertBrokenLogRemoved(log_folder, "changelog_31_35.bin" + params.extension);
DB::KeeperLogStore changelog_reader2(
DB::LogFileSettings{.force_sync = true, .compress_logs = params.enable_compression, .rotate_interval = 5}, keeper_context);
DB::LogFileSettings{.force_sync = true, .compress_logs = params.enable_compression, .rotate_interval = 5},
DB::FlushSettings(),
keeper_context);
changelog_reader2.init(1, 0);
EXPECT_EQ(changelog_reader2.size(), 11);
EXPECT_EQ(changelog_reader2.last_entry()->get_term(), 7777);
@ -996,7 +1046,9 @@ TEST_P(CoordinationTest, ChangelogTestReadAfterBrokenTruncate2)
setLogDirectory("./logs");
DB::KeeperLogStore changelog(
DB::LogFileSettings{.force_sync = true, .compress_logs = params.enable_compression, .rotate_interval = 20}, keeper_context);
DB::LogFileSettings{.force_sync = true, .compress_logs = params.enable_compression, .rotate_interval = 20},
DB::FlushSettings(),
keeper_context);
changelog.init(1, 0);
for (size_t i = 0; i < 35; ++i)
@ -1015,7 +1067,9 @@ TEST_P(CoordinationTest, ChangelogTestReadAfterBrokenTruncate2)
plain_buf.truncate(30);
DB::KeeperLogStore changelog_reader(
DB::LogFileSettings{.force_sync = true, .compress_logs = params.enable_compression, .rotate_interval = 20}, keeper_context);
DB::LogFileSettings{.force_sync = true, .compress_logs = params.enable_compression, .rotate_interval = 20},
DB::FlushSettings(),
keeper_context);
changelog_reader.init(1, 0);
EXPECT_EQ(changelog_reader.size(), 0);
@ -1031,7 +1085,9 @@ TEST_P(CoordinationTest, ChangelogTestReadAfterBrokenTruncate2)
EXPECT_EQ(changelog_reader.last_entry()->get_term(), 7777);
DB::KeeperLogStore changelog_reader2(
DB::LogFileSettings{.force_sync = true, .compress_logs = params.enable_compression, .rotate_interval = 1}, keeper_context);
DB::LogFileSettings{.force_sync = true, .compress_logs = params.enable_compression, .rotate_interval = 1},
DB::FlushSettings(),
keeper_context);
changelog_reader2.init(1, 0);
EXPECT_EQ(changelog_reader2.size(), 1);
EXPECT_EQ(changelog_reader2.last_entry()->get_term(), 7777);
@ -1044,7 +1100,9 @@ TEST_P(CoordinationTest, ChangelogTestLostFiles)
setLogDirectory("./logs");
DB::KeeperLogStore changelog(
DB::LogFileSettings{.force_sync = true, .compress_logs = params.enable_compression, .rotate_interval = 20}, keeper_context);
DB::LogFileSettings{.force_sync = true, .compress_logs = params.enable_compression, .rotate_interval = 20},
DB::FlushSettings(),
keeper_context);
changelog.init(1, 0);
for (size_t i = 0; i < 35; ++i)
@ -1061,7 +1119,9 @@ TEST_P(CoordinationTest, ChangelogTestLostFiles)
fs::remove("./logs/changelog_1_20.bin" + params.extension);
DB::KeeperLogStore changelog_reader(
DB::LogFileSettings{.force_sync = true, .compress_logs = params.enable_compression, .rotate_interval = 20}, keeper_context);
DB::LogFileSettings{.force_sync = true, .compress_logs = params.enable_compression, .rotate_interval = 20},
DB::FlushSettings(),
keeper_context);
/// It should print an error message, but still be able to start
changelog_reader.init(5, 0);
assertBrokenLogRemoved("./logs", "changelog_21_40.bin" + params.extension);
@ -1074,7 +1134,9 @@ TEST_P(CoordinationTest, ChangelogTestLostFiles2)
setLogDirectory("./logs");
DB::KeeperLogStore changelog(
DB::LogFileSettings{.force_sync = true, .compress_logs = params.enable_compression, .rotate_interval = 10}, keeper_context);
DB::LogFileSettings{.force_sync = true, .compress_logs = params.enable_compression, .rotate_interval = 10},
DB::FlushSettings(),
keeper_context);
changelog.init(1, 0);
for (size_t i = 0; i < 35; ++i)
@ -1095,7 +1157,9 @@ TEST_P(CoordinationTest, ChangelogTestLostFiles2)
fs::remove("./logs/changelog_21_30.bin" + params.extension);
DB::KeeperLogStore changelog_reader(
DB::LogFileSettings{.force_sync = true, .compress_logs = params.enable_compression, .rotate_interval = 10}, keeper_context);
DB::LogFileSettings{.force_sync = true, .compress_logs = params.enable_compression, .rotate_interval = 10},
DB::FlushSettings(),
keeper_context);
/// It should print an error message, but still be able to start
changelog_reader.init(5, 0);
EXPECT_TRUE(fs::exists("./logs/changelog_1_10.bin" + params.extension));
@ -1555,6 +1619,7 @@ void testLogAndStateMachine(
DB::KeeperLogStore changelog(
DB::LogFileSettings{
.force_sync = true, .compress_logs = enable_compression, .rotate_interval = settings->rotate_log_storage_interval},
DB::FlushSettings(),
keeper_context);
changelog.init(state_machine->last_commit_index() + 1, settings->reserved_log_items);
for (size_t i = 1; i < total_logs + 1; ++i)
@ -1599,6 +1664,7 @@ void testLogAndStateMachine(
DB::KeeperLogStore restore_changelog(
DB::LogFileSettings{
.force_sync = true, .compress_logs = enable_compression, .rotate_interval = settings->rotate_log_storage_interval},
DB::FlushSettings(),
keeper_context);
restore_changelog.init(restore_machine->last_commit_index() + 1, settings->reserved_log_items);
@ -1851,7 +1917,9 @@ TEST_P(CoordinationTest, TestRotateIntervalChanges)
setLogDirectory("./logs");
{
DB::KeeperLogStore changelog(
DB::LogFileSettings{.force_sync = true, .compress_logs = params.enable_compression, .rotate_interval = 100}, keeper_context);
DB::LogFileSettings{.force_sync = true, .compress_logs = params.enable_compression, .rotate_interval = 100},
DB::FlushSettings(),
keeper_context);
changelog.init(0, 3);
for (size_t i = 1; i < 55; ++i)
@ -1870,7 +1938,9 @@ TEST_P(CoordinationTest, TestRotateIntervalChanges)
EXPECT_TRUE(fs::exists("./logs/changelog_1_100.bin" + params.extension));
DB::KeeperLogStore changelog_1(
DB::LogFileSettings{.force_sync = true, .compress_logs = params.enable_compression, .rotate_interval = 10}, keeper_context);
DB::LogFileSettings{.force_sync = true, .compress_logs = params.enable_compression, .rotate_interval = 10},
DB::FlushSettings(),
keeper_context);
changelog_1.init(0, 50);
for (size_t i = 0; i < 55; ++i)
{
@ -1887,7 +1957,9 @@ TEST_P(CoordinationTest, TestRotateIntervalChanges)
EXPECT_TRUE(fs::exists("./logs/changelog_101_110.bin" + params.extension));
DB::KeeperLogStore changelog_2(
DB::LogFileSettings{.force_sync = true, .compress_logs = params.enable_compression, .rotate_interval = 7}, keeper_context);
DB::LogFileSettings{.force_sync = true, .compress_logs = params.enable_compression, .rotate_interval = 7},
DB::FlushSettings(),
keeper_context);
changelog_2.init(98, 55);
for (size_t i = 0; i < 17; ++i)
@ -1911,7 +1983,9 @@ TEST_P(CoordinationTest, TestRotateIntervalChanges)
EXPECT_TRUE(fs::exists("./logs/changelog_125_131.bin" + params.extension));
DB::KeeperLogStore changelog_3(
DB::LogFileSettings{.force_sync = true, .compress_logs = params.enable_compression, .rotate_interval = 5}, keeper_context);
DB::LogFileSettings{.force_sync = true, .compress_logs = params.enable_compression, .rotate_interval = 5},
DB::FlushSettings(),
keeper_context);
changelog_3.init(116, 3);
for (size_t i = 0; i < 17; ++i)
{
@ -1961,7 +2035,9 @@ TEST_P(CoordinationTest, TestCompressedLogsMultipleRewrite)
ChangelogDirTest logs("./logs");
setLogDirectory("./logs");
DB::KeeperLogStore changelog(
DB::LogFileSettings{.force_sync = true, .compress_logs = test_params.enable_compression, .rotate_interval = 100}, keeper_context);
DB::LogFileSettings{.force_sync = true, .compress_logs = test_params.enable_compression, .rotate_interval = 100},
DB::FlushSettings(),
keeper_context);
changelog.init(0, 3);
for (size_t i = 1; i < 55; ++i)
@ -1976,7 +2052,9 @@ TEST_P(CoordinationTest, TestCompressedLogsMultipleRewrite)
waitDurableLogs(changelog);
DB::KeeperLogStore changelog1(
DB::LogFileSettings{.force_sync = true, .compress_logs = test_params.enable_compression, .rotate_interval = 100}, keeper_context);
DB::LogFileSettings{.force_sync = true, .compress_logs = test_params.enable_compression, .rotate_interval = 100},
DB::FlushSettings(),
keeper_context);
changelog1.init(0, 3);
for (size_t i = 55; i < 70; ++i)
{
@ -1990,7 +2068,9 @@ TEST_P(CoordinationTest, TestCompressedLogsMultipleRewrite)
waitDurableLogs(changelog1);
DB::KeeperLogStore changelog2(
DB::LogFileSettings{.force_sync = true, .compress_logs = test_params.enable_compression, .rotate_interval = 100}, keeper_context);
DB::LogFileSettings{.force_sync = true, .compress_logs = test_params.enable_compression, .rotate_interval = 100},
DB::FlushSettings(),
keeper_context);
changelog2.init(0, 3);
for (size_t i = 70; i < 80; ++i)
{
@ -2056,7 +2136,10 @@ TEST_P(CoordinationTest, ChangelogInsertThreeTimesSmooth)
setLogDirectory("./logs");
{
LOG_INFO(log, "================First time=====================");
DB::KeeperLogStore changelog(DB::LogFileSettings{.force_sync = true, .compress_logs = params.enable_compression, .rotate_interval = 100}, keeper_context);
DB::KeeperLogStore changelog(
DB::LogFileSettings{.force_sync = true, .compress_logs = params.enable_compression, .rotate_interval = 100},
DB::FlushSettings(),
keeper_context);
changelog.init(1, 0);
auto entry = getLogEntry("hello_world", 1000);
changelog.append(entry);
@ -2068,7 +2151,9 @@ TEST_P(CoordinationTest, ChangelogInsertThreeTimesSmooth)
{
LOG_INFO(log, "================Second time=====================");
DB::KeeperLogStore changelog(
DB::LogFileSettings{.force_sync = true, .compress_logs = params.enable_compression, .rotate_interval = 100}, keeper_context);
DB::LogFileSettings{.force_sync = true, .compress_logs = params.enable_compression, .rotate_interval = 100},
DB::FlushSettings(),
keeper_context);
changelog.init(1, 0);
auto entry = getLogEntry("hello_world", 1000);
changelog.append(entry);
@ -2080,7 +2165,9 @@ TEST_P(CoordinationTest, ChangelogInsertThreeTimesSmooth)
{
LOG_INFO(log, "================Third time=====================");
DB::KeeperLogStore changelog(
DB::LogFileSettings{.force_sync = true, .compress_logs = params.enable_compression, .rotate_interval = 100}, keeper_context);
DB::LogFileSettings{.force_sync = true, .compress_logs = params.enable_compression, .rotate_interval = 100},
DB::FlushSettings(),
keeper_context);
changelog.init(1, 0);
auto entry = getLogEntry("hello_world", 1000);
changelog.append(entry);
@ -2092,7 +2179,9 @@ TEST_P(CoordinationTest, ChangelogInsertThreeTimesSmooth)
{
LOG_INFO(log, "================Fourth time=====================");
DB::KeeperLogStore changelog(
DB::LogFileSettings{.force_sync = true, .compress_logs = params.enable_compression, .rotate_interval = 100}, keeper_context);
DB::LogFileSettings{.force_sync = true, .compress_logs = params.enable_compression, .rotate_interval = 100},
DB::FlushSettings(),
keeper_context);
changelog.init(1, 0);
auto entry = getLogEntry("hello_world", 1000);
changelog.append(entry);
@ -2112,7 +2201,9 @@ TEST_P(CoordinationTest, ChangelogInsertMultipleTimesSmooth)
{
LOG_INFO(log, "================First time=====================");
DB::KeeperLogStore changelog(
DB::LogFileSettings{.force_sync = true, .compress_logs = params.enable_compression, .rotate_interval = 100}, keeper_context);
DB::LogFileSettings{.force_sync = true, .compress_logs = params.enable_compression, .rotate_interval = 100},
DB::FlushSettings(),
keeper_context);
changelog.init(1, 0);
for (size_t j = 0; j < 7; ++j)
{
@ -2124,7 +2215,9 @@ TEST_P(CoordinationTest, ChangelogInsertMultipleTimesSmooth)
}
DB::KeeperLogStore changelog(
DB::LogFileSettings{.force_sync = true, .compress_logs = params.enable_compression, .rotate_interval = 100}, keeper_context);
DB::LogFileSettings{.force_sync = true, .compress_logs = params.enable_compression, .rotate_interval = 100},
DB::FlushSettings(),
keeper_context);
changelog.init(1, 0);
EXPECT_EQ(changelog.next_slot(), 36 * 7 + 1);
}
@ -2137,7 +2230,9 @@ TEST_P(CoordinationTest, ChangelogInsertThreeTimesHard)
{
LOG_INFO(log, "================First time=====================");
DB::KeeperLogStore changelog1(
DB::LogFileSettings{.force_sync = true, .compress_logs = params.enable_compression, .rotate_interval = 100}, keeper_context);
DB::LogFileSettings{.force_sync = true, .compress_logs = params.enable_compression, .rotate_interval = 100},
DB::FlushSettings(),
keeper_context);
changelog1.init(1, 0);
auto entry = getLogEntry("hello_world", 1000);
changelog1.append(entry);
@ -2149,7 +2244,9 @@ TEST_P(CoordinationTest, ChangelogInsertThreeTimesHard)
{
LOG_INFO(log, "================Second time=====================");
DB::KeeperLogStore changelog2(
DB::LogFileSettings{.force_sync = true, .compress_logs = params.enable_compression, .rotate_interval = 100}, keeper_context);
DB::LogFileSettings{.force_sync = true, .compress_logs = params.enable_compression, .rotate_interval = 100},
DB::FlushSettings(),
keeper_context);
changelog2.init(1, 0);
auto entry = getLogEntry("hello_world", 1000);
changelog2.append(entry);
@ -2161,7 +2258,9 @@ TEST_P(CoordinationTest, ChangelogInsertThreeTimesHard)
{
LOG_INFO(log, "================Third time=====================");
DB::KeeperLogStore changelog3(
DB::LogFileSettings{.force_sync = true, .compress_logs = params.enable_compression, .rotate_interval = 100}, keeper_context);
DB::LogFileSettings{.force_sync = true, .compress_logs = params.enable_compression, .rotate_interval = 100},
DB::FlushSettings(),
keeper_context);
changelog3.init(1, 0);
auto entry = getLogEntry("hello_world", 1000);
changelog3.append(entry);
@ -2173,7 +2272,9 @@ TEST_P(CoordinationTest, ChangelogInsertThreeTimesHard)
{
LOG_INFO(log, "================Fourth time=====================");
DB::KeeperLogStore changelog4(
DB::LogFileSettings{.force_sync = true, .compress_logs = params.enable_compression, .rotate_interval = 100}, keeper_context);
DB::LogFileSettings{.force_sync = true, .compress_logs = params.enable_compression, .rotate_interval = 100},
DB::FlushSettings(),
keeper_context);
changelog4.init(1, 0);
auto entry = getLogEntry("hello_world", 1000);
changelog4.append(entry);
@ -2235,7 +2336,9 @@ TEST_P(CoordinationTest, TestLogGap)
setLogDirectory("./logs");
DB::KeeperLogStore changelog(
DB::LogFileSettings{.force_sync = true, .compress_logs = test_params.enable_compression, .rotate_interval = 100}, keeper_context);
DB::LogFileSettings{.force_sync = true, .compress_logs = test_params.enable_compression, .rotate_interval = 100},
DB::FlushSettings(),
keeper_context);
changelog.init(0, 3);
for (size_t i = 1; i < 55; ++i)
@ -2248,7 +2351,9 @@ TEST_P(CoordinationTest, TestLogGap)
}
DB::KeeperLogStore changelog1(
DB::LogFileSettings{.force_sync = true, .compress_logs = test_params.enable_compression, .rotate_interval = 100}, keeper_context);
DB::LogFileSettings{.force_sync = true, .compress_logs = test_params.enable_compression, .rotate_interval = 100},
DB::FlushSettings(),
keeper_context);
changelog1.init(61, 3);
/// Logs discarded
@ -2597,6 +2702,7 @@ TEST_P(CoordinationTest, ChangelogTestMaxLogSize)
DB::KeeperLogStore changelog(
DB::LogFileSettings{
.force_sync = true, .compress_logs = params.enable_compression, .rotate_interval = 20, .max_size = 50 * 1024 * 1024},
DB::FlushSettings(),
keeper_context);
changelog.init(1, 0);
@ -2616,6 +2722,7 @@ TEST_P(CoordinationTest, ChangelogTestMaxLogSize)
DB::KeeperLogStore changelog(
DB::LogFileSettings{
.force_sync = true, .compress_logs = params.enable_compression, .rotate_interval = 100'000, .max_size = 4000},
DB::FlushSettings(),
keeper_context);
changelog.init(1, 0);
@ -2637,6 +2744,7 @@ TEST_P(CoordinationTest, ChangelogTestMaxLogSize)
DB::KeeperLogStore changelog(
DB::LogFileSettings{
.force_sync = true, .compress_logs = params.enable_compression, .rotate_interval = 100'000, .max_size = 4000},
DB::FlushSettings(),
keeper_context);
changelog.init(1, 0);
ASSERT_EQ(changelog.entry_at(last_entry_index)->get_term(), (i - 1 + 44) * 10);

View File

@ -54,6 +54,9 @@ enum Command
COM_CHANGE_USER = 0x11,
COM_BINLOG_DUMP = 0x12,
COM_REGISTER_SLAVE = 0x15,
COM_STMT_PREPARE = 0x16,
COM_STMT_EXECUTE = 0x17,
COM_STMT_CLOSE = 0x19,
COM_RESET_CONNECTION = 0x1f,
COM_DAEMON = 0x1d,
COM_BINLOG_DUMP_GTID = 0x1e

View File

@ -0,0 +1,42 @@
#include <Core/MySQL/PacketsPreparedStatements.h>
#include <IO/WriteHelpers.h>
#include <Common/logger_useful.h>
namespace DB
{
namespace MySQLProtocol
{
namespace PreparedStatements
{
size_t PreparedStatementResponseOK::getPayloadSize() const
{
// total = 13
return 1 // status
+ 4 // statement_id
+ 2 // num_columns
+ 2 // num_params
+ 1 // reserved_1 (filler)
+ 2 // warnings_count
+ 1; // metadata_follows
}
void PreparedStatementResponseOK::writePayloadImpl(WriteBuffer & buffer) const
{
buffer.write(reinterpret_cast<const char *>(&status), 1);
buffer.write(reinterpret_cast<const char *>(&statement_id), 4);
buffer.write(reinterpret_cast<const char *>(&num_columns), 2);
buffer.write(reinterpret_cast<const char *>(&num_params), 2);
buffer.write(reinterpret_cast<const char *>(&reserved_1), 1);
buffer.write(reinterpret_cast<const char *>(&warnings_count), 2);
buffer.write(0x0); // RESULTSET_METADATA_NONE
}
PreparedStatementResponseOK::PreparedStatementResponseOK(
uint32_t statement_id_, uint16_t num_columns_, uint16_t num_params_, uint16_t warnings_count_)
: statement_id(statement_id_), num_columns(num_columns_), num_params(num_params_), warnings_count(warnings_count_)
{
}
}
}
}
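For reference, a sketch (not part of the diff) of the 13-byte payload this produces for a hypothetical PreparedStatementResponseOK(5, 0, 0, 0); multi-byte integers are little-endian on the wire:
// 0x00                  status (OK)
// 0x05 0x00 0x00 0x00   statement_id = 5
// 0x00 0x00             num_columns = 0
// 0x00 0x00             num_params = 0
// 0x00                  reserved_1 (filler)
// 0x00 0x00             warnings_count = 0
// 0x00                  metadata_follows (RESULTSET_METADATA_NONE)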

View File

@ -0,0 +1,32 @@
#pragma once
#include <Core/MySQL/IMySQLWritePacket.h>
namespace DB
{
namespace MySQLProtocol
{
namespace PreparedStatements
{
// https://dev.mysql.com/doc/dev/mysql-server/latest/page_protocol_com_stmt_prepare.html#sect_protocol_com_stmt_prepare_response_ok
class PreparedStatementResponseOK : public IMySQLWritePacket
{
public:
const uint8_t status = 0x00;
uint32_t statement_id;
uint16_t num_columns;
uint16_t num_params;
const uint8_t reserved_1 = 0;
uint16_t warnings_count;
protected:
size_t getPayloadSize() const override;
void writePayloadImpl(WriteBuffer & buffer) const override;
public:
PreparedStatementResponseOK(uint32_t statement_id_, uint16_t num_columns_, uint16_t num_params_, uint16_t warnings_count_);
};
}
}
}

View File

@ -0,0 +1,365 @@
#include <Columns/IColumn.h>
#include <Core/MySQL/IMySQLReadPacket.h>
#include <Core/MySQL/IMySQLWritePacket.h>
#include <Core/MySQL/PacketsProtocolBinary.h>
#include "Common/LocalDate.h"
#include "Common/LocalDateTime.h"
#include "Columns/ColumnLowCardinality.h"
#include "Columns/ColumnNullable.h"
#include "Columns/ColumnVector.h"
#include "Columns/ColumnsDateTime.h"
#include "Core/DecimalFunctions.h"
#include "DataTypes/DataTypeDateTime64.h"
#include "DataTypes/DataTypeLowCardinality.h"
#include "DataTypes/DataTypeNullable.h"
#include "DataTypes/DataTypesNumber.h"
#include "Formats/FormatSettings.h"
#include "IO/WriteBufferFromString.h"
#include "base/DayNum.h"
#include "base/Decimal.h"
#include "base/types.h"
namespace DB
{
namespace MySQLProtocol
{
namespace ProtocolBinary
{
ResultSetRow::ResultSetRow(const Serializations & serializations_, const DataTypes & data_types_, const Columns & columns_, int row_num_)
: row_num(row_num_), columns(columns_), data_types(data_types_), serializations(serializations_)
{
payload_size = 1 + null_bitmap_size;
FormatSettings format_settings;
for (size_t i = 0; i < columns.size(); ++i)
{
ColumnPtr col = getColumn(i);
if (col->isNullAt(row_num))
{
// See https://dev.mysql.com/doc/dev/mysql-server/8.1.0/page_protocol_binary_resultset.html#sect_protocol_binary_resultset_row
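// The first two bits of the bitmap are reserved, so column i maps to bit (i + 2)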
size_t byte = (i + 2) / 8;
int bit = 1 << ((i + 2) % 8);
null_bitmap[byte] |= bit;
continue; // NULLs are stored in the null bitmap only
}
DataTypePtr data_type = removeLowCardinality(removeNullable((data_types[i])));
TypeIndex type_index = data_type->getTypeId();
switch (type_index)
{
case TypeIndex::Int8:
payload_size += 1;
break;
case TypeIndex::UInt8:
if (data_type->getName() == "Bool")
payload_size += 2; // BIT MySQL type is string<lenenc> in binary
else
payload_size += 1;
break;
case TypeIndex::Int16:
case TypeIndex::UInt16:
payload_size += 2;
break;
case TypeIndex::Int32:
case TypeIndex::UInt32:
case TypeIndex::Float32:
payload_size += 4;
break;
case TypeIndex::Int64:
case TypeIndex::UInt64:
case TypeIndex::Float64:
payload_size += 8;
break;
case TypeIndex::Date:
case TypeIndex::Date32: {
size_t size = 1 // number of bytes following
+ 2 // year
+ 1 // month
+ 1; // day
payload_size += size;
break;
}
case TypeIndex::DateTime: {
UInt32 value = assert_cast<const ColumnVector<UInt32> &>(*col).getData()[row_num];
LocalDateTime ldt = LocalDateTime(value, DateLUT::instance(getDateTimeTimezone(*data_type)));
bool has_time = !(ldt.hour() == 0 && ldt.minute() == 0 && ldt.second() == 0);
size_t size = 1 // number of bytes following
+ 2 // year
+ 1 // month
+ 1; // day
payload_size += size;
if (has_time)
{
size_t additional_size = 1 // hour
+ 1 // minute
+ 1; // second
payload_size += additional_size;
}
break;
}
case TypeIndex::DateTime64: {
auto [components, scale] = getDateTime64ComponentsWithScale(data_type, col);
if (scale > 6)
{
// MySQL Timestamp has max scale of 6
components.fractional /= static_cast<int>(pow(10, scale - 6));
}
LocalDateTime ldt = LocalDateTime(components.whole, DateLUT::instance(getDateTimeTimezone(*data_type)));
bool has_microseconds = components.fractional != 0;
bool has_time = !(ldt.hour() == 0 && ldt.minute() == 0 && ldt.second() == 0);
size_t size = 1 // number of bytes following
+ 2 // year
+ 1 // month
+ 1; // day
payload_size += size;
if (has_microseconds)
{
size_t additional_size = 1 // hour
+ 1 // minute
+ 1 // second
+ 4; // microsecond;
payload_size += additional_size;
}
else if (has_time)
{
size_t additional_size = 1 // hour
+ 1 // minute
+ 1; // second
payload_size += additional_size;
}
break;
}
// All other types including all Decimal types are string<lenenc> in binary
default:
WriteBufferFromOwnString ostr;
serializations[i]->serializeText(*columns[i], row_num, ostr, format_settings);
payload_size += getLengthEncodedStringSize(ostr.str());
serialized[i] = std::move(ostr.str());
break;
}
}
}
size_t ResultSetRow::getPayloadSize() const
{
return payload_size;
}
void ResultSetRow::writePayloadImpl(WriteBuffer & buffer) const
{
buffer.write(static_cast<char>(0x00));
buffer.write(null_bitmap.data(), null_bitmap_size);
for (size_t i = 0; i < columns.size(); ++i)
{
ColumnPtr col = getColumn(i);
if (col->isNullAt(row_num))
continue;
DataTypePtr data_type = removeLowCardinality(removeNullable((data_types[i])));
TypeIndex type_index = data_type->getTypeId();
switch (type_index)
{
case TypeIndex::Int8: {
Int8 value = assert_cast<const ColumnVector<Int8> &>(*col).getData()[row_num];
buffer.write(reinterpret_cast<char *>(&value), 1);
break;
}
case TypeIndex::UInt8: {
UInt8 value = assert_cast<const ColumnVector<UInt8> &>(*col).getData()[row_num];
if (data_type->getName() == "Bool")
buffer.write(static_cast<char>(1));
buffer.write(reinterpret_cast<char *>(&value), 1);
break;
}
case TypeIndex::Int16: {
Int16 value = assert_cast<const ColumnVector<Int16> &>(*col).getData()[row_num];
buffer.write(reinterpret_cast<char *>(&value), 2);
break;
}
case TypeIndex::UInt16: {
UInt16 value = assert_cast<const ColumnVector<UInt16> &>(*col).getData()[row_num];
buffer.write(reinterpret_cast<char *>(&value), 2);
break;
}
case TypeIndex::Int32: {
Int32 value = assert_cast<const ColumnVector<Int32> &>(*col).getData()[row_num];
buffer.write(reinterpret_cast<char *>(&value), 4);
break;
}
case TypeIndex::UInt32: {
UInt32 value = assert_cast<const ColumnVector<UInt32> &>(*col).getData()[row_num];
buffer.write(reinterpret_cast<char *>(&value), 4);
break;
}
case TypeIndex::Float32: {
Float32 value = assert_cast<const ColumnVector<Float32> &>(*col).getData()[row_num];
buffer.write(reinterpret_cast<char *>(&value), 4);
break;
}
case TypeIndex::Int64: {
Int64 value = assert_cast<const ColumnVector<Int64> &>(*col).getData()[row_num];
buffer.write(reinterpret_cast<char *>(&value), 8);
break;
}
case TypeIndex::UInt64: {
UInt64 value = assert_cast<const ColumnVector<UInt64> &>(*col).getData()[row_num];
buffer.write(reinterpret_cast<char *>(&value), 8);
break;
}
case TypeIndex::Float64: {
Float64 value = assert_cast<const ColumnVector<Float64> &>(*col).getData()[row_num];
buffer.write(reinterpret_cast<char *>(&value), 8);
break;
}
case TypeIndex::Date: {
UInt16 value = assert_cast<const ColumnVector<UInt16> &>(*col).getData()[row_num];
LocalDate ld = LocalDate(DayNum(value));
buffer.write(static_cast<char>(4)); // bytes_following
UInt16 year = ld.year();
UInt8 month = ld.month();
UInt8 day = ld.day();
buffer.write(reinterpret_cast<const char *>(&year), 2);
buffer.write(reinterpret_cast<const char *>(&month), 1);
buffer.write(reinterpret_cast<const char *>(&day), 1);
break;
}
case TypeIndex::Date32: {
Int32 value = assert_cast<const ColumnVector<Int32> &>(*col).getData()[row_num];
LocalDate ld = LocalDate(ExtendedDayNum(value));
buffer.write(static_cast<char>(4)); // bytes_following
UInt16 year = ld.year();
UInt8 month = ld.month();
UInt8 day = ld.day();
buffer.write(reinterpret_cast<const char *>(&year), 2);
buffer.write(reinterpret_cast<const char *>(&month), 1);
buffer.write(reinterpret_cast<const char *>(&day), 1);
break;
}
case TypeIndex::DateTime: {
UInt32 value = assert_cast<const ColumnVector<UInt32> &>(*col).getData()[row_num];
String timezone = getDateTimeTimezone(*data_type);
LocalDateTime ldt = LocalDateTime(value, DateLUT::instance(timezone));
UInt16 year = ldt.year();
UInt8 month = ldt.month();
UInt8 day = ldt.day();
UInt8 hour = ldt.hour();
UInt8 minute = ldt.minute();
UInt8 second = ldt.second();
bool has_time = !(hour == 0 && minute == 0 && second == 0);
size_t bytes_following = has_time ? 7 : 4;
buffer.write(reinterpret_cast<const char *>(&bytes_following), 1);
buffer.write(reinterpret_cast<const char *>(&year), 2);
buffer.write(reinterpret_cast<const char *>(&month), 1);
buffer.write(reinterpret_cast<const char *>(&day), 1);
if (has_time)
{
buffer.write(reinterpret_cast<const char *>(&hour), 1);
buffer.write(reinterpret_cast<const char *>(&minute), 1);
buffer.write(reinterpret_cast<const char *>(&second), 1);
}
break;
}
case TypeIndex::DateTime64: {
auto [components, scale] = getDateTime64ComponentsWithScale(data_type, col);
if (components.fractional != 0)
{
if (scale > 6)
{
// MySQL Timestamp has max scale of 6
components.fractional /= static_cast<int>(pow(10, scale - 6));
}
else
{
// fractional == 1 is a different microsecond value depending on the scale
// Scale 1 = 100000
// Scale 2 = 010000
// Scale 3 = 001000
// Scale 4 = 000100
// Scale 5 = 000010
// Scale 6 = 000001
components.fractional *= static_cast<int>(pow(10, 6 - scale));
}
}
String timezone = getDateTimeTimezone(*data_type);
LocalDateTime ldt = LocalDateTime(components.whole, DateLUT::instance(timezone));
UInt16 year = ldt.year();
UInt8 month = ldt.month();
UInt8 day = ldt.day();
UInt8 hour = ldt.hour();
UInt8 minute = ldt.minute();
UInt8 second = ldt.second();
bool has_time = !(hour == 0 && minute == 0 && second == 0);
bool has_microseconds = components.fractional != 0;
if (has_microseconds)
{
buffer.write(static_cast<char>(11)); // bytes_following
buffer.write(reinterpret_cast<const char *>(&year), 2);
buffer.write(reinterpret_cast<const char *>(&month), 1);
buffer.write(reinterpret_cast<const char *>(&day), 1);
buffer.write(reinterpret_cast<const char *>(&hour), 1);
buffer.write(reinterpret_cast<const char *>(&minute), 1);
buffer.write(reinterpret_cast<const char *>(&second), 1);
buffer.write(reinterpret_cast<const char *>(&components.fractional), 4);
}
else if (has_time)
{
buffer.write(static_cast<char>(7)); // bytes_following
buffer.write(reinterpret_cast<const char *>(&year), 2);
buffer.write(reinterpret_cast<const char *>(&month), 1);
buffer.write(reinterpret_cast<const char *>(&day), 1);
buffer.write(reinterpret_cast<const char *>(&hour), 1);
buffer.write(reinterpret_cast<const char *>(&minute), 1);
buffer.write(reinterpret_cast<const char *>(&second), 1);
}
else
{
buffer.write(static_cast<char>(4)); // bytes_following
buffer.write(reinterpret_cast<const char *>(&year), 2);
buffer.write(reinterpret_cast<const char *>(&month), 1);
buffer.write(reinterpret_cast<const char *>(&day), 1);
}
break;
}
// All other types including all Decimal types are string<lenenc> in binary
default:
writeLengthEncodedString(serialized[i], buffer);
break;
}
}
}
ResultSetRow::DateTime64ComponentsWithScale ResultSetRow::getDateTime64ComponentsWithScale(DataTypePtr data_type, ColumnPtr col) const
{
const auto * date_time_type = typeid_cast<const DataTypeDateTime64 *>(data_type.get());
static constexpr UInt32 MaxScale = DecimalUtils::max_precision<DateTime64>;
UInt32 scale = std::min(MaxScale, date_time_type->getScale());
const auto value = assert_cast<const ColumnDateTime64 &>(*col).getData()[row_num];
auto components = DecimalUtils::split(value, scale);
using T = typename DateTime64::NativeType;
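    // For negative DateTime64 values, normalize the result of DecimalUtils::split so that
    // the fractional part becomes a positive offset within the preceding whole second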
if (value.value < 0 && components.fractional)
{
components.fractional = DecimalUtils::scaleMultiplier<T>(scale) + (components.whole ? T(-1) : T(1)) * components.fractional;
--components.whole;
}
return {components, scale};
}
ColumnPtr ResultSetRow::getColumn(size_t i) const
{
ColumnPtr col = columns[i]->convertToFullIfNeeded();
if (col->isNullable())
return assert_cast<const ColumnNullable &>(*col).getNestedColumnPtr();
return col;
}
}
}
}

View File

@ -0,0 +1,48 @@
#pragma once
#include <vector>
#include <Columns/IColumn.h>
#include <Core/MySQL/IMySQLReadPacket.h>
#include <Core/MySQL/IMySQLWritePacket.h>
#include "Core/DecimalFunctions.h"
#include "DataTypes/IDataType.h"
#include "DataTypes/Serializations/ISerialization.h"
namespace DB
{
namespace MySQLProtocol
{
namespace ProtocolBinary
{
class ResultSetRow : public IMySQLWritePacket
{
using DateTime64ComponentsWithScale = std::pair<DecimalUtils::DecimalComponents<DateTime64>, UInt32>;
private:
DateTime64ComponentsWithScale getDateTime64ComponentsWithScale(DataTypePtr data_type, ColumnPtr col) const;
ColumnPtr getColumn(size_t i) const;
protected:
int row_num;
const Columns & columns;
const DataTypes & data_types;
const Serializations & serializations;
std::vector<String> serialized = std::vector<String>(columns.size());
// See https://dev.mysql.com/doc/dev/mysql-server/8.1.0/page_protocol_binary_resultset.html#sect_protocol_binary_resultset_row
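// The two lowest bits of the bitmap are reserved for protocol use, hence the extra "+ 2" in the size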
size_t null_bitmap_size = (columns.size() + 7 + 2) / 8;
std::vector<char> null_bitmap = std::vector<char>(null_bitmap_size, static_cast<char>(0));
size_t payload_size = 0;
size_t getPayloadSize() const override;
void writePayloadImpl(WriteBuffer & buffer) const override;
public:
ResultSetRow(const Serializations & serializations_, const DataTypes & data_types_, const Columns & columns_, int row_num_);
};
}
}
}

View File

@ -1,7 +1,12 @@
#include <Core/MySQL/PacketsProtocolText.h>
#include <IO/WriteBufferFromString.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteBufferFromString.h>
#include <IO/WriteHelpers.h>
#include "Common/assert_cast.h"
#include "Core/MySQL/IMySQLWritePacket.h"
#include "DataTypes/DataTypeLowCardinality.h"
#include "DataTypes/DataTypeNullable.h"
#include "DataTypes/DataTypesDecimal.h"
namespace DB
{
@ -131,16 +136,25 @@ void ColumnDefinition::writePayloadImpl(WriteBuffer & buffer) const
}
}
ColumnDefinition getColumnDefinition(const String & column_name, const TypeIndex type_index)
ColumnDefinition getColumnDefinition(const String & column_name, const DataTypePtr & data_type)
{
ColumnType column_type;
CharacterSet charset = CharacterSet::binary;
int flags = 0;
uint8_t decimals = 0;
TypeIndex type_index = removeLowCardinality(removeNullable(data_type))->getTypeId();
switch (type_index)
{
case TypeIndex::UInt8:
column_type = ColumnType::MYSQL_TYPE_TINY;
flags = ColumnDefinitionFlags::BINARY_FLAG | ColumnDefinitionFlags::UNSIGNED_FLAG;
if (data_type->getName() == "Bool")
{
column_type = ColumnType::MYSQL_TYPE_BIT;
}
else
{
column_type = ColumnType::MYSQL_TYPE_TINY;
flags = ColumnDefinitionFlags::BINARY_FLAG | ColumnDefinitionFlags::UNSIGNED_FLAG;
}
break;
case TypeIndex::UInt16:
column_type = ColumnType::MYSQL_TYPE_SHORT;
@ -173,30 +187,51 @@ ColumnDefinition getColumnDefinition(const String & column_name, const TypeIndex
case TypeIndex::Float32:
column_type = ColumnType::MYSQL_TYPE_FLOAT;
flags = ColumnDefinitionFlags::BINARY_FLAG;
decimals = 31;
break;
case TypeIndex::Float64:
column_type = ColumnType::MYSQL_TYPE_DOUBLE;
flags = ColumnDefinitionFlags::BINARY_FLAG;
decimals = 31;
break;
case TypeIndex::Date:
case TypeIndex::Date32:
column_type = ColumnType::MYSQL_TYPE_DATE;
flags = ColumnDefinitionFlags::BINARY_FLAG;
break;
case TypeIndex::DateTime:
case TypeIndex::DateTime64:
column_type = ColumnType::MYSQL_TYPE_DATETIME;
flags = ColumnDefinitionFlags::BINARY_FLAG;
break;
case TypeIndex::String:
case TypeIndex::FixedString:
column_type = ColumnType::MYSQL_TYPE_STRING;
charset = CharacterSet::utf8_general_ci;
case TypeIndex::Decimal32:
case TypeIndex::Decimal64:
column_type = ColumnType::MYSQL_TYPE_DECIMAL;
flags = ColumnDefinitionFlags::BINARY_FLAG;
break;
case TypeIndex::Decimal128: {
// MySQL DECIMAL has a maximum precision of 65 and a maximum scale of 30
// Decimal256 can exceed that range (precision goes up to 76) and is handled in the default case
// See https://dev.mysql.com/doc/refman/8.0/en/precision-math-decimal-characteristics.html
const auto & type = assert_cast<const DataTypeDecimal128 &>(*data_type);
if (type.getPrecision() > 65 || type.getScale() > 30)
{
column_type = ColumnType::MYSQL_TYPE_STRING;
charset = CharacterSet::utf8_general_ci;
}
else
{
column_type = ColumnType::MYSQL_TYPE_DECIMAL;
flags = ColumnDefinitionFlags::BINARY_FLAG;
}
break;
}
default:
column_type = ColumnType::MYSQL_TYPE_STRING;
charset = CharacterSet::utf8_general_ci;
break;
}
return ColumnDefinition(column_name, charset, 0, column_type, flags, 0);
return ColumnDefinition(column_name, charset, 0, column_type, flags, decimals);
}
}
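For illustration, a few of the mappings implied by the cases above (a sketch, not exhaustive; Decimal(38, 31) matches the d128_text column exercised by the integration test later in this diff):
// Bool            -> MYSQL_TYPE_BIT      (binary charset, no flags)
// Decimal(18, 3)  -> MYSQL_TYPE_DECIMAL  (BINARY_FLAG)
// Decimal(38, 31) -> MYSQL_TYPE_STRING   (utf8_general_ci; scale 31 > 30)
// Float64         -> MYSQL_TYPE_DOUBLE   (BINARY_FLAG, decimals = 31)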

View File

@ -116,16 +116,29 @@ public:
ColumnDefinition();
ColumnDefinition(
String schema_, String table_, String org_table_, String name_, String org_name_, uint16_t character_set_, uint32_t column_length_,
ColumnType column_type_, uint16_t flags_, uint8_t decimals_, bool with_defaults_ = false);
String schema_,
String table_,
String org_table_,
String name_,
String org_name_,
uint16_t character_set_,
uint32_t column_length_,
ColumnType column_type_,
uint16_t flags_,
uint8_t decimals_,
bool with_defaults_ = false);
/// Should be used when column metadata (original name, table, original table, database) is unknown.
ColumnDefinition(
String name_, uint16_t character_set_, uint32_t column_length_, ColumnType column_type_, uint16_t flags_, uint8_t decimals_);
String name_,
uint16_t character_set_,
uint32_t column_length_,
ColumnType column_type_,
uint16_t flags_,
uint8_t decimals_);
};
ColumnDefinition getColumnDefinition(const String & column_name, const TypeIndex index);
ColumnDefinition getColumnDefinition(const String & column_name, const DataTypePtr & data_type);
}

View File

@ -31,6 +31,12 @@ std::string DataTypeDecimal<T>::doGetName() const
template <is_decimal T>
std::string DataTypeDecimal<T>::getSQLCompatibleName() const
{
/// See https://dev.mysql.com/doc/refman/8.0/en/precision-math-decimal-characteristics.html
/// DECIMAL(M,D)
/// M is the maximum number of digits (the precision). It has a range of 1 to 65.
/// D is the number of digits to the right of the decimal point (the scale). It has a range of 0 to 30 and must be no larger than M.
if (this->precision > 65 || this->scale > 30)
return "TEXT";
return fmt::format("DECIMAL({}, {})", this->precision, this->scale);
}
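A few illustrative results (a sketch; the 38/31 and 76/20 types appear in the prepared-statements test tables later in this diff):
// Decimal(30, 10) -> "DECIMAL(30, 10)"   within MySQL's 65/30 limits
// Decimal(38, 31) -> "TEXT"              scale 31 > 30
// Decimal(76, 20) -> "TEXT"              precision 76 > 65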

View File

@ -288,6 +288,14 @@ struct FormatSettings
uint32_t client_capabilities = 0;
size_t max_packet_size = 0;
uint8_t * sequence_id = nullptr; /// Not null if it's MySQLWire output format used to handle MySQL protocol connections.
/**
* COM_QUERY uses Text ResultSet
* https://dev.mysql.com/doc/dev/mysql-server/latest/page_protocol_com_query_response_text_resultset.html
* COM_STMT_EXECUTE uses Binary Protocol ResultSet
* https://dev.mysql.com/doc/dev/mysql-server/latest/page_protocol_com_stmt_execute_response.html
* By default, use Text ResultSet.
*/
bool binary_protocol = false;
} mysql_wire;
struct

View File

@ -37,11 +37,14 @@ String InterpreterShowColumnsQuery::getRewrittenQuery()
SELECT
name AS field,
type AS type,
startsWith(type, 'Nullable') AS null,
if (startsWith(type, 'Nullable'), 'YES', 'NO') AS `null`,
trim(concatWithSeparator(' ', if (is_in_primary_key, 'PRI', ''), if (is_in_sorting_key, 'SOR', ''))) AS key,
if (default_kind IN ('ALIAS', 'DEFAULT', 'MATERIALIZED'), default_expression, NULL) AS default,
'' AS extra )";
// Known issue: Field 'null' is wrong for types like 'LowCardinality(Nullable(String))'. Can't simply replace 'startsWith' by
// `hasSubsequence` as that would return `true` for non-nullable types such as `Tuple(Nullable(String), String)`...
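// Example: a LowCardinality(Nullable(String)) column reports 'NO' for `null` even though it accepts NULLs.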
// TODO Interpret query.extended. It is supposed to show internal/virtual columns. Need to fetch virtual column names, see
// IStorage::getVirtuals(). We can't easily do that via SQL.

View File

@ -1,19 +1,19 @@
#include <Processors/Formats/Impl/MySQLOutputFormat.h>
#include <Core/MySQL/PacketsGeneric.h>
#include <Core/MySQL/PacketsProtocolBinary.h>
#include <Core/MySQL/PacketsProtocolText.h>
#include <Formats/FormatFactory.h>
#include <Formats/FormatSettings.h>
#include <Interpreters/Context.h>
#include <Interpreters/ProcessList.h>
namespace DB
{
using namespace MySQLProtocol;
using namespace MySQLProtocol::Generic;
using namespace MySQLProtocol::ProtocolText;
using namespace MySQLProtocol::ProtocolBinary;
MySQLOutputFormat::MySQLOutputFormat(WriteBuffer & out_, const Block & header_, const FormatSettings & settings_)
: IOutputFormat(header_, out_)
@ -24,6 +24,8 @@ MySQLOutputFormat::MySQLOutputFormat(WriteBuffer & out_, const Block & header_,
/// But it's also possible to specify MySQLWire as output format for clickhouse-client or clickhouse-local.
/// There is no `sequence_id` stored in `settings_.mysql_wire` in this case, so we create a dummy one.
sequence_id = settings_.mysql_wire.sequence_id ? settings_.mysql_wire.sequence_id : &dummy_sequence_id;
/// Switch between Text (COM_QUERY) and Binary (COM_STMT_EXECUTE) ResultSet
use_binary_result_set = settings_.mysql_wire.binary_protocol;
const auto & header = getPort(PortKind::Main).getHeader();
data_types = header.getDataTypes();
@ -51,10 +53,10 @@ void MySQLOutputFormat::writePrefix()
for (size_t i = 0; i < header.columns(); ++i)
{
const auto & column_name = header.getColumnsWithTypeAndName()[i].name;
packet_endpoint->sendPacket(getColumnDefinition(column_name, data_types[i]->getTypeId()));
packet_endpoint->sendPacket(getColumnDefinition(column_name, data_types[i]));
}
if (!(client_capabilities & Capability::CLIENT_DEPRECATE_EOF))
if (!(client_capabilities & Capability::CLIENT_DEPRECATE_EOF) && !use_binary_result_set)
{
packet_endpoint->sendPacket(EOFPacket(0, 0));
}
@ -63,39 +65,67 @@ void MySQLOutputFormat::writePrefix()
void MySQLOutputFormat::consume(Chunk chunk)
{
for (size_t i = 0; i < chunk.getNumRows(); ++i)
if (!use_binary_result_set)
{
ProtocolText::ResultSetRow row_packet(serializations, chunk.getColumns(), static_cast<int>(i));
packet_endpoint->sendPacket(row_packet);
for (size_t i = 0; i < chunk.getNumRows(); ++i)
{
ProtocolText::ResultSetRow row_packet(serializations, chunk.getColumns(), static_cast<int>(i));
packet_endpoint->sendPacket(row_packet);
}
}
else
{
for (size_t i = 0; i < chunk.getNumRows(); ++i)
{
ProtocolBinary::ResultSetRow row_packet(serializations, data_types, chunk.getColumns(), static_cast<int>(i));
packet_endpoint->sendPacket(row_packet);
}
}
}
void MySQLOutputFormat::finalizeImpl()
{
size_t affected_rows = 0;
std::string human_readable_info;
if (QueryStatusPtr process_list_elem = getContext()->getProcessListElement())
if (!use_binary_result_set)
{
CurrentThread::finalizePerformanceCounters();
QueryStatusInfo info = process_list_elem->getInfo();
affected_rows = info.written_rows;
double elapsed_seconds = static_cast<double>(info.elapsed_microseconds) / 1000000.0;
human_readable_info = fmt::format(
"Read {} rows, {} in {} sec., {} rows/sec., {}/sec.",
info.read_rows,
ReadableSize(info.read_bytes),
elapsed_seconds,
static_cast<size_t>(info.read_rows / elapsed_seconds),
ReadableSize(info.read_bytes / elapsed_seconds));
}
size_t affected_rows = 0;
std::string human_readable_info;
if (QueryStatusPtr process_list_elem = getContext()->getProcessListElement())
{
CurrentThread::finalizePerformanceCounters();
QueryStatusInfo info = process_list_elem->getInfo();
affected_rows = info.written_rows;
double elapsed_seconds = static_cast<double>(info.elapsed_microseconds) / 1000000.0;
human_readable_info = fmt::format(
"Read {} rows, {} in {} sec., {} rows/sec., {}/sec.",
info.read_rows,
ReadableSize(info.read_bytes),
elapsed_seconds,
static_cast<size_t>(info.read_rows / elapsed_seconds),
ReadableSize(info.read_bytes / elapsed_seconds));
}
const auto & header = getPort(PortKind::Main).getHeader();
if (header.columns() == 0)
packet_endpoint->sendPacket(OKPacket(0x0, client_capabilities, affected_rows, 0, 0, "", human_readable_info), true);
else if (client_capabilities & CLIENT_DEPRECATE_EOF)
packet_endpoint->sendPacket(OKPacket(0xfe, client_capabilities, affected_rows, 0, 0, "", human_readable_info), true);
const auto & header = getPort(PortKind::Main).getHeader();
if (header.columns() == 0)
packet_endpoint->sendPacket(OKPacket(0x0, client_capabilities, affected_rows, 0, 0, "", human_readable_info), true);
else if (client_capabilities & CLIENT_DEPRECATE_EOF)
packet_endpoint->sendPacket(OKPacket(0xfe, client_capabilities, affected_rows, 0, 0, "", human_readable_info), true);
else
packet_endpoint->sendPacket(EOFPacket(0, 0), true);
}
else
packet_endpoint->sendPacket(EOFPacket(0, 0), true);
{
size_t affected_rows = 0;
if (QueryStatusPtr process_list_elem = getContext()->getProcessListElement())
{
CurrentThread::finalizePerformanceCounters();
QueryStatusInfo info = process_list_elem->getInfo();
affected_rows = info.written_rows;
}
if (client_capabilities & CLIENT_DEPRECATE_EOF)
packet_endpoint->sendPacket(OKPacket(0xfe, client_capabilities, affected_rows, 0, 0, "", ""), true);
else
packet_endpoint->sendPacket(EOFPacket(0, 0), true);
}
}
void MySQLOutputFormat::flush()

View File

@ -1,7 +1,7 @@
#pragma once
#include <Processors/Formats/IRowOutputFormat.h>
#include <Core/Block.h>
#include <Processors/Formats/IRowOutputFormat.h>
#include <Core/MySQL/PacketEndpoint.h>
#include <Processors/Formats/IOutputFormat.h>
@ -39,6 +39,7 @@ private:
MySQLProtocol::PacketEndpointPtr packet_endpoint;
DataTypes data_types;
Serializations serializations;
bool use_binary_result_set = false;
};
}

View File

@ -1,28 +1,30 @@
#include "MySQLHandler.h"
#include <limits>
#include <Common/NetException.h>
#include <Common/OpenSSLHelpers.h>
#include <Core/MySQL/PacketsGeneric.h>
#include <optional>
#include <regex>
#include <Core/MySQL/Authentication.h>
#include <Core/MySQL/PacketsConnection.h>
#include <Core/MySQL/PacketsGeneric.h>
#include <Core/MySQL/PacketsPreparedStatements.h>
#include <Core/MySQL/PacketsProtocolText.h>
#include <Core/NamesAndTypes.h>
#include <Interpreters/Session.h>
#include <Interpreters/executeQuery.h>
#include <IO/copyData.h>
#include <IO/LimitReadBuffer.h>
#include <IO/ReadBufferFromPocoSocket.h>
#include <IO/ReadBufferFromString.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteBufferFromPocoSocket.h>
#include <IO/WriteBufferFromString.h>
#include <IO/ReadHelpers.h>
#include <IO/copyData.h>
#include <Interpreters/Session.h>
#include <Interpreters/executeQuery.h>
#include <Server/TCPServer.h>
#include <Storages/IStorage.h>
#include <regex>
#include <Common/setThreadName.h>
#include <Core/MySQL/Authentication.h>
#include <Common/logger_useful.h>
#include <base/scope_guard.h>
#include <Common/NetException.h>
#include <Common/OpenSSLHelpers.h>
#include <Common/logger_useful.h>
#include <Common/setThreadName.h>
#include "config_version.h"
@ -40,6 +42,7 @@ using namespace MySQLProtocol;
using namespace MySQLProtocol::Generic;
using namespace MySQLProtocol::ProtocolText;
using namespace MySQLProtocol::ConnectionPhase;
using namespace MySQLProtocol::PreparedStatements;
#if USE_SSL
using Poco::Net::SecureStreamSocket;
@ -173,7 +176,7 @@ void MySQLHandler::run()
comInitDB(limited_payload);
break;
case COM_QUERY:
comQuery(payload);
comQuery(payload, false);
break;
case COM_FIELD_LIST:
comFieldList(limited_payload);
@ -181,6 +184,15 @@ void MySQLHandler::run()
case COM_PING:
comPing();
break;
case COM_STMT_PREPARE:
comStmtPrepare(payload);
break;
case COM_STMT_EXECUTE:
comStmtExecute(payload);
break;
case COM_STMT_CLOSE:
comStmtClose(payload);
break;
default:
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Command {} is not implemented.", command);
}
@ -254,7 +266,8 @@ void MySQLHandler::authenticate(const String & user_name, const String & auth_pl
{
try
{
// For compatibility with JavaScript MySQL client, Native41 authentication plugin is used when possible (if password is specified using double SHA1). Otherwise SHA256 plugin is used.
// For compatibility with JavaScript MySQL client, Native41 authentication plugin is used when possible
// (if password is specified using double SHA1). Otherwise, SHA256 plugin is used.
if (session->getAuthenticationTypeOrLogInFailure(user_name) == DB::AuthenticationType::SHA256_PASSWORD)
{
authPluginSSL();
@ -306,7 +319,7 @@ void MySQLHandler::comPing()
static bool isFederatedServerSetupSetCommand(const String & query);
void MySQLHandler::comQuery(ReadBuffer & payload)
void MySQLHandler::comQuery(ReadBuffer & payload, bool binary_protocol)
{
String query = String(payload.position(), payload.buffer().end());
@ -352,6 +365,7 @@ void MySQLHandler::comQuery(ReadBuffer & payload)
format_settings.mysql_wire.client_capabilities = client_capabilities;
format_settings.mysql_wire.max_packet_size = max_packet_size;
format_settings.mysql_wire.sequence_id = &sequence_id;
format_settings.mysql_wire.binary_protocol = binary_protocol;
auto set_result_details = [&with_output](const QueryResultDetails & details)
{
@ -371,6 +385,90 @@ void MySQLHandler::comQuery(ReadBuffer & payload)
}
}
void MySQLHandler::comStmtPrepare(DB::ReadBuffer & payload)
{
String statement;
readStringUntilEOF(statement, payload);
auto statement_id_opt = emplacePreparedStatement(std::move(statement));
if (statement_id_opt.has_value())
packet_endpoint->sendPacket(PreparedStatementResponseOK(statement_id_opt.value(), 0, 0, 0), true);
else
packet_endpoint->sendPacket(ERRPacket(), true);
}
void MySQLHandler::comStmtExecute(ReadBuffer & payload)
{
uint32_t statement_id;
payload.readStrict(reinterpret_cast<char *>(&statement_id), 4);
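    // The rest of the COM_STMT_EXECUTE payload (flags, iteration count, parameter values)
    // is ignored for now: parameter binding is not supported yet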
auto statement_opt = getPreparedStatement(statement_id);
if (statement_opt.has_value())
MySQLHandler::comQuery(statement_opt.value(), true);
else
packet_endpoint->sendPacket(ERRPacket(), true);
}
void MySQLHandler::comStmtClose(ReadBuffer & payload)
{
uint32_t statement_id;
payload.readStrict(reinterpret_cast<char *>(&statement_id), 4);
// https://dev.mysql.com/doc/dev/mysql-server/latest/page_protocol_com_stmt_close.html
// No response packet is sent back to the client.
erasePreparedStatement(statement_id);
}
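Taken together, the three handlers implement the prepared-statement lifecycle; a hedged sketch of the exchange as wired up here (no parameter binding yet):
// client: COM_STMT_PREPARE "SELECT ..."   -> server: PreparedStatementResponseOK(statement_id, 0, 0, 0)
// client: COM_STMT_EXECUTE statement_id   -> server: result set via comQuery(payload, /* binary_protocol = */ true)
// client: COM_STMT_CLOSE statement_id     -> server: no response; the statement is erased from the map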
std::optional<UInt32> MySQLHandler::emplacePreparedStatement(String statement)
{
static constexpr size_t MAX_PREPARED_STATEMENTS = 10'000;
std::lock_guard<std::mutex> lock(prepared_statements_mutex);
if (prepared_statements.size() > MAX_PREPARED_STATEMENTS) /// Shouldn't happen in reality as COM_STMT_CLOSE cleans up the elements
{
LOG_ERROR(log, "Too many prepared statements");
current_prepared_statement_id = 0;
prepared_statements.clear();
return {};
}
uint32_t statement_id = current_prepared_statement_id;
++current_prepared_statement_id;
// Key collisions should not happen here, as we remove the elements from the map with COM_STMT_CLOSE,
// and a 32-bit unsigned integer gives a large range of available identifiers
if (prepared_statements.contains(statement_id))
{
LOG_ERROR(
log,
"Failed to store a new statement `{}` with id {}; it is already taken by `{}`",
statement,
statement_id,
prepared_statements.at(statement_id));
return {};
}
prepared_statements.emplace(statement_id, statement);
return std::make_optional(statement_id);
}
std::optional<ReadBufferFromString> MySQLHandler::getPreparedStatement(UInt32 statement_id)
{
std::lock_guard<std::mutex> lock(prepared_statements_mutex);
if (!prepared_statements.contains(statement_id))
{
LOG_ERROR(log, "Could not find prepared statement with id {}", statement_id);
return {};
}
// Temporary workaround: for now we only handle queries that do not bind any parameters
return std::make_optional<ReadBufferFromString>(prepared_statements.at(statement_id));
}
void MySQLHandler::erasePreparedStatement(UInt32 statement_id)
{
std::lock_guard<std::mutex> lock(prepared_statements_mutex);
prepared_statements.erase(statement_id);
}
void MySQLHandler::authPluginSSL()
{
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED,

View File

@ -1,14 +1,17 @@
#pragma once
#include <Poco/Net/TCPServerConnection.h>
#include <base/getFQDNOrHostName.h>
#include <Common/CurrentMetrics.h>
#include <optional>
#include <Core/MySQL/Authentication.h>
#include <Core/MySQL/PacketsGeneric.h>
#include <Core/MySQL/PacketsConnection.h>
#include <Core/MySQL/PacketsGeneric.h>
#include <Core/MySQL/PacketsProtocolText.h>
#include <base/getFQDNOrHostName.h>
#include <Poco/Net/TCPServerConnection.h>
#include <Common/CurrentMetrics.h>
#include "IO/ReadBufferFromString.h"
#include "IServer.h"
#include "base/types.h"
#include "config.h"
#if USE_SSL
@ -30,6 +33,9 @@ class TCPServer;
/// Handler for MySQL wire protocol connections. Allows to connect to ClickHouse using MySQL client.
class MySQLHandler : public Poco::Net::TCPServerConnection
{
/// statement_id -> statement
using PreparedStatements = std::unordered_map<UInt32, String>;
public:
MySQLHandler(
IServer & server_,
@ -46,7 +52,7 @@ protected:
/// Enables SSL, if client requested.
void finishHandshake(MySQLProtocol::ConnectionPhase::HandshakeResponse &);
void comQuery(ReadBuffer & payload);
void comQuery(ReadBuffer & payload, bool binary_protocol);
void comFieldList(ReadBuffer & payload);
@ -56,6 +62,18 @@ protected:
void authenticate(const String & user_name, const String & auth_plugin_name, const String & auth_response);
void comStmtPrepare(ReadBuffer & payload);
void comStmtExecute(ReadBuffer & payload);
void comStmtClose(ReadBuffer & payload);
/// Contains statement_id if the statement was emplaced successfully
std::optional<UInt32> emplacePreparedStatement(String statement);
/// Contains the statement as a buffer if a previously stored statement could be found for the provided statement_id
std::optional<ReadBufferFromString> getPreparedStatement(UInt32 statement_id);
void erasePreparedStatement(UInt32 statement_id);
virtual void authPluginSSL();
virtual void finishHandshakeSSL(size_t packet_size, char * buf, size_t pos, std::function<void(size_t)> read_bytes, MySQLProtocol::ConnectionPhase::HandshakeResponse & packet);
@ -76,6 +94,10 @@ protected:
using Replacements = std::unordered_map<std::string, ReplacementFn>;
Replacements replacements;
std::mutex prepared_statements_mutex;
UInt32 current_prepared_statement_id TSA_GUARDED_BY(prepared_statements_mutex) = 0;
PreparedStatements prepared_statements TSA_GUARDED_BY(prepared_statements_mutex);
std::unique_ptr<MySQLProtocol::Authentication::IPlugin> auth_plugin;
std::shared_ptr<ReadBufferFromPocoSocket> in;
std::shared_ptr<WriteBuffer> out;

View File

@ -19,6 +19,8 @@
<heart_beat_interval_ms>0</heart_beat_interval_ms>
<election_timeout_lower_bound_ms>0</election_timeout_lower_bound_ms>
<election_timeout_upper_bound_ms>0</election_timeout_upper_bound_ms>
<async_replication>1</async_replication>
</coordination_settings>
<raft_configuration>

View File

@ -0,0 +1,7 @@
<clickhouse>
<keeper_server>
<coordination_settings>
<async_replication>1</async_replication>
</coordination_settings>
</keeper_server>
</clickhouse>

View File

@ -4260,6 +4260,23 @@ class ClickHouseInstance:
if len(self.custom_dictionaries_paths):
write_embedded_config("0_common_enable_dictionaries.xml", self.config_d_dir)
version = None
version_parts = self.tag.split(".")
if len(version_parts) >= 2 and version_parts[0].isdigit() and version_parts[1].isdigit():
version = {"major": int(version_parts[0]), "minor": int(version_parts[1])}
# async replication is only supported in version 23.9+
# for tags that don't specify a version, we assume the image runs a ClickHouse version
# that supports async replication if a test for it is present
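# e.g. tag "23.8" -> {"major": 23, "minor": 8} (config not written); tag "latest" -> version None (config written)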
if (
version is None
or version["major"] > 23
or (version["major"] == 23 and version["minor"] >= 9)
):
write_embedded_config(
"0_common_enable_keeper_async_replication.xml", self.config_d_dir
)
logging.debug("Generate and write macros file")
macros = self.macros.copy()
macros["instance"] = self.name

View File

@ -20,6 +20,8 @@
<force_sync>false</force_sync>
<election_timeout_lower_bound_ms>2000</election_timeout_lower_bound_ms>
<election_timeout_upper_bound_ms>4000</election_timeout_upper_bound_ms>
<async_replication>1</async_replication>
</coordination_settings>
<raft_configuration>

View File

@ -20,6 +20,8 @@
<force_sync>false</force_sync>
<election_timeout_lower_bound_ms>2000</election_timeout_lower_bound_ms>
<election_timeout_upper_bound_ms>4000</election_timeout_upper_bound_ms>
<async_replication>1</async_replication>
</coordination_settings>
<raft_configuration>

View File

@ -20,6 +20,8 @@
<force_sync>false</force_sync>
<election_timeout_lower_bound_ms>2000</election_timeout_lower_bound_ms>
<election_timeout_upper_bound_ms>4000</election_timeout_upper_bound_ms>
<async_replication>1</async_replication>
</coordination_settings>
<raft_configuration>

View File

@ -279,8 +279,9 @@ def test_cmd_conf(started_cluster):
assert result["stale_log_gap"] == "10000"
assert result["fresh_log_gap"] == "200"
assert result["max_requests_batch_size"] == "100"
assert result["max_requests_batch_size"] == "1000"
assert result["max_requests_batch_bytes_size"] == "102400"
assert result["max_flush_batch_size"] == "1000"
assert result["max_request_queue_size"] == "100000"
assert result["max_requests_quick_batch_size"] == "100"
assert result["quorum_reads"] == "false"
@ -290,6 +291,7 @@ def test_cmd_conf(started_cluster):
assert result["compress_snapshots_with_zstd_format"] == "true"
assert result["configuration_change_tries_count"] == "20"
assert result["async_replication"] == "true"
finally:
close_keeper_socket(client)

View File

@ -0,0 +1,119 @@
### testSimpleDataTypes
Row #1
i8 type is TINYINT, value: -128
i16 type is SMALLINT, value: -32768
i32 type is INT, value: -2147483648
i64 type is BIGINT, value: -9223372036854775808
i128 type is CHAR, value: -170141183460469231731687303715884105728
i256 type is CHAR, value: -57896044618658097711785492504343953926634992332820282019728792003956564819968
ui8 type is TINYINT, value: 120
ui16 type is SMALLINT, value: 1234
ui32 type is INT, value: 51234
ui64 type is BIGINT, value: 421342
ui128 type is CHAR, value: 15324355
ui256 type is CHAR, value: 41345135123432
f32 type is FLOAT, value: -0.796896
f64 type is DOUBLE, value: -0.113259
b type is BIT, value: true
Row #2
i8 type is TINYINT, value: 127
i16 type is SMALLINT, value: 32767
i32 type is INT, value: 2147483647
i64 type is BIGINT, value: 9223372036854775807
i128 type is CHAR, value: 170141183460469231731687303715884105727
i256 type is CHAR, value: 57896044618658097711785492504343953926634992332820282019728792003956564819967
ui8 type is TINYINT, value: 255
ui16 type is SMALLINT, value: 65535
ui32 type is INT, value: 4294967295
ui64 type is BIGINT, value: 18446744073709551615
ui128 type is CHAR, value: 340282366920938463463374607431768211455
ui256 type is CHAR, value: 115792089237316195423570985008687907853269984665640564039457584007913129639935
f32 type is FLOAT, value: 1.234000
f64 type is DOUBLE, value: 3.352451
b type is BIT, value: false
### testStringTypes
Row #1
s type is CHAR, value: 42
sn type is CHAR, value: ᴺᵁᴸᴸ
lc type is CHAR, value: test
nlc type is CHAR, value: ᴺᵁᴸᴸ
Row #2
s type is CHAR, value: foo
sn type is CHAR, value: bar
lc type is CHAR, value: qaz
nlc type is CHAR, value: qux
### testLowCardinalityAndNullableTypes
Row #1
ilc type is INT, value: -54
dlc type is DATE, value: 1970-01-01
ni type is INT, value: 144
Row #2
ilc type is INT, value: 42
dlc type is DATE, value: 2011-02-05
ni type is INT, value: 0
### testDecimalTypes
Row #1
d32 type is DECIMAL, value: -1.55
d64 type is DECIMAL, value: 6.03
d128_native type is DECIMAL, value: 5
d128_text type is CHAR, value: -1224124.23423
d256 type is CHAR, value: -54342.3
Row #2
d32 type is DECIMAL, value: 1234567.89
d64 type is DECIMAL, value: 123456789123456.789
d128_native type is DECIMAL, value: 12345678912345678912.1234567891
d128_text type is CHAR, value: 1234567.8912345678912345678911234567891
d256 type is CHAR, value: 12345678912345678912345678911234567891234567891234567891.12345678911234567891
### testMiscTypes
Row #1
a type is CHAR, value: ['foo','bar']
u type is CHAR, value: 5da5038d-788f-48c6-b510-babb41c538d3
t type is CHAR, value: (42,'qaz')
m type is CHAR, value: {'qux':144,'text':255}
### testDateTypes
Row #1
d type is DATE, value: 1970-01-01
d32 type is DATE, value: 1900-01-01
dt type is TIMESTAMP, value: 1970-01-01 00:00:00.0
dt64_3 type is TIMESTAMP, value: 1900-01-01 00:00:00.001
dt64_6 type is TIMESTAMP, value: 1900-01-01 00:00:00.000001
dt64_9 type is TIMESTAMP, value: 1900-01-01 00:00:00.0
Row #2
d type is DATE, value: 2149-06-06
d32 type is DATE, value: 2178-04-16
dt type is TIMESTAMP, value: 2106-02-07 06:28:15.0
dt64_3 type is TIMESTAMP, value: 2106-02-07 06:28:15.123
dt64_6 type is TIMESTAMP, value: 2106-02-07 06:28:15.123456
dt64_9 type is TIMESTAMP, value: 2106-02-07 06:28:15.123456
### testUnusualDateTime64Scales
Row #1
dt64_0 type is TIMESTAMP, value: 2022-04-13 03:17:45.0
dt64_1 type is TIMESTAMP, value: 2022-04-13 03:17:45.1
dt64_2 type is TIMESTAMP, value: 2022-04-13 03:17:45.12
dt64_4 type is TIMESTAMP, value: 2022-04-13 03:17:45.1234
dt64_5 type is TIMESTAMP, value: 2022-04-13 03:17:45.12345
dt64_7 type is TIMESTAMP, value: 2022-04-13 03:17:45.123456
dt64_8 type is TIMESTAMP, value: 2022-04-13 03:17:45.123456
Row #2
dt64_0 type is TIMESTAMP, value: 2022-04-13 03:17:45.0
dt64_1 type is TIMESTAMP, value: 2022-04-13 03:17:45.1
dt64_2 type is TIMESTAMP, value: 2022-04-13 03:17:45.01
dt64_4 type is TIMESTAMP, value: 2022-04-13 03:17:45.0001
dt64_5 type is TIMESTAMP, value: 2022-04-13 03:17:45.00001
dt64_7 type is TIMESTAMP, value: 2022-04-13 03:17:45.0
dt64_8 type is TIMESTAMP, value: 2022-04-13 03:17:45.0
### testDateTimeTimezones
Row #1
dt type is TIMESTAMP, value: 1970-01-01 01:00:00.0
dt64_3 type is TIMESTAMP, value: 1969-12-31 16:00:00.0
Row #2
dt type is TIMESTAMP, value: 2022-09-04 20:31:05.0
dt64_3 type is TIMESTAMP, value: 2022-09-04 20:31:05.022

View File

@ -0,0 +1,137 @@
CREATE OR REPLACE TABLE ps_simple_data_types
(
i8 Int8,
i16 Int16,
i32 Int32,
i64 Int64,
i128 Int128,
i256 Int256,
ui8 UInt8,
ui16 UInt16,
ui32 UInt32,
ui64 UInt64,
ui128 UInt128,
ui256 UInt256,
f32 Float32,
f64 Float64,
b Boolean
) ENGINE MergeTree ORDER BY i8;
INSERT INTO ps_simple_data_types
VALUES (127, 32767, 2147483647, 9223372036854775807, 170141183460469231731687303715884105727,
57896044618658097711785492504343953926634992332820282019728792003956564819967,
255, 65535, 4294967295, 18446744073709551615, 340282366920938463463374607431768211455,
115792089237316195423570985008687907853269984665640564039457584007913129639935,
1.234, 3.35245141223232, FALSE),
(-128, -32768, -2147483648, -9223372036854775808, -170141183460469231731687303715884105728,
-57896044618658097711785492504343953926634992332820282019728792003956564819968,
120, 1234, 51234, 421342, 15324355, 41345135123432,
-0.7968956, -0.113259, TRUE);
CREATE OR REPLACE TABLE ps_string_types
(
s String,
sn Nullable(String),
lc LowCardinality(String),
nlc LowCardinality(Nullable(String))
) ENGINE MergeTree ORDER BY s;
INSERT INTO ps_string_types
VALUES ('foo', 'bar', 'qaz', 'qux'),
('42', NULL, 'test', NULL);
CREATE OR REPLACE TABLE ps_low_cardinality_and_nullable_types
(
ilc LowCardinality(Int32),
dlc LowCardinality(Date),
ni Nullable(Int32)
) ENGINE MergeTree ORDER BY ilc;
INSERT INTO ps_low_cardinality_and_nullable_types
VALUES (42, '2011-02-05', NULL),
(-54, '1970-01-01', 144);
CREATE OR REPLACE TABLE ps_decimal_types
(
d32 Decimal(9, 2),
d64 Decimal(18, 3),
d128_native Decimal(30, 10),
d128_text Decimal(38, 31),
d256 Decimal(76, 20)
) ENGINE MergeTree ORDER BY d32;
INSERT INTO ps_decimal_types
VALUES (1234567.89,
123456789123456.789,
12345678912345678912.1234567891,
1234567.8912345678912345678911234567891,
12345678912345678912345678911234567891234567891234567891.12345678911234567891),
(-1.55, 6.03, 5, -1224124.23423, -54342.3);
CREATE OR REPLACE TABLE ps_misc_types
(
a Array(String),
u UUID,
t Tuple(Int32, String),
m Map(String, Int32)
) ENGINE MergeTree ORDER BY u;
INSERT INTO ps_misc_types
VALUES (['foo', 'bar'], '5da5038d-788f-48c6-b510-babb41c538d3', (42, 'qaz'), {'qux': 144, 'text': 255});
CREATE OR REPLACE TABLE ps_date_types
(
d Date,
d32 Date32,
dt DateTime('UTC'),
dt64_3 DateTime64(3, 'UTC'),
dt64_6 DateTime64(6, 'UTC'),
dt64_9 DateTime64(9, 'UTC')
) ENGINE MergeTree ORDER BY d;
INSERT INTO ps_date_types
VALUES ('2149-06-06', '2178-04-16', '2106-02-07 06:28:15',
'2106-02-07 06:28:15.123',
'2106-02-07 06:28:15.123456',
'2106-02-07 06:28:15.123456789'),
('1970-01-01', '1900-01-01', '1970-01-01 00:00:00',
'1900-01-01 00:00:00.001',
'1900-01-01 00:00:00.000001',
'1900-01-01 00:00:00.000000001');
CREATE OR REPLACE TABLE ps_unusual_datetime64_scales
(
dt64_0 DateTime64(0, 'UTC'),
dt64_1 DateTime64(1, 'UTC'),
dt64_2 DateTime64(2, 'UTC'),
dt64_4 DateTime64(4, 'UTC'),
dt64_5 DateTime64(5, 'UTC'),
dt64_7 DateTime64(7, 'UTC'),
dt64_8 DateTime64(8, 'UTC')
) ENGINE MergeTree ORDER BY dt64_0;
INSERT INTO ps_unusual_datetime64_scales
VALUES ('2022-04-13 03:17:45',
'2022-04-13 03:17:45.1',
'2022-04-13 03:17:45.12',
'2022-04-13 03:17:45.1234',
'2022-04-13 03:17:45.12345',
'2022-04-13 03:17:45.1234567',
'2022-04-13 03:17:45.12345678'),
('2022-04-13 03:17:45',
'2022-04-13 03:17:45.1',
'2022-04-13 03:17:45.01',
'2022-04-13 03:17:45.0001',
'2022-04-13 03:17:45.00001',
'2022-04-13 03:17:45.0000001',
'2022-04-13 03:17:45.00000001');
CREATE OR REPLACE TABLE ps_datetime_timezones
(
dt DateTime('Europe/Amsterdam'),
dt64_3 DateTime64(3, 'Asia/Shanghai')
) ENGINE MergeTree ORDER BY dt;
INSERT INTO ps_datetime_timezones
VALUES ('2022-09-04 20:31:05', '2022-09-04 20:31:05.022'),
('1970-01-01 01:00:00', '1969-12-31 16:00:00');

View File

@ -10,7 +10,6 @@ import logging
import docker
import pymysql.connections
import pytest
from docker.models.containers import Container
from helpers.cluster import ClickHouseCluster, get_docker_compose_path, run_and_check
SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
@ -150,7 +149,6 @@ def java_container():
def test_mysql_client(started_cluster):
# type: (ClickHouseCluster) -> None
code, (stdout, stderr) = started_cluster.mysql_client_container.exec_run(
"""
mysql --protocol tcp -h {host} -P {port} default -u user_with_double_sha1 --password=abacaba
@ -651,7 +649,6 @@ def test_python_client(started_cluster):
def test_golang_client(started_cluster, golang_container):
# type: (str, Container) -> None
with open(os.path.join(SCRIPT_DIR, "golang.reference"), "rb") as fp:
reference = fp.read()
@ -687,7 +684,6 @@ def test_golang_client(started_cluster, golang_container):
def test_php_client(started_cluster, php_container):
# type: (str, Container) -> None
code, (stdout, stderr) = php_container.exec_run(
"php -f test.php {host} {port} default 123".format(
host=started_cluster.get_instance_ip("node"), port=server_port
@ -764,7 +760,6 @@ def test_mysqljs_client(started_cluster, nodejs_container):
def test_java_client(started_cluster, java_container):
# type: (str, Container) -> None
with open(os.path.join(SCRIPT_DIR, "java.reference")) as fp:
reference = fp.read()
@ -810,6 +805,35 @@ def test_java_client(started_cluster, java_container):
assert stdout.decode() == reference
def test_prepared_statements(started_cluster, java_container):
with open(os.path.join(SCRIPT_DIR, "prepared_statements.reference")) as fp:
reference = fp.read()
with open(os.path.join(SCRIPT_DIR, "prepared_statements_test.sql")) as sql:
statements = list(
filter(
lambda s: s != "",
map(lambda s: s.strip().replace("\n", " "), sql.read().split(";")),
)
)
for statement in statements:
node.query(
statement,
settings={"password": "123", "allow_suspicious_low_cardinality_types": 1},
)
code, (stdout, stderr) = java_container.exec_run(
"java PreparedStatementsTest --host {host} --port {port} --user user_with_double_sha1 --password abacaba --database "
"default".format(
host=started_cluster.get_instance_ip("node"), port=server_port
),
demux=True,
)
assert code == 0
assert stdout.decode() == reference
def test_types(started_cluster):
client = pymysql.connections.Connection(
host=started_cluster.get_instance_ip("node"),

View File

@ -47,6 +47,7 @@
<snapshot_distance>{snapshot_distance}</snapshot_distance>
<stale_log_gap>{stale_log_gap}</stale_log_gap>
<reserved_log_items>{reserved_log_items}</reserved_log_items>
<async_replication>1</async_replication>
</coordination_settings>
<raft_configuration>

View File

@ -21,6 +21,7 @@
<heart_beat_interval_ms>1000</heart_beat_interval_ms>
<election_timeout_lower_bound_ms>2000</election_timeout_lower_bound_ms>
<election_timeout_upper_bound_ms>4000</election_timeout_upper_bound_ms>
<async_replication>1</async_replication>
</coordination_settings>
<raft_configuration>

View File

@ -1,47 +1,47 @@
--- Aliases of SHOW COLUMNS
int32 Nullable(Int32) 1 \N
str String 0 SOR \N
uint64 UInt64 0 PRI SOR \N
int32 Nullable(Int32) 1 \N
str String 0 SOR \N
uint64 UInt64 0 PRI SOR \N
int32 Nullable(Int32) YES \N
str String NO SOR \N
uint64 UInt64 NO PRI SOR \N
int32 Nullable(Int32) YES \N
str String NO SOR \N
uint64 UInt64 NO PRI SOR \N
--- EXTENDED
int32 Nullable(Int32) 1 \N
str String 0 SOR \N
uint64 UInt64 0 PRI SOR \N
int32 Nullable(Int32) YES \N
str String NO SOR \N
uint64 UInt64 NO PRI SOR \N
--- FULL
int32 Nullable(Int32) 1 \N \N example comment
str String 0 SOR \N \N
uint64 UInt64 0 PRI SOR \N \N
int32 Nullable(Int32) YES \N \N example comment
str String NO SOR \N \N
uint64 UInt64 NO PRI SOR \N \N
--- LIKE
int32 Nullable(Int32) 1 \N
uint64 UInt64 0 PRI SOR \N
int32 Nullable(Int32) YES \N
uint64 UInt64 NO PRI SOR \N
--- NOT LIKE
str String 0 SOR \N
str String NO SOR \N
--- ILIKE
int32 Nullable(Int32) 1 \N
uint64 UInt64 0 PRI SOR \N
int32 Nullable(Int32) YES \N
uint64 UInt64 NO PRI SOR \N
--- NOT ILIKE
str String 0 SOR \N
str String NO SOR \N
--- WHERE
int32 Nullable(Int32) 1 \N
uint64 UInt64 0 PRI SOR \N
int32 Nullable(Int32) YES \N
uint64 UInt64 NO PRI SOR \N
--- LIMIT
int32 Nullable(Int32) 1 \N
int32 Nullable(Int32) YES \N
--- Check with weird table names
c String 0 PRI SOR \N
c String 0 PRI SOR \N
c String 0 PRI SOR \N
c String 0 PRI SOR \N
c String NO PRI SOR \N
c String NO PRI SOR \N
c String NO PRI SOR \N
c String NO PRI SOR \N
--- Original table
int32 Nullable(Int32) 1 \N
str String 0 SOR \N
uint64 UInt64 0 PRI SOR \N
int32 Nullable(Int32) YES \N
str String NO SOR \N
uint64 UInt64 NO PRI SOR \N
--- Equally named table in other database
int32 Int32 0 \N
str String 0 \N
uint64 UInt64 0 PRI SOR \N
int32 Int32 NO \N
str String NO \N
uint64 UInt64 NO PRI SOR \N
--- Short form
int32 Int32 0 \N
str String 0 \N
uint64 UInt64 0 PRI SOR \N
int32 Int32 NO \N
str String NO \N
uint64 UInt64 NO PRI SOR \N
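
The reference change above replaces 0/1 with NO/YES in the null column, matching what MySQL itself prints for SHOW COLUMNS. A small illustrative sketch of observing this over JDBC follows; the table name tab, host, port, and credentials are assumptions rather than values taken from the test.

import java.sql.*;

// Illustrative only: prints SHOW COLUMNS metadata for a hypothetical table
// "tab" over the MySQL-compatibility port; connection details are assumed.
public class ShowColumnsProbe {
    public static void main(String[] args) throws Exception {
        String url = "jdbc:mysql://127.0.0.1:9004/default?useSSL=false";
        try (Connection conn = DriverManager.getConnection(url, "default", "");
             Statement st = conn.createStatement();
             ResultSet rs = st.executeQuery("SHOW COLUMNS FROM tab")) {
            while (rs.next()) {
                // The "null" field now holds the string NO or YES, as in
                // MySQL, instead of the previous 0 or 1.
                System.out.printf("%s %s %s%n",
                        rs.getString("field"),
                        rs.getString("type"),
                        rs.getString("null"));
            }
        }
    }
}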

View File

@ -4,363 +4,363 @@ Create pseudo-random database name
Create tab duplicate table
Run MySQL test
field type null key default extra
aggregate_function AggregateFunction(sum, Int32) 0 NULL
array_value Array(Int32) 0 NULL
boolean_value UInt8 0 NULL
date32_value Date32 0 NULL
date_value Date 0 NULL
datetime64_value DateTime64(3) 0 NULL
datetime_value DateTime 0 NULL
decimal_value Decimal(10, 2) 0 NULL
enum_value Enum8('apple' = 1, 'banana' = 2, 'orange' = 3) 0 NULL
fixed_string_value FixedString(10) 0 NULL
float32 Float32 0 NULL
float64 Float64 0 NULL
int128 Int128 0 NULL
int16 Int16 0 NULL
int256 Int256 0 NULL
int32 Int32 0 NULL
int64 Int64 0 NULL
int8 Int8 0 NULL
ipv4_value IPv4 0 NULL
ipv6_value IPv6 0 NULL
json_value Object('json') 0 NULL
low_cardinality LowCardinality(String) 0 NULL
low_cardinality_date LowCardinality(DateTime) 0 NULL
map_value Map(String, Int32) 0 NULL
nested.nested_int Array(Int32) 0 NULL
nested.nested_string Array(String) 0 NULL
nint32 Nullable(Int32) 1 NULL
nullable_value Nullable(Int32) 1 NULL
string_value String 0 NULL
tuple_value Tuple(Int32, String) 0 NULL
uint128 UInt128 0 NULL
uint16 UInt16 0 NULL
uint256 UInt256 0 NULL
uint32 UInt32 0 NULL
uint64 UInt64 0 PRI SOR NULL
uint8 UInt8 0 NULL
uuid_value UUID 0 NULL
aggregate_function AggregateFunction(sum, Int32) NO NULL
array_value Array(Int32) NO NULL
boolean_value UInt8 NO NULL
date32_value Date32 NO NULL
date_value Date NO NULL
datetime64_value DateTime64(3) NO NULL
datetime_value DateTime NO NULL
decimal_value Decimal(10, 2) NO NULL
enum_value Enum8('apple' = 1, 'banana' = 2, 'orange' = 3) NO NULL
fixed_string_value FixedString(10) NO NULL
float32 Float32 NO NULL
float64 Float64 NO NULL
int128 Int128 NO NULL
int16 Int16 NO NULL
int256 Int256 NO NULL
int32 Int32 NO NULL
int64 Int64 NO NULL
int8 Int8 NO NULL
ipv4_value IPv4 NO NULL
ipv6_value IPv6 NO NULL
json_value Object('json') NO NULL
low_cardinality LowCardinality(String) NO NULL
low_cardinality_date LowCardinality(DateTime) NO NULL
map_value Map(String, Int32) NO NULL
nested.nested_int Array(Int32) NO NULL
nested.nested_string Array(String) NO NULL
nint32 Nullable(Int32) YES NULL
nullable_value Nullable(Int32) YES NULL
string_value String NO NULL
tuple_value Tuple(Int32, String) NO NULL
uint128 UInt128 NO NULL
uint16 UInt16 NO NULL
uint256 UInt256 NO NULL
uint32 UInt32 NO NULL
uint64 UInt64 NO PRI SOR NULL
uint8 UInt8 NO NULL
uuid_value UUID NO NULL
field type null key default extra
aggregate_function TEXT 0 NULL
array_value TEXT 0 NULL
boolean_value TINYINT UNSIGNED 0 NULL
date32_value DATE 0 NULL
date_value DATE 0 NULL
datetime64_value DATETIME 0 NULL
datetime_value DATETIME 0 NULL
decimal_value DECIMAL(10, 2) 0 NULL
enum_value ENUM('apple', 'banana', 'orange') 0 NULL
fixed_string_value TEXT 0 NULL
float32 FLOAT 0 NULL
float64 DOUBLE 0 NULL
int128 TEXT 0 NULL
int16 SMALLINT 0 NULL
int256 TEXT 0 NULL
int32 INTEGER 0 NULL
int64 BIGINT 0 NULL
int8 TINYINT 0 NULL
ipv4_value TEXT 0 NULL
ipv6_value TEXT 0 NULL
json_value JSON 0 NULL
low_cardinality BLOB 0 NULL
low_cardinality_date DATETIME 0 NULL
map_value JSON 0 NULL
nested.nested_int TEXT 0 NULL
nested.nested_string TEXT 0 NULL
nint32 INTEGER 0 NULL
nullable_value INTEGER 0 NULL
string_value BLOB 0 NULL
tuple_value JSON 0 NULL
uint128 TEXT 0 NULL
uint16 SMALLINT UNSIGNED 0 NULL
uint256 TEXT 0 NULL
uint32 INTEGER UNSIGNED 0 NULL
uint64 BIGINT UNSIGNED 0 PRI SOR NULL
uint8 TINYINT UNSIGNED 0 NULL
uuid_value CHAR 0 NULL
aggregate_function TEXT NO NULL
array_value TEXT NO NULL
boolean_value TINYINT UNSIGNED NO NULL
date32_value DATE NO NULL
date_value DATE NO NULL
datetime64_value DATETIME NO NULL
datetime_value DATETIME NO NULL
decimal_value DECIMAL(10, 2) NO NULL
enum_value ENUM('apple', 'banana', 'orange') NO NULL
fixed_string_value TEXT NO NULL
float32 FLOAT NO NULL
float64 DOUBLE NO NULL
int128 TEXT NO NULL
int16 SMALLINT NO NULL
int256 TEXT NO NULL
int32 INTEGER NO NULL
int64 BIGINT NO NULL
int8 TINYINT NO NULL
ipv4_value TEXT NO NULL
ipv6_value TEXT NO NULL
json_value JSON NO NULL
low_cardinality BLOB NO NULL
low_cardinality_date DATETIME NO NULL
map_value JSON NO NULL
nested.nested_int TEXT NO NULL
nested.nested_string TEXT NO NULL
nint32 INTEGER NO NULL
nullable_value INTEGER NO NULL
string_value BLOB NO NULL
tuple_value JSON NO NULL
uint128 TEXT NO NULL
uint16 SMALLINT UNSIGNED NO NULL
uint256 TEXT NO NULL
uint32 INTEGER UNSIGNED NO NULL
uint64 BIGINT UNSIGNED NO PRI SOR NULL
uint8 TINYINT UNSIGNED NO NULL
uuid_value CHAR NO NULL
field type null key default extra
aggregate_function TEXT 0 NULL
array_value TEXT 0 NULL
boolean_value TINYINT UNSIGNED 0 NULL
date32_value DATE 0 NULL
date_value DATE 0 NULL
datetime64_value DATETIME 0 NULL
datetime_value DATETIME 0 NULL
decimal_value DECIMAL(10, 2) 0 NULL
enum_value ENUM('apple', 'banana', 'orange') 0 NULL
fixed_string_value TEXT 0 NULL
float32 FLOAT 0 NULL
float64 DOUBLE 0 NULL
int128 TEXT 0 NULL
int16 SMALLINT 0 NULL
int256 TEXT 0 NULL
int32 INTEGER 0 NULL
int64 BIGINT 0 NULL
int8 TINYINT 0 NULL
ipv4_value TEXT 0 NULL
ipv6_value TEXT 0 NULL
json_value JSON 0 NULL
low_cardinality BLOB 0 NULL
low_cardinality_date DATETIME 0 NULL
map_value JSON 0 NULL
nested.nested_int TEXT 0 NULL
nested.nested_string TEXT 0 NULL
nint32 INTEGER 0 NULL
nullable_value INTEGER 0 NULL
string_value BLOB 0 NULL
tuple_value JSON 0 NULL
uint128 TEXT 0 NULL
uint16 SMALLINT UNSIGNED 0 NULL
uint256 TEXT 0 NULL
uint32 INTEGER UNSIGNED 0 NULL
uint64 BIGINT UNSIGNED 0 PRI SOR NULL
uint8 TINYINT UNSIGNED 0 NULL
uuid_value CHAR 0 NULL
aggregate_function TEXT NO NULL
array_value TEXT NO NULL
boolean_value TINYINT UNSIGNED NO NULL
date32_value DATE NO NULL
date_value DATE NO NULL
datetime64_value DATETIME NO NULL
datetime_value DATETIME NO NULL
decimal_value DECIMAL(10, 2) NO NULL
enum_value ENUM('apple', 'banana', 'orange') NO NULL
fixed_string_value TEXT NO NULL
float32 FLOAT NO NULL
float64 DOUBLE NO NULL
int128 TEXT NO NULL
int16 SMALLINT NO NULL
int256 TEXT NO NULL
int32 INTEGER NO NULL
int64 BIGINT NO NULL
int8 TINYINT NO NULL
ipv4_value TEXT NO NULL
ipv6_value TEXT NO NULL
json_value JSON NO NULL
low_cardinality BLOB NO NULL
low_cardinality_date DATETIME NO NULL
map_value JSON NO NULL
nested.nested_int TEXT NO NULL
nested.nested_string TEXT NO NULL
nint32 INTEGER NO NULL
nullable_value INTEGER NO NULL
string_value BLOB NO NULL
tuple_value JSON NO NULL
uint128 TEXT NO NULL
uint16 SMALLINT UNSIGNED NO NULL
uint256 TEXT NO NULL
uint32 INTEGER UNSIGNED NO NULL
uint64 BIGINT UNSIGNED NO PRI SOR NULL
uint8 TINYINT UNSIGNED NO NULL
uuid_value CHAR NO NULL
field type null key default extra collation comment privileges
aggregate_function TEXT 0 NULL NULL
array_value TEXT 0 NULL NULL
boolean_value TINYINT UNSIGNED 0 NULL NULL
date32_value DATE 0 NULL NULL
date_value DATE 0 NULL NULL
datetime64_value DATETIME 0 NULL NULL
datetime_value DATETIME 0 NULL NULL
decimal_value DECIMAL(10, 2) 0 NULL NULL
enum_value ENUM('apple', 'banana', 'orange') 0 NULL NULL
fixed_string_value TEXT 0 NULL NULL
float32 FLOAT 0 NULL NULL
float64 DOUBLE 0 NULL NULL
int128 TEXT 0 NULL NULL
int16 SMALLINT 0 NULL NULL
int256 TEXT 0 NULL NULL
int32 INTEGER 0 NULL NULL
int64 BIGINT 0 NULL NULL
int8 TINYINT 0 NULL NULL
ipv4_value TEXT 0 NULL NULL
ipv6_value TEXT 0 NULL NULL
json_value JSON 0 NULL NULL
low_cardinality BLOB 0 NULL NULL
low_cardinality_date DATETIME 0 NULL NULL
map_value JSON 0 NULL NULL
nested.nested_int TEXT 0 NULL NULL
nested.nested_string TEXT 0 NULL NULL
nint32 INTEGER 0 NULL NULL
nullable_value INTEGER 0 NULL NULL
string_value BLOB 0 NULL NULL
tuple_value JSON 0 NULL NULL
uint128 TEXT 0 NULL NULL
uint16 SMALLINT UNSIGNED 0 NULL NULL
uint256 TEXT 0 NULL NULL
uint32 INTEGER UNSIGNED 0 NULL NULL
uint64 BIGINT UNSIGNED 0 PRI SOR NULL NULL
uint8 TINYINT UNSIGNED 0 NULL NULL
uuid_value CHAR 0 NULL NULL
aggregate_function TEXT NO NULL NULL
array_value TEXT NO NULL NULL
boolean_value TINYINT UNSIGNED NO NULL NULL
date32_value DATE NO NULL NULL
date_value DATE NO NULL NULL
datetime64_value DATETIME NO NULL NULL
datetime_value DATETIME NO NULL NULL
decimal_value DECIMAL(10, 2) NO NULL NULL
enum_value ENUM('apple', 'banana', 'orange') NO NULL NULL
fixed_string_value TEXT NO NULL NULL
float32 FLOAT NO NULL NULL
float64 DOUBLE NO NULL NULL
int128 TEXT NO NULL NULL
int16 SMALLINT NO NULL NULL
int256 TEXT NO NULL NULL
int32 INTEGER NO NULL NULL
int64 BIGINT NO NULL NULL
int8 TINYINT NO NULL NULL
ipv4_value TEXT NO NULL NULL
ipv6_value TEXT NO NULL NULL
json_value JSON NO NULL NULL
low_cardinality BLOB NO NULL NULL
low_cardinality_date DATETIME NO NULL NULL
map_value JSON NO NULL NULL
nested.nested_int TEXT NO NULL NULL
nested.nested_string TEXT NO NULL NULL
nint32 INTEGER NO NULL NULL
nullable_value INTEGER NO NULL NULL
string_value BLOB NO NULL NULL
tuple_value JSON NO NULL NULL
uint128 TEXT NO NULL NULL
uint16 SMALLINT UNSIGNED NO NULL NULL
uint256 TEXT NO NULL NULL
uint32 INTEGER UNSIGNED NO NULL NULL
uint64 BIGINT UNSIGNED NO PRI SOR NULL NULL
uint8 TINYINT UNSIGNED NO NULL NULL
uuid_value CHAR NO NULL NULL
field type null key default extra
int128 TEXT 0 NULL
int16 SMALLINT 0 NULL
int256 TEXT 0 NULL
int32 INTEGER 0 NULL
int64 BIGINT 0 NULL
int8 TINYINT 0 NULL
nested.nested_int TEXT 0 NULL
nint32 INTEGER 0 NULL
uint128 TEXT 0 NULL
uint16 SMALLINT UNSIGNED 0 NULL
uint256 TEXT 0 NULL
uint32 INTEGER UNSIGNED 0 NULL
uint64 BIGINT UNSIGNED 0 PRI SOR NULL
uint8 TINYINT UNSIGNED 0 NULL
int128 TEXT NO NULL
int16 SMALLINT NO NULL
int256 TEXT NO NULL
int32 INTEGER NO NULL
int64 BIGINT NO NULL
int8 TINYINT NO NULL
nested.nested_int TEXT NO NULL
nint32 INTEGER NO NULL
uint128 TEXT NO NULL
uint16 SMALLINT UNSIGNED NO NULL
uint256 TEXT NO NULL
uint32 INTEGER UNSIGNED NO NULL
uint64 BIGINT UNSIGNED NO PRI SOR NULL
uint8 TINYINT UNSIGNED NO NULL
field type null key default extra
aggregate_function TEXT 0 NULL
array_value TEXT 0 NULL
boolean_value TINYINT UNSIGNED 0 NULL
date32_value DATE 0 NULL
date_value DATE 0 NULL
datetime64_value DATETIME 0 NULL
datetime_value DATETIME 0 NULL
decimal_value DECIMAL(10, 2) 0 NULL
enum_value ENUM('apple', 'banana', 'orange') 0 NULL
fixed_string_value TEXT 0 NULL
float32 FLOAT 0 NULL
float64 DOUBLE 0 NULL
ipv4_value TEXT 0 NULL
ipv6_value TEXT 0 NULL
json_value JSON 0 NULL
low_cardinality BLOB 0 NULL
low_cardinality_date DATETIME 0 NULL
map_value JSON 0 NULL
nested.nested_string TEXT 0 NULL
nullable_value INTEGER 0 NULL
string_value BLOB 0 NULL
tuple_value JSON 0 NULL
uuid_value CHAR 0 NULL
aggregate_function TEXT NO NULL
array_value TEXT NO NULL
boolean_value TINYINT UNSIGNED NO NULL
date32_value DATE NO NULL
date_value DATE NO NULL
datetime64_value DATETIME NO NULL
datetime_value DATETIME NO NULL
decimal_value DECIMAL(10, 2) NO NULL
enum_value ENUM('apple', 'banana', 'orange') NO NULL
fixed_string_value TEXT NO NULL
float32 FLOAT NO NULL
float64 DOUBLE NO NULL
ipv4_value TEXT NO NULL
ipv6_value TEXT NO NULL
json_value JSON NO NULL
low_cardinality BLOB NO NULL
low_cardinality_date DATETIME NO NULL
map_value JSON NO NULL
nested.nested_string TEXT NO NULL
nullable_value INTEGER NO NULL
string_value BLOB NO NULL
tuple_value JSON NO NULL
uuid_value CHAR NO NULL
field type null key default extra
int128 TEXT 0 NULL
int16 SMALLINT 0 NULL
int256 TEXT 0 NULL
int32 INTEGER 0 NULL
int64 BIGINT 0 NULL
int8 TINYINT 0 NULL
nested.nested_int TEXT 0 NULL
nint32 INTEGER 0 NULL
uint128 TEXT 0 NULL
uint16 SMALLINT UNSIGNED 0 NULL
uint256 TEXT 0 NULL
uint32 INTEGER UNSIGNED 0 NULL
uint64 BIGINT UNSIGNED 0 PRI SOR NULL
uint8 TINYINT UNSIGNED 0 NULL
int128 TEXT NO NULL
int16 SMALLINT NO NULL
int256 TEXT NO NULL
int32 INTEGER NO NULL
int64 BIGINT NO NULL
int8 TINYINT NO NULL
nested.nested_int TEXT NO NULL
nint32 INTEGER NO NULL
uint128 TEXT NO NULL
uint16 SMALLINT UNSIGNED NO NULL
uint256 TEXT NO NULL
uint32 INTEGER UNSIGNED NO NULL
uint64 BIGINT UNSIGNED NO PRI SOR NULL
uint8 TINYINT UNSIGNED NO NULL
field type null key default extra
aggregate_function TEXT 0 NULL
array_value TEXT 0 NULL
boolean_value TINYINT UNSIGNED 0 NULL
date32_value DATE 0 NULL
date_value DATE 0 NULL
datetime64_value DATETIME 0 NULL
datetime_value DATETIME 0 NULL
decimal_value DECIMAL(10, 2) 0 NULL
enum_value ENUM('apple', 'banana', 'orange') 0 NULL
fixed_string_value TEXT 0 NULL
float32 FLOAT 0 NULL
float64 DOUBLE 0 NULL
ipv4_value TEXT 0 NULL
ipv6_value TEXT 0 NULL
json_value JSON 0 NULL
low_cardinality BLOB 0 NULL
low_cardinality_date DATETIME 0 NULL
map_value JSON 0 NULL
nested.nested_string TEXT 0 NULL
nullable_value INTEGER 0 NULL
string_value BLOB 0 NULL
tuple_value JSON 0 NULL
uuid_value CHAR 0 NULL
aggregate_function TEXT NO NULL
array_value TEXT NO NULL
boolean_value TINYINT UNSIGNED NO NULL
date32_value DATE NO NULL
date_value DATE NO NULL
datetime64_value DATETIME NO NULL
datetime_value DATETIME NO NULL
decimal_value DECIMAL(10, 2) NO NULL
enum_value ENUM('apple', 'banana', 'orange') NO NULL
fixed_string_value TEXT NO NULL
float32 FLOAT NO NULL
float64 DOUBLE NO NULL
ipv4_value TEXT NO NULL
ipv6_value TEXT NO NULL
json_value JSON NO NULL
low_cardinality BLOB NO NULL
low_cardinality_date DATETIME NO NULL
map_value JSON NO NULL
nested.nested_string TEXT NO NULL
nullable_value INTEGER NO NULL
string_value BLOB NO NULL
tuple_value JSON NO NULL
uuid_value CHAR NO NULL
field type null key default extra
int128 TEXT 0 NULL
int16 SMALLINT 0 NULL
int256 TEXT 0 NULL
int32 INTEGER 0 NULL
int64 BIGINT 0 NULL
int8 TINYINT 0 NULL
nested.nested_int TEXT 0 NULL
nint32 INTEGER 0 NULL
uint128 TEXT 0 NULL
uint16 SMALLINT UNSIGNED 0 NULL
uint256 TEXT 0 NULL
uint32 INTEGER UNSIGNED 0 NULL
uint64 BIGINT UNSIGNED 0 PRI SOR NULL
uint8 TINYINT UNSIGNED 0 NULL
int128 TEXT NO NULL
int16 SMALLINT NO NULL
int256 TEXT NO NULL
int32 INTEGER NO NULL
int64 BIGINT NO NULL
int8 TINYINT NO NULL
nested.nested_int TEXT NO NULL
nint32 INTEGER NO NULL
uint128 TEXT NO NULL
uint16 SMALLINT UNSIGNED NO NULL
uint256 TEXT NO NULL
uint32 INTEGER UNSIGNED NO NULL
uint64 BIGINT UNSIGNED NO PRI SOR NULL
uint8 TINYINT UNSIGNED NO NULL
field type null key default extra
aggregate_function TEXT 0 NULL
aggregate_function TEXT NO NULL
field type null key default extra
aggregate_function TEXT 0 NULL
array_value TEXT 0 NULL
boolean_value TINYINT UNSIGNED 0 NULL
date32_value DATE 0 NULL
date_value DATE 0 NULL
datetime64_value DATETIME 0 NULL
datetime_value DATETIME 0 NULL
decimal_value DECIMAL(10, 2) 0 NULL
enum_value ENUM('apple', 'banana', 'orange') 0 NULL
fixed_string_value TEXT 0 NULL
float32 FLOAT 0 NULL
float64 DOUBLE 0 NULL
int128 TEXT 0 NULL
int16 SMALLINT 0 NULL
int256 TEXT 0 NULL
int32 INTEGER 0 NULL
int64 BIGINT 0 NULL
int8 TINYINT 0 NULL
ipv4_value TEXT 0 NULL
ipv6_value TEXT 0 NULL
json_value JSON 0 NULL
low_cardinality BLOB 0 NULL
low_cardinality_date DATETIME 0 NULL
map_value JSON 0 NULL
nested.nested_int TEXT 0 NULL
nested.nested_string TEXT 0 NULL
nint32 INTEGER 0 NULL
nullable_value INTEGER 0 NULL
string_value BLOB 0 NULL
tuple_value JSON 0 NULL
uint128 TEXT 0 NULL
uint16 SMALLINT UNSIGNED 0 NULL
uint256 TEXT 0 NULL
uint32 INTEGER UNSIGNED 0 NULL
uint64 BIGINT UNSIGNED 0 PRI SOR NULL
uint8 TINYINT UNSIGNED 0 NULL
uuid_value CHAR 0 NULL
aggregate_function TEXT NO NULL
array_value TEXT NO NULL
boolean_value TINYINT UNSIGNED NO NULL
date32_value DATE NO NULL
date_value DATE NO NULL
datetime64_value DATETIME NO NULL
datetime_value DATETIME NO NULL
decimal_value DECIMAL(10, 2) NO NULL
enum_value ENUM('apple', 'banana', 'orange') NO NULL
fixed_string_value TEXT NO NULL
float32 FLOAT NO NULL
float64 DOUBLE NO NULL
int128 TEXT NO NULL
int16 SMALLINT NO NULL
int256 TEXT NO NULL
int32 INTEGER NO NULL
int64 BIGINT NO NULL
int8 TINYINT NO NULL
ipv4_value TEXT NO NULL
ipv6_value TEXT NO NULL
json_value JSON NO NULL
low_cardinality BLOB NO NULL
low_cardinality_date DATETIME NO NULL
map_value JSON NO NULL
nested.nested_int TEXT NO NULL
nested.nested_string TEXT NO NULL
nint32 INTEGER NO NULL
nullable_value INTEGER NO NULL
string_value BLOB NO NULL
tuple_value JSON NO NULL
uint128 TEXT NO NULL
uint16 SMALLINT UNSIGNED NO NULL
uint256 TEXT NO NULL
uint32 INTEGER UNSIGNED NO NULL
uint64 BIGINT UNSIGNED NO PRI SOR NULL
uint8 TINYINT UNSIGNED NO NULL
uuid_value CHAR NO NULL
field type null key default extra
aggregate_function TEXT 0 NULL
array_value TEXT 0 NULL
boolean_value TINYINT UNSIGNED 0 NULL
date32_value DATE 0 NULL
date_value DATE 0 NULL
datetime64_value DATETIME 0 NULL
datetime_value DATETIME 0 NULL
decimal_value DECIMAL(10, 2) 0 NULL
enum_value ENUM('apple', 'banana', 'orange') 0 NULL
fixed_string_value TEXT 0 NULL
float32 FLOAT 0 NULL
float64 DOUBLE 0 NULL
int128 TEXT 0 NULL
int16 SMALLINT 0 NULL
int256 TEXT 0 NULL
int32 INTEGER 0 NULL
int64 BIGINT 0 NULL
int8 TINYINT 0 NULL
ipv4_value TEXT 0 NULL
ipv6_value TEXT 0 NULL
json_value JSON 0 NULL
low_cardinality BLOB 0 NULL
low_cardinality_date DATETIME 0 NULL
map_value JSON 0 NULL
nested.nested_int TEXT 0 NULL
nested.nested_string TEXT 0 NULL
nint32 INTEGER 0 NULL
nullable_value INTEGER 0 NULL
string_value BLOB 0 NULL
tuple_value JSON 0 NULL
uint128 TEXT 0 NULL
uint16 SMALLINT UNSIGNED 0 NULL
uint256 TEXT 0 NULL
uint32 INTEGER UNSIGNED 0 NULL
uint64 BIGINT UNSIGNED 0 PRI SOR NULL
uint8 TINYINT UNSIGNED 0 NULL
uuid_value CHAR 0 NULL
aggregate_function TEXT NO NULL
array_value TEXT NO NULL
boolean_value TINYINT UNSIGNED NO NULL
date32_value DATE NO NULL
date_value DATE NO NULL
datetime64_value DATETIME NO NULL
datetime_value DATETIME NO NULL
decimal_value DECIMAL(10, 2) NO NULL
enum_value ENUM('apple', 'banana', 'orange') NO NULL
fixed_string_value TEXT NO NULL
float32 FLOAT NO NULL
float64 DOUBLE NO NULL
int128 TEXT NO NULL
int16 SMALLINT NO NULL
int256 TEXT NO NULL
int32 INTEGER NO NULL
int64 BIGINT NO NULL
int8 TINYINT NO NULL
ipv4_value TEXT NO NULL
ipv6_value TEXT NO NULL
json_value JSON NO NULL
low_cardinality BLOB NO NULL
low_cardinality_date DATETIME NO NULL
map_value JSON NO NULL
nested.nested_int TEXT NO NULL
nested.nested_string TEXT NO NULL
nint32 INTEGER NO NULL
nullable_value INTEGER NO NULL
string_value BLOB NO NULL
tuple_value JSON NO NULL
uint128 TEXT NO NULL
uint16 SMALLINT UNSIGNED NO NULL
uint256 TEXT NO NULL
uint32 INTEGER UNSIGNED NO NULL
uint64 BIGINT UNSIGNED NO PRI SOR NULL
uint8 TINYINT UNSIGNED NO NULL
uuid_value CHAR NO NULL
field type null key default extra
aggregate_function TEXT 0 NULL
array_value TEXT 0 NULL
boolean_value TINYINT UNSIGNED 0 NULL
date32_value DATE 0 NULL
date_value DATE 0 NULL
datetime64_value DATETIME 0 NULL
datetime_value DATETIME 0 NULL
decimal_value DECIMAL(10, 2) 0 NULL
enum_value ENUM('apple', 'banana', 'orange') 0 NULL
fixed_string_value TEXT 0 NULL
float32 FLOAT 0 NULL
float64 DOUBLE 0 NULL
int128 TEXT 0 NULL
int16 SMALLINT 0 NULL
int256 TEXT 0 NULL
int32 INTEGER 0 NULL
int64 BIGINT 0 NULL
int8 TINYINT 0 NULL
ipv4_value TEXT 0 NULL
ipv6_value TEXT 0 NULL
json_value JSON 0 NULL
low_cardinality BLOB 0 NULL
low_cardinality_date DATETIME 0 NULL
map_value JSON 0 NULL
nested.nested_int TEXT 0 NULL
nested.nested_string TEXT 0 NULL
nint32 INTEGER 0 NULL
nullable_value INTEGER 0 NULL
string_value BLOB 0 NULL
tuple_value JSON 0 NULL
uint128 TEXT 0 NULL
uint16 SMALLINT UNSIGNED 0 NULL
uint256 TEXT 0 NULL
uint32 INTEGER UNSIGNED 0 NULL
uint64 BIGINT UNSIGNED 0 PRI SOR NULL
uint8 TINYINT UNSIGNED 0 NULL
uuid_value CHAR 0 NULL
aggregate_function TEXT NO NULL
array_value TEXT NO NULL
boolean_value TINYINT UNSIGNED NO NULL
date32_value DATE NO NULL
date_value DATE NO NULL
datetime64_value DATETIME NO NULL
datetime_value DATETIME NO NULL
decimal_value DECIMAL(10, 2) NO NULL
enum_value ENUM('apple', 'banana', 'orange') NO NULL
fixed_string_value TEXT NO NULL
float32 FLOAT NO NULL
float64 DOUBLE NO NULL
int128 TEXT NO NULL
int16 SMALLINT NO NULL
int256 TEXT NO NULL
int32 INTEGER NO NULL
int64 BIGINT NO NULL
int8 TINYINT NO NULL
ipv4_value TEXT NO NULL
ipv6_value TEXT NO NULL
json_value JSON NO NULL
low_cardinality BLOB NO NULL
low_cardinality_date DATETIME NO NULL
map_value JSON NO NULL
nested.nested_int TEXT NO NULL
nested.nested_string TEXT NO NULL
nint32 INTEGER NO NULL
nullable_value INTEGER NO NULL
string_value BLOB NO NULL
tuple_value JSON NO NULL
uint128 TEXT NO NULL
uint16 SMALLINT UNSIGNED NO NULL
uint256 TEXT NO NULL
uint32 INTEGER UNSIGNED NO NULL
uint64 BIGINT UNSIGNED NO PRI SOR NULL
uint8 TINYINT UNSIGNED NO NULL
uuid_value CHAR NO NULL

View File

@ -74,7 +74,9 @@ int main(int argc, char *argv[])
LOG_INFO(logger, "Last committed index: {}", last_commited_index);
DB::KeeperLogStore changelog(
LogFileSettings{.force_sync = true, .compress_logs = settings->compress_logs, .rotate_interval = 10000000}, keeper_context);
LogFileSettings{.force_sync = true, .compress_logs = settings->compress_logs, .rotate_interval = 10000000},
FlushSettings(),
keeper_context);
changelog.init(last_commited_index, 10000000000UL); /// collect all logs
if (changelog.size() == 0)
LOG_INFO(logger, "Changelog empty");