Merge remote-tracking branch 'upstream/master' into async-insert

This commit is contained in:
Ivan Lezhankin 2021-07-13 13:54:09 +03:00
commit ab26aed6f9
134 changed files with 2969 additions and 214 deletions

3
.gitmodules vendored
View File

@ -228,3 +228,6 @@
[submodule "contrib/libpqxx"]
path = contrib/libpqxx
url = https://github.com/ClickHouse-Extras/libpqxx.git
[submodule "contrib/sqlite-amalgamation"]
path = contrib/sqlite-amalgamation
url = https://github.com/azadkuh/sqlite-amalgamation

View File

@ -536,6 +536,7 @@ include (cmake/find/rapidjson.cmake)
include (cmake/find/fastops.cmake)
include (cmake/find/odbc.cmake)
include (cmake/find/nanodbc.cmake)
include (cmake/find/sqlite.cmake)
include (cmake/find/rocksdb.cmake)
include (cmake/find/libpqxx.cmake)
include (cmake/find/nuraft.cmake)

View File

@ -18,6 +18,8 @@
#define DATE_LUT_MAX (0xFFFFFFFFU - 86400)
#define DATE_LUT_MAX_DAY_NUM 0xFFFF
/// Max int value of Date32, DATE LUT cache size minus daynum_offset_epoch
#define DATE_LUT_MAX_EXTEND_DAY_NUM (DATE_LUT_SIZE - 16436)
/// A constant to add to time_t so every supported time point becomes non-negative and still has the same remainder of division by 3600.
/// If we treat "remainder of division" operation in the sense of modular arithmetic (not like in C++).
@ -270,6 +272,8 @@ public:
auto getOffsetAtStartOfEpoch() const { return offset_at_start_of_epoch; }
auto getTimeOffsetAtStartOfLUT() const { return offset_at_start_of_lut; }
auto getDayNumOffsetEpoch() const { return daynum_offset_epoch; }
/// All functions below are thread-safe; arguments are not checked.
inline ExtendedDayNum toDayNum(ExtendedDayNum d) const
@ -926,15 +930,17 @@ public:
{
if (unlikely(year < DATE_LUT_MIN_YEAR || year > DATE_LUT_MAX_YEAR || month < 1 || month > 12 || day_of_month < 1 || day_of_month > 31))
return LUTIndex(0);
return LUTIndex{years_months_lut[(year - DATE_LUT_MIN_YEAR) * 12 + month - 1] + day_of_month - 1};
auto year_lut_index = (year - DATE_LUT_MIN_YEAR) * 12 + month - 1;
UInt32 index = years_months_lut[year_lut_index].toUnderType() + day_of_month - 1;
/// When date is out of range, default value is DATE_LUT_SIZE - 1 (2283-11-11)
return LUTIndex{std::min(index, static_cast<UInt32>(DATE_LUT_SIZE - 1))};
}
/// Create DayNum from year, month, day of month.
inline ExtendedDayNum makeDayNum(Int16 year, UInt8 month, UInt8 day_of_month) const
inline ExtendedDayNum makeDayNum(Int16 year, UInt8 month, UInt8 day_of_month, Int32 default_error_day_num = 0) const
{
if (unlikely(year < DATE_LUT_MIN_YEAR || year > DATE_LUT_MAX_YEAR || month < 1 || month > 12 || day_of_month < 1 || day_of_month > 31))
return ExtendedDayNum(0);
return ExtendedDayNum(default_error_day_num);
return toDayNum(makeLUTIndex(year, month, day_of_month));
}
@ -1091,9 +1097,9 @@ public:
return lut[new_index].date + time;
}
inline NO_SANITIZE_UNDEFINED Time addWeeks(Time t, Int64 delta) const
inline NO_SANITIZE_UNDEFINED Time addWeeks(Time t, Int32 delta) const
{
return addDays(t, delta * 7);
return addDays(t, static_cast<Int64>(delta) * 7);
}
inline UInt8 saturateDayOfMonth(Int16 year, UInt8 month, UInt8 day_of_month) const
@ -1158,14 +1164,14 @@ public:
return toDayNum(addMonthsIndex(d, delta));
}
inline Time NO_SANITIZE_UNDEFINED addQuarters(Time t, Int64 delta) const
inline Time NO_SANITIZE_UNDEFINED addQuarters(Time t, Int32 delta) const
{
return addMonths(t, delta * 3);
return addMonths(t, static_cast<Int64>(delta) * 3);
}
inline ExtendedDayNum addQuarters(ExtendedDayNum d, Int64 delta) const
inline ExtendedDayNum addQuarters(ExtendedDayNum d, Int32 delta) const
{
return addMonths(d, delta * 3);
return addMonths(d, static_cast<Int64>(delta) * 3);
}
template <typename DateOrTime>

View File

@ -70,6 +70,14 @@ public:
m_day = values.day_of_month;
}
explicit LocalDate(ExtendedDayNum day_num)
{
const auto & values = DateLUT::instance().getValues(day_num);
m_year = values.year;
m_month = values.month;
m_day = values.day_of_month;
}
LocalDate(unsigned short year_, unsigned char month_, unsigned char day_)
: m_year(year_), m_month(month_), m_day(day_)
{
@ -98,6 +106,12 @@ public:
return DayNum(lut.makeDayNum(m_year, m_month, m_day).toUnderType());
}
ExtendedDayNum getExtenedDayNum() const
{
const auto & lut = DateLUT::instance();
return ExtendedDayNum (lut.makeDayNum(m_year, m_month, m_day).toUnderType());
}
operator DayNum() const
{
return getDayNum();

16
cmake/find/sqlite.cmake Normal file
View File

@ -0,0 +1,16 @@
option(ENABLE_SQLITE "Enable sqlite" ${ENABLE_LIBRARIES})
if (NOT ENABLE_SQLITE)
return()
endif()
if (NOT EXISTS "${ClickHouse_SOURCE_DIR}/contrib/sqlite-amalgamation/sqlite3.c")
message (WARNING "submodule contrib/sqlite3-amalgamation is missing. to fix try run: \n git submodule update --init --recursive")
message (${RECONFIGURE_MESSAGE_LEVEL} "Can't find internal sqlite library")
set (USE_SQLITE 0)
return()
endif()
set (USE_SQLITE 1)
set (SQLITE_LIBRARY sqlite)
message (STATUS "Using sqlite=${USE_SQLITE}")

View File

@ -329,3 +329,7 @@ endif()
add_subdirectory(fast_float)
if (USE_SQLITE)
add_subdirectory(sqlite-cmake)
endif()

1
contrib/sqlite-amalgamation vendored Submodule

@ -0,0 +1 @@
Subproject commit 9818baa5d027ffb26d57f810dc4c597d4946781c

View File

@ -0,0 +1,6 @@
set (LIBRARY_DIR "${ClickHouse_SOURCE_DIR}/contrib/sqlite-amalgamation")
set(SRCS ${LIBRARY_DIR}/sqlite3.c)
add_library(sqlite ${SRCS})
target_include_directories(sqlite SYSTEM PUBLIC "${LIBRARY_DIR}")

View File

@ -378,6 +378,8 @@ function run_tests
# needs pv
01923_network_receive_time_metric_insert
01889_sqlite_read_write
)
time clickhouse-test --hung-check -j 8 --order=random --use-skip-list \

View File

@ -29,7 +29,8 @@ RUN apt-get update -y \
unixodbc \
wget \
mysql-client=5.7* \
postgresql-client
postgresql-client \
sqlite3
RUN pip3 install numpy scipy pandas

View File

@ -105,11 +105,11 @@ clickhouse-client -nmT < tests/queries/0_stateless/01521_dummy_test.sql | tee te
5) ensure everything is correct, if the test output is incorrect (due to some bug for example), adjust the reference file using text editor.
#### How to create good test
#### How to create a good test
- test should be
- A test should be
- minimal - create only tables related to tested functionality, remove unrelated columns and parts of query
- fast - should not take longer than few seconds (better subseconds)
- fast - should not take longer than a few seconds (better subseconds)
- correct - fails then feature is not working
- deterministic
- isolated / stateless
@ -126,6 +126,16 @@ clickhouse-client -nmT < tests/queries/0_stateless/01521_dummy_test.sql | tee te
- use other SQL files in the `0_stateless` folder as an example
- ensure the feature / feature combination you want to test is not yet covered with existing tests
#### Test naming rules
It's important to name tests correctly, so one could turn some tests subset off in clickhouse-test invocation.
| Tester flag| What should be in test name | When flag should be added |
|---|---|---|---|
| `--[no-]zookeeper`| "zookeeper" or "replica" | Test uses tables from ReplicatedMergeTree family |
| `--[no-]shard` | "shard" or "distributed" or "global"| Test using connections to 127.0.0.2 or similar |
| `--[no-]long` | "long" or "deadlock" or "race" | Test runs longer than 60 seconds |
#### Commit / push / create PR.
1) commit & push your changes

View File

@ -134,10 +134,10 @@ $ ./release
## Faster builds for development
Normally all tools of the ClickHouse bundle, such as `clickhouse-server`, `clickhouse-client` etc., are linked into a single static executable, `clickhouse`. This executable must be re-linked on every change, which might be slow. Two common ways to improve linking time are to use `lld` linker, and use the 'split' build configuration, which builds a separate binary for every tool, and further splits the code into several shared libraries. To enable these tweaks, pass the following flags to `cmake`:
Normally all tools of the ClickHouse bundle, such as `clickhouse-server`, `clickhouse-client` etc., are linked into a single static executable, `clickhouse`. This executable must be re-linked on every change, which might be slow. One common way to improve build time is to use the 'split' build configuration, which builds a separate binary for every tool, and further splits the code into several shared libraries. To enable this tweak, pass the following flags to `cmake`:
```
-DCMAKE_C_FLAGS="--ld-path=lld" -DCMAKE_CXX_FLAGS="--ld-path=lld" -DUSE_STATIC_LIBRARIES=0 -DSPLIT_SHARED_LIBRARIES=1 -DCLICKHOUSE_SPLIT_BINARY=1
-DUSE_STATIC_LIBRARIES=0 -DSPLIT_SHARED_LIBRARIES=1 -DCLICKHOUSE_SPLIT_BINARY=1
```
## You Dont Have to Build ClickHouse {#you-dont-have-to-build-clickhouse}

View File

@ -1,13 +1,8 @@
---
machine_translated: true
machine_translated_rev: 72537a2d527c63c07aa5d2361a8829f3895cf2bd
toc_priority: 42
toc_title: mysql
---
# mysql {#mysql}
允许 `SELECT` 要对存储在远程MySQL服务器上的数据执行的查询。
允许对存储在远程MySQL服务器上的数据执行`SELECT`和`INSERT`查询。
**语法**
``` sql
mysql('host:port', 'database', 'table', 'user', 'password'[, replace_query, 'on_duplicate_clause']);
@ -15,31 +10,44 @@ mysql('host:port', 'database', 'table', 'user', 'password'[, replace_query, 'on_
**参数**
- `host:port` — MySQL server address.
- `host:port` — MySQL服务器地址.
- `database`Remote database name.
- `database`远程数据库名称.
- `table`Remote table name.
- `table`远程表名称.
- `user` — MySQL user.
- `user` — MySQL用户.
- `password`User password.
- `password`用户密码.
- `replace_query`Flag that converts `INSERT INTO` 查询到 `REPLACE INTO`. 如果 `replace_query=1`,查询被替换。
- `replace_query`将INSERT INTO` 查询转换为 `REPLACE INTO`的标志。如果 `replace_query=1`,查询被替换。
- `on_duplicate_clause` — The `ON DUPLICATE KEY on_duplicate_clause` 表达式被添加`INSERT` 查询。
- `on_duplicate_clause` — 添加 `ON DUPLICATE KEY on_duplicate_clause` 表达式到 `INSERT` 查询。明确规定只能使用 `replace_query = 0` 如果你同时设置replace_query = 1`和`on_duplicate_clause`ClickHouse将产生异常。
Example: `INSERT INTO t (c1,c2) VALUES ('a', 2) ON DUPLICATE KEY UPDATE c2 = c2 + 1`, where `on_duplicate_clause` is `UPDATE c2 = c2 + 1`. See the MySQL documentation to find which `on_duplicate_clause` you can use with the `ON DUPLICATE KEY` clause.
示例:`INSERT INTO t (c1,c2) VALUES ('a', 2) ON DUPLICATE KEY UPDATE c2 = c2 + 1`
To specify `on_duplicate_clause` you need to pass `0` to the `replace_query` parameter. If you simultaneously pass `replace_query = 1` and `on_duplicate_clause`, ClickHouse generates an exception.
`on_duplicate_clause`这里是`UPDATE c2 = c2 + 1`。请查阅MySQL文档来找到可以和`ON DUPLICATE KEY`一起使用的 `on_duplicate_clause`子句。
简单 `WHERE` 条款如 `=, !=, >, >=, <, <=` 当前在MySQL服务器上执行
简单`WHERE` 子句如 `=, !=, >, >=, <, <=` 将即时在MySQL服务器上执行。其余的条件和 `LIMIT` 只有在对MySQL的查询完成后才会在ClickHouse中执行采样约束
其余的条件和 `LIMIT` 只有在对MySQL的查询完成后才会在ClickHouse中执行采样约束。
支持使用`|`并列进行多副本查询,示例如下:
```sql
SELECT name FROM mysql(`mysql{1|2|3}:3306`, 'mysql_database', 'mysql_table', 'user', 'password');
```
```sql
SELECT name FROM mysql(`mysql1:3306|mysql2:3306|mysql3:3306`, 'mysql_database', 'mysql_table', 'user', 'password');
```
**返回值**
与原始MySQL表具有相同列的table对象。
与原始MySQL表具有相同列的表对象。
!!! note "注意"
在`INSERT`查询中为了区分`mysql(...)`与带有列名列表的表名的表函数,你必须使用关键字`FUNCTION`或`TABLE FUNCTION`。查看如下示例。
## 用法示例 {#usage-example}
@ -66,7 +74,7 @@ mysql> select * from test;
1 row in set (0,00 sec)
```
从ClickHouse中选择数据:
从ClickHouse中查询数据:
``` sql
SELECT * FROM mysql('localhost:3306', 'test', 'test', 'bayonet', '123')
@ -78,6 +86,21 @@ SELECT * FROM mysql('localhost:3306', 'test', 'test', 'bayonet', '123')
└────────┴──────────────┴───────┴────────────────┘
```
替换和插入:
```sql
INSERT INTO FUNCTION mysql('localhost:3306', 'test', 'test', 'bayonet', '123', 1) (int_id, float) VALUES (1, 3);
INSERT INTO TABLE FUNCTION mysql('localhost:3306', 'test', 'test', 'bayonet', '123', 0, 'UPDATE int_id = int_id + 1') (int_id, float) VALUES (1, 4);
SELECT * FROM mysql('localhost:3306', 'test', 'test', 'bayonet', '123');
```
```text
┌─int_id─┬─float─┐
│ 1 │ 3 │
│ 2 │ 4 │
└────────┴───────┘
```
## 另请参阅 {#see-also}
- [MySQL 表引擎](../../engines/table-engines/integrations/mysql.md)

View File

@ -479,21 +479,9 @@ int Server::main(const std::vector<std::string> & /*args*/)
CurrentMetrics::set(CurrentMetrics::Revision, ClickHouseRevision::getVersionRevision());
CurrentMetrics::set(CurrentMetrics::VersionInteger, ClickHouseRevision::getVersionInteger());
if (ThreadFuzzer::instance().isEffective())
LOG_WARNING(log, "ThreadFuzzer is enabled. Application will run slowly and unstable.");
#if !defined(NDEBUG) || !defined(__OPTIMIZE__)
LOG_WARNING(log, "Server was built in debug mode. It will work slowly.");
#endif
#if defined(SANITIZER)
LOG_WARNING(log, "Server was built with sanitizer. It will work slowly.");
#endif
// Initialize global thread pool. Do it before we fetch configs from zookeeper
// nodes (`from_zk`), because ZooKeeper interface uses the pool. We will
// ignore `max_thread_pool_size` in configs we fetch from ZK, but oh well.
// Also do it before global context initialization since it also may use threads from global pool.
GlobalThreadPool::initialize(config().getUInt("max_thread_pool_size", 10000));
/** Context contains all that query execution is dependent:
@ -505,6 +493,17 @@ int Server::main(const std::vector<std::string> & /*args*/)
global_context->makeGlobalContext();
global_context->setApplicationType(Context::ApplicationType::SERVER);
#if !defined(NDEBUG) || !defined(__OPTIMIZE__)
global_context->addWarningMessage("Server was built in debug mode. It will work slowly.");
#endif
if (ThreadFuzzer::instance().isEffective())
global_context->addWarningMessage("ThreadFuzzer is enabled. Application will run slowly and unstable.");
#if defined(SANITIZER)
global_context->addWarningMessage("Server was built with sanitizer. It will work slowly.");
#endif
bool has_zookeeper = config().has("zookeeper");
zkutil::ZooKeeperNodeCache main_config_zk_node_cache([&] { return global_context->getZooKeeper(); });
@ -555,8 +554,10 @@ int Server::main(const std::vector<std::string> & /*args*/)
if (ptrace(PTRACE_TRACEME, 0, nullptr, nullptr) == -1)
{
/// Program is run under debugger. Modification of it's binary image is ok for breakpoints.
LOG_WARNING(log, "Server is run under debugger and its binary image is modified (most likely with breakpoints).",
calculated_binary_hash);
global_context->addWarningMessage(
fmt::format("Server is run under debugger and its binary image is modified (most likely with breakpoints).",
calculated_binary_hash)
);
}
else
{
@ -639,7 +640,7 @@ int Server::main(const std::vector<std::string> & /*args*/)
}
else
{
LOG_WARNING(log, message);
global_context->addWarningMessage(message);
}
}

View File

@ -173,6 +173,7 @@ enum class AccessType
M(MONGO, "", GLOBAL, SOURCES) \
M(MYSQL, "", GLOBAL, SOURCES) \
M(POSTGRES, "", GLOBAL, SOURCES) \
M(SQLITE, "", GLOBAL, SOURCES) \
M(ODBC, "", GLOBAL, SOURCES) \
M(JDBC, "", GLOBAL, SOURCES) \
M(HDFS, "", GLOBAL, SOURCES) \

View File

@ -3,6 +3,7 @@
#include <AggregateFunctions/AggregateFunctionSequenceMatch.h>
#include <DataTypes/DataTypeDate.h>
#include <DataTypes/DataTypeDate32.h>
#include <DataTypes/DataTypeDateTime.h>
#include <common/range.h>

View File

@ -101,6 +101,24 @@ struct AggregateFunctionSumData
{
const auto * end = ptr + count;
if constexpr (
(is_integer_v<T> && !is_big_int_v<T>)
|| (IsDecimalNumber<T> && !std::is_same_v<T, Decimal256> && !std::is_same_v<T, Decimal128>))
{
/// For integers we can vectorize the operation if we replace the null check using a multiplication (by 0 for null, 1 for not null)
/// https://quick-bench.com/q/MLTnfTvwC2qZFVeWHfOBR3U7a8I
T local_sum{};
while (ptr < end)
{
T multiplier = !*null_map;
Impl::add(local_sum, *ptr * multiplier);
++ptr;
++null_map;
}
Impl::add(sum, local_sum);
return;
}
if constexpr (std::is_floating_point_v<T>)
{
constexpr size_t unroll_count = 128 / sizeof(T);

View File

@ -4,6 +4,7 @@
#include <AggregateFunctions/FactoryHelpers.h>
#include <DataTypes/DataTypeDate.h>
#include <DataTypes/DataTypeDate32.h>
#include <DataTypes/DataTypeDateTime.h>
#include <DataTypes/DataTypeTuple.h>
#include <DataTypes/DataTypeUUID.h>
@ -49,6 +50,8 @@ AggregateFunctionPtr createAggregateFunctionUniq(const std::string & name, const
return res;
else if (which.isDate())
return std::make_shared<AggregateFunctionUniq<DataTypeDate::FieldType, Data>>(argument_types);
else if (which.isDate32())
return std::make_shared<AggregateFunctionUniq<DataTypeDate32::FieldType, Data>>(argument_types);
else if (which.isDateTime())
return std::make_shared<AggregateFunctionUniq<DataTypeDateTime::FieldType, Data>>(argument_types);
else if (which.isStringOrFixedString())
@ -95,6 +98,8 @@ AggregateFunctionPtr createAggregateFunctionUniq(const std::string & name, const
return res;
else if (which.isDate())
return std::make_shared<AggregateFunctionUniq<DataTypeDate::FieldType, Data<DataTypeDate::FieldType>>>(argument_types);
else if (which.isDate32())
return std::make_shared<AggregateFunctionUniq<DataTypeDate32::FieldType, Data<DataTypeDate32::FieldType>>>(argument_types);
else if (which.isDateTime())
return std::make_shared<AggregateFunctionUniq<DataTypeDateTime::FieldType, Data<DataTypeDateTime::FieldType>>>(argument_types);
else if (which.isStringOrFixedString())

View File

@ -6,6 +6,7 @@
#include <Common/FieldVisitorConvertToNumber.h>
#include <DataTypes/DataTypeDate.h>
#include <DataTypes/DataTypeDate32.h>
#include <DataTypes/DataTypeDateTime.h>
#include <functional>
@ -51,6 +52,8 @@ namespace
return res;
else if (which.isDate())
return std::make_shared<typename WithK<K, HashValueType>::template AggregateFunction<DataTypeDate::FieldType>>(argument_types, params);
else if (which.isDate32())
return std::make_shared<typename WithK<K, HashValueType>::template AggregateFunction<DataTypeDate32::FieldType>>(argument_types, params);
else if (which.isDateTime())
return std::make_shared<typename WithK<K, HashValueType>::template AggregateFunction<DataTypeDateTime::FieldType>>(argument_types, params);
else if (which.isStringOrFixedString())

View File

@ -3,6 +3,7 @@
#include <AggregateFunctions/AggregateFunctionUniqUpTo.h>
#include <Common/FieldVisitorConvertToNumber.h>
#include <DataTypes/DataTypeDate.h>
#include <DataTypes/DataTypeDate32.h>
#include <DataTypes/DataTypeDateTime.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypeFixedString.h>
@ -61,6 +62,8 @@ AggregateFunctionPtr createAggregateFunctionUniqUpTo(const std::string & name, c
return res;
else if (which.isDate())
return std::make_shared<AggregateFunctionUniqUpTo<DataTypeDate::FieldType>>(threshold, argument_types, params);
else if (which.isDate32())
return std::make_shared<AggregateFunctionUniqUpTo<DataTypeDate32::FieldType>>(threshold, argument_types, params);
else if (which.isDateTime())
return std::make_shared<AggregateFunctionUniqUpTo<DataTypeDateTime::FieldType>>(threshold, argument_types, params);
else if (which.isStringOrFixedString())

View File

@ -4,6 +4,7 @@
#include <AggregateFunctions/Helpers.h>
#include <Core/Settings.h>
#include <DataTypes/DataTypeDate.h>
#include <DataTypes/DataTypeDate32.h>
#include <DataTypes/DataTypeDateTime.h>
#include <common/range.h>

View File

@ -76,6 +76,10 @@ add_headers_and_sources(clickhouse_common_io IO)
add_headers_and_sources(clickhouse_common_io IO/S3)
list (REMOVE_ITEM clickhouse_common_io_sources Common/malloc.cpp Common/new_delete.cpp)
if (USE_SQLITE)
add_headers_and_sources(dbms Databases/SQLite)
endif()
if(USE_RDKAFKA)
add_headers_and_sources(dbms Storages/Kafka)
endif()
@ -425,6 +429,10 @@ if (USE_AMQPCPP)
dbms_target_include_directories (SYSTEM BEFORE PUBLIC ${AMQPCPP_INCLUDE_DIR})
endif()
if (USE_SQLITE)
dbms_target_link_libraries(PUBLIC sqlite)
endif()
if (USE_CASSANDRA)
dbms_target_link_libraries(PUBLIC ${CASSANDRA_LIBRARY})
dbms_target_include_directories (SYSTEM BEFORE PUBLIC ${CASS_INCLUDE_DIR})

View File

@ -558,6 +558,7 @@
M(588, DISTRIBUTED_BROKEN_BATCH_INFO) \
M(589, DISTRIBUTED_BROKEN_BATCH_FILES) \
M(590, CANNOT_SYSCONF) \
M(591, SQLITE_ENGINE_ERROR) \
\
M(998, POSTGRESQL_CONNECTION_FAILURE) \
M(999, KEEPER_EXCEPTION) \

View File

@ -62,6 +62,8 @@ void ExternalResultDescription::init(const Block & sample_block_)
types.emplace_back(ValueType::vtString, is_nullable);
else if (which.isDate())
types.emplace_back(ValueType::vtDate, is_nullable);
else if (which.isDate32())
types.emplace_back(ValueType::vtDate32, is_nullable);
else if (which.isDateTime())
types.emplace_back(ValueType::vtDateTime, is_nullable);
else if (which.isUUID())

View File

@ -26,6 +26,7 @@ struct ExternalResultDescription
vtEnum16,
vtString,
vtDate,
vtDate32,
vtDateTime,
vtUUID,
vtDateTime64,

View File

@ -89,6 +89,9 @@ void insertPostgreSQLValue(
case ExternalResultDescription::ValueType::vtDate:
assert_cast<ColumnUInt16 &>(column).insertValue(UInt16{LocalDate{std::string(value)}.getDayNum()});
break;
case ExternalResultDescription::ValueType::vtDate32:
assert_cast<ColumnInt32 &>(column).insertValue(Int32{LocalDate{std::string(value)}.getExtenedDayNum()});
break;
case ExternalResultDescription::ValueType::vtDateTime:
{
ReadBufferFromString in(value);

View File

@ -39,6 +39,7 @@ enum class TypeIndex
Float32,
Float64,
Date,
Date32,
DateTime,
DateTime64,
String,
@ -257,6 +258,7 @@ inline constexpr const char * getTypeName(TypeIndex idx)
case TypeIndex::Float32: return "Float32";
case TypeIndex::Float64: return "Float64";
case TypeIndex::Date: return "Date";
case TypeIndex::Date32: return "Date32";
case TypeIndex::DateTime: return "DateTime";
case TypeIndex::DateTime64: return "DateTime64";
case TypeIndex::String: return "String";

View File

@ -192,6 +192,7 @@ bool callOnIndexAndDataType(TypeIndex number, F && f, ExtraArgs && ... args)
case TypeIndex::Decimal256: return f(TypePair<DataTypeDecimal<Decimal256>, T>(), std::forward<ExtraArgs>(args)...);
case TypeIndex::Date: return f(TypePair<DataTypeDate, T>(), std::forward<ExtraArgs>(args)...);
case TypeIndex::Date32: return f(TypePair<DataTypeDate, T>(), std::forward<ExtraArgs>(args)...);
case TypeIndex::DateTime: return f(TypePair<DataTypeDateTime, T>(), std::forward<ExtraArgs>(args)...);
case TypeIndex::DateTime64: return f(TypePair<DataTypeDateTime64, T>(), std::forward<ExtraArgs>(args)...);

View File

@ -13,5 +13,6 @@
#cmakedefine01 USE_LDAP
#cmakedefine01 USE_ROCKSDB
#cmakedefine01 USE_LIBPQXX
#cmakedefine01 USE_SQLITE
#cmakedefine01 USE_NURAFT
#cmakedefine01 USE_KRB5

View File

@ -0,0 +1,163 @@
#include "SQLiteBlockInputStream.h"
#if USE_SQLITE
#include <common/range.h>
#include <common/logger_useful.h>
#include <Common/assert_cast.h>
#include <Columns/ColumnArray.h>
#include <Columns/ColumnDecimal.h>
#include <Columns/ColumnNullable.h>
#include <Columns/ColumnString.h>
#include <Columns/ColumnsNumber.h>
#include <DataTypes/DataTypeNullable.h>
namespace DB
{
namespace ErrorCodes
{
extern const int SQLITE_ENGINE_ERROR;
}
SQLiteBlockInputStream::SQLiteBlockInputStream(
SQLitePtr sqlite_db_,
const String & query_str_,
const Block & sample_block,
const UInt64 max_block_size_)
: query_str(query_str_)
, max_block_size(max_block_size_)
, sqlite_db(std::move(sqlite_db_))
{
description.init(sample_block);
}
void SQLiteBlockInputStream::readPrefix()
{
sqlite3_stmt * compiled_stmt = nullptr;
int status = sqlite3_prepare_v2(sqlite_db.get(), query_str.c_str(), query_str.size() + 1, &compiled_stmt, nullptr);
if (status != SQLITE_OK)
throw Exception(ErrorCodes::SQLITE_ENGINE_ERROR,
"Cannot prepate sqlite statement. Status: {}. Message: {}",
status, sqlite3_errstr(status));
compiled_statement = std::unique_ptr<sqlite3_stmt, StatementDeleter>(compiled_stmt, StatementDeleter());
}
Block SQLiteBlockInputStream::readImpl()
{
if (!compiled_statement)
return Block();
MutableColumns columns = description.sample_block.cloneEmptyColumns();
size_t num_rows = 0;
while (true)
{
int status = sqlite3_step(compiled_statement.get());
if (status == SQLITE_BUSY)
{
continue;
}
else if (status == SQLITE_DONE)
{
compiled_statement.reset();
break;
}
else if (status != SQLITE_ROW)
{
throw Exception(ErrorCodes::SQLITE_ENGINE_ERROR,
"Expected SQLITE_ROW status, but got status {}. Error: {}, Message: {}",
status, sqlite3_errstr(status), sqlite3_errmsg(sqlite_db.get()));
}
int column_count = sqlite3_column_count(compiled_statement.get());
for (const auto idx : collections::range(0, column_count))
{
const auto & sample = description.sample_block.getByPosition(idx);
if (sqlite3_column_type(compiled_statement.get(), idx) == SQLITE_NULL)
{
insertDefaultSQLiteValue(*columns[idx], *sample.column);
continue;
}
if (description.types[idx].second)
{
ColumnNullable & column_nullable = assert_cast<ColumnNullable &>(*columns[idx]);
insertValue(column_nullable.getNestedColumn(), description.types[idx].first, idx);
column_nullable.getNullMapData().emplace_back(0);
}
else
{
insertValue(*columns[idx], description.types[idx].first, idx);
}
}
if (++num_rows == max_block_size)
break;
}
return description.sample_block.cloneWithColumns(std::move(columns));
}
void SQLiteBlockInputStream::readSuffix()
{
if (compiled_statement)
compiled_statement.reset();
}
void SQLiteBlockInputStream::insertValue(IColumn & column, const ExternalResultDescription::ValueType type, size_t idx)
{
switch (type)
{
case ValueType::vtUInt8:
assert_cast<ColumnUInt8 &>(column).insertValue(sqlite3_column_int(compiled_statement.get(), idx));
break;
case ValueType::vtUInt16:
assert_cast<ColumnUInt16 &>(column).insertValue(sqlite3_column_int(compiled_statement.get(), idx));
break;
case ValueType::vtUInt32:
assert_cast<ColumnUInt32 &>(column).insertValue(sqlite3_column_int64(compiled_statement.get(), idx));
break;
case ValueType::vtUInt64:
/// There is no uint64 in sqlite3, only int and int64
assert_cast<ColumnUInt64 &>(column).insertValue(sqlite3_column_int64(compiled_statement.get(), idx));
break;
case ValueType::vtInt8:
assert_cast<ColumnInt8 &>(column).insertValue(sqlite3_column_int(compiled_statement.get(), idx));
break;
case ValueType::vtInt16:
assert_cast<ColumnInt16 &>(column).insertValue(sqlite3_column_int(compiled_statement.get(), idx));
break;
case ValueType::vtInt32:
assert_cast<ColumnInt32 &>(column).insertValue(sqlite3_column_int(compiled_statement.get(), idx));
break;
case ValueType::vtInt64:
assert_cast<ColumnInt64 &>(column).insertValue(sqlite3_column_int64(compiled_statement.get(), idx));
break;
case ValueType::vtFloat32:
assert_cast<ColumnFloat32 &>(column).insertValue(sqlite3_column_double(compiled_statement.get(), idx));
break;
case ValueType::vtFloat64:
assert_cast<ColumnFloat64 &>(column).insertValue(sqlite3_column_double(compiled_statement.get(), idx));
break;
default:
const char * data = reinterpret_cast<const char *>(sqlite3_column_text(compiled_statement.get(), idx));
int len = sqlite3_column_bytes(compiled_statement.get(), idx);
assert_cast<ColumnString &>(column).insertData(data, len);
break;
}
}
}
#endif

View File

@ -0,0 +1,62 @@
#pragma once
#if !defined(ARCADIA_BUILD)
#include "config_core.h"
#endif
#if USE_SQLITE
#include <Core/ExternalResultDescription.h>
#include <DataStreams/IBlockInputStream.h>
#include <sqlite3.h>
namespace DB
{
class SQLiteBlockInputStream : public IBlockInputStream
{
using SQLitePtr = std::shared_ptr<sqlite3>;
public:
SQLiteBlockInputStream(SQLitePtr sqlite_db_,
const String & query_str_,
const Block & sample_block,
UInt64 max_block_size_);
String getName() const override { return "SQLite"; }
Block getHeader() const override { return description.sample_block.cloneEmpty(); }
private:
void insertDefaultSQLiteValue(IColumn & column, const IColumn & sample_column)
{
column.insertFrom(sample_column, 0);
}
using ValueType = ExternalResultDescription::ValueType;
struct StatementDeleter
{
void operator()(sqlite3_stmt * stmt) { sqlite3_finalize(stmt); }
};
void readPrefix() override;
Block readImpl() override;
void readSuffix() override;
void insertValue(IColumn & column, const ExternalResultDescription::ValueType type, size_t idx);
String query_str;
UInt64 max_block_size;
ExternalResultDescription description;
SQLitePtr sqlite_db;
std::unique_ptr<sqlite3_stmt, StatementDeleter> compiled_statement;
};
}
#endif

View File

@ -41,6 +41,7 @@ SRCS(
RemoteBlockOutputStream.cpp
RemoteQueryExecutor.cpp
RemoteQueryExecutorReadContext.cpp
SQLiteBlockInputStream.cpp
SizeLimits.cpp
SquashingBlockInputStream.cpp
SquashingBlockOutputStream.cpp

View File

@ -0,0 +1,23 @@
#include <DataTypes/DataTypeDate32.h>
#include <DataTypes/DataTypeFactory.h>
#include <DataTypes/Serializations/SerializationDate32.h>
namespace DB
{
bool DataTypeDate32::equals(const IDataType & rhs) const
{
return typeid(rhs) == typeid(*this);
}
SerializationPtr DataTypeDate32::doGetDefaultSerialization() const
{
return std::make_shared<SerializationDate32>();
}
void registerDataTypeDate32(DataTypeFactory & factory)
{
factory.registerSimpleDataType(
"Date32", [] { return DataTypePtr(std::make_shared<DataTypeDate32>()); }, DataTypeFactory::CaseInsensitive);
}
}

View File

@ -0,0 +1,23 @@
#pragma once
#include <DataTypes/DataTypeNumberBase.h>
namespace DB
{
class DataTypeDate32 final : public DataTypeNumberBase<Int32>
{
public:
static constexpr auto family_name = "Date32";
TypeIndex getTypeId() const override { return TypeIndex::Date32; }
const char * getFamilyName() const override { return family_name; }
bool canBeUsedAsVersion() const override { return true; }
bool canBeInsideNullable() const override { return true; }
bool equals(const IDataType & rhs) const override;
protected:
SerializationPtr doGetDefaultSerialization() const override;
};
}

View File

@ -194,6 +194,7 @@ DataTypeFactory::DataTypeFactory()
registerDataTypeNumbers(*this);
registerDataTypeDecimal(*this);
registerDataTypeDate(*this);
registerDataTypeDate32(*this);
registerDataTypeDateTime(*this);
registerDataTypeString(*this);
registerDataTypeFixedString(*this);

View File

@ -69,6 +69,7 @@ private:
void registerDataTypeNumbers(DataTypeFactory & factory);
void registerDataTypeDecimal(DataTypeFactory & factory);
void registerDataTypeDate(DataTypeFactory & factory);
void registerDataTypeDate32(DataTypeFactory & factory);
void registerDataTypeDateTime(DataTypeFactory & factory);
void registerDataTypeString(DataTypeFactory & factory);
void registerDataTypeFixedString(DataTypeFactory & factory);

View File

@ -78,6 +78,8 @@ MutableColumnUniquePtr DataTypeLowCardinality::createColumnUniqueImpl(const IDat
return creator(static_cast<ColumnFixedString *>(nullptr));
else if (which.isDate())
return creator(static_cast<ColumnVector<UInt16> *>(nullptr));
else if (which.isDate32())
return creator(static_cast<ColumnVector<Int32> *>(nullptr));
else if (which.isDateTime())
return creator(static_cast<ColumnVector<UInt32> *>(nullptr));
else if (which.isUUID())

View File

@ -322,8 +322,10 @@ struct WhichDataType
constexpr bool isEnum() const { return isEnum8() || isEnum16(); }
constexpr bool isDate() const { return idx == TypeIndex::Date; }
constexpr bool isDate32() const { return idx == TypeIndex::Date32; }
constexpr bool isDateTime() const { return idx == TypeIndex::DateTime; }
constexpr bool isDateTime64() const { return idx == TypeIndex::DateTime64; }
constexpr bool isDateOrDate32() const { return isDate() || isDate32(); }
constexpr bool isString() const { return idx == TypeIndex::String; }
constexpr bool isFixedString() const { return idx == TypeIndex::FixedString; }
@ -347,6 +349,10 @@ struct WhichDataType
template <typename T>
inline bool isDate(const T & data_type) { return WhichDataType(data_type).isDate(); }
template <typename T>
inline bool isDate32(const T & data_type) { return WhichDataType(data_type).isDate32(); }
template <typename T>
inline bool isDateOrDate32(const T & data_type) { return WhichDataType(data_type).isDateOrDate32(); }
template <typename T>
inline bool isDateTime(const T & data_type) { return WhichDataType(data_type).isDateTime(); }
template <typename T>
inline bool isDateTime64(const T & data_type) { return WhichDataType(data_type).isDateTime64(); }

View File

@ -29,7 +29,7 @@ namespace ErrorCodes
static inline bool typeIsSigned(const IDataType & type)
{
WhichDataType data_type(type);
return data_type.isNativeInt() || data_type.isFloat();
return data_type.isNativeInt() || data_type.isFloat() || data_type.isEnum();
}
static inline llvm::Type * toNativeType(llvm::IRBuilderBase & builder, const IDataType & type)
@ -57,6 +57,10 @@ static inline llvm::Type * toNativeType(llvm::IRBuilderBase & builder, const IDa
return builder.getFloatTy();
else if (data_type.isFloat64())
return builder.getDoubleTy();
else if (data_type.isEnum8())
return builder.getInt8Ty();
else if (data_type.isEnum16())
return builder.getInt16Ty();
return nullptr;
}
@ -109,7 +113,7 @@ static inline bool canBeNativeType(const IDataType & type)
return canBeNativeType(*data_type_nullable.getNestedType());
}
return data_type.isNativeInt() || data_type.isNativeUInt() || data_type.isFloat() || data_type.isDate();
return data_type.isNativeInt() || data_type.isNativeUInt() || data_type.isFloat() || data_type.isDate() || data_type.isEnum();
}
static inline llvm::Type * toNativeType(llvm::IRBuilderBase & builder, const DataTypePtr & type)
@ -266,7 +270,7 @@ static inline llvm::Constant * getColumnNativeValue(llvm::IRBuilderBase & builde
{
return llvm::ConstantInt::get(type, column.getUInt(index));
}
else if (column_data_type.isNativeInt())
else if (column_data_type.isNativeInt() || column_data_type.isEnum())
{
return llvm::ConstantInt::get(type, column.getInt(index));
}

View File

@ -0,0 +1,78 @@
#include <DataTypes/Serializations/SerializationDate32.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
#include <Columns/ColumnsNumber.h>
#include <Common/assert_cast.h>
namespace DB
{
void SerializationDate32::serializeText(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings &) const
{
writeDateText(ExtendedDayNum(assert_cast<const ColumnInt32 &>(column).getData()[row_num]), ostr);
}
void SerializationDate32::deserializeWholeText(IColumn & column, ReadBuffer & istr, const FormatSettings & settings) const
{
deserializeTextEscaped(column, istr, settings);
}
void SerializationDate32::deserializeTextEscaped(IColumn & column, ReadBuffer & istr, const FormatSettings &) const
{
ExtendedDayNum x;
readDateText(x, istr);
assert_cast<ColumnInt32 &>(column).getData().push_back(x);
}
void SerializationDate32::serializeTextEscaped(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings & settings) const
{
serializeText(column, row_num, ostr, settings);
}
void SerializationDate32::serializeTextQuoted(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings & settings) const
{
writeChar('\'', ostr);
serializeText(column, row_num, ostr, settings);
writeChar('\'', ostr);
}
void SerializationDate32::deserializeTextQuoted(IColumn & column, ReadBuffer & istr, const FormatSettings &) const
{
ExtendedDayNum x;
assertChar('\'', istr);
readDateText(x, istr);
assertChar('\'', istr);
assert_cast<ColumnInt32 &>(column).getData().push_back(x); /// It's important to do this at the end - for exception safety.
}
void SerializationDate32::serializeTextJSON(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings & settings) const
{
writeChar('"', ostr);
serializeText(column, row_num, ostr, settings);
writeChar('"', ostr);
}
void SerializationDate32::deserializeTextJSON(IColumn & column, ReadBuffer & istr, const FormatSettings &) const
{
ExtendedDayNum x;
assertChar('"', istr);
readDateText(x, istr);
assertChar('"', istr);
assert_cast<ColumnInt32 &>(column).getData().push_back(x);
}
void SerializationDate32::serializeTextCSV(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings & settings) const
{
writeChar('"', ostr);
serializeText(column, row_num, ostr, settings);
writeChar('"', ostr);
}
void SerializationDate32::deserializeTextCSV(IColumn & column, ReadBuffer & istr, const FormatSettings &) const
{
LocalDate value;
readCSV(value, istr);
assert_cast<ColumnInt32 &>(column).getData().push_back(value.getExtenedDayNum());
}
}

View File

@ -0,0 +1,21 @@
#pragma once
#include <DataTypes/Serializations/SerializationNumber.h>
namespace DB
{
class SerializationDate32 final : public SerializationNumber<Int32>
{
public:
void serializeText(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings &) const override;
void deserializeWholeText(IColumn & column, ReadBuffer & istr, const FormatSettings &) const override;
void serializeTextEscaped(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings &) const override;
void deserializeTextEscaped(IColumn & column, ReadBuffer & istr, const FormatSettings &) const override;
void serializeTextQuoted(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings &) const override;
void deserializeTextQuoted(IColumn & column, ReadBuffer & istr, const FormatSettings &) const override;
void serializeTextJSON(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings &) const override;
void deserializeTextJSON(IColumn & column, ReadBuffer & istr, const FormatSettings &) const override;
void serializeTextCSV(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings &) const override;
void deserializeTextCSV(IColumn & column, ReadBuffer & istr, const FormatSettings & settings) const override;
};
}

View File

@ -16,6 +16,7 @@ SRCS(
DataTypeCustomIPv4AndIPv6.cpp
DataTypeCustomSimpleAggregateFunction.cpp
DataTypeDate.cpp
DataTypeDate32.cpp
DataTypeDateTime.cpp
DataTypeDateTime64.cpp
DataTypeDecimalBase.cpp
@ -45,6 +46,7 @@ SRCS(
Serializations/SerializationArray.cpp
Serializations/SerializationCustomSimpleText.cpp
Serializations/SerializationDate.cpp
Serializations/SerializationDate32.cpp
Serializations/SerializationDateTime.cpp
Serializations/SerializationDateTime64.cpp
Serializations/SerializationDecimal.cpp

View File

@ -1,17 +1,17 @@
#include <Databases/DatabaseFactory.h>
#include <Databases/DatabaseAtomic.h>
#include <Databases/DatabaseReplicated.h>
#include <Databases/DatabaseDictionary.h>
#include <Databases/DatabaseLazy.h>
#include <Databases/DatabaseMemory.h>
#include <Databases/DatabaseOrdinary.h>
#include <Databases/DatabaseReplicated.h>
#include <Interpreters/Context.h>
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/formatAST.h>
#include <Interpreters/Context.h>
#include <Common/Macros.h>
#include <filesystem>
@ -40,6 +40,10 @@
#include <Storages/PostgreSQL/MaterializedPostgreSQLSettings.h>
#endif
#if USE_SQLITE
#include <Databases/SQLite/DatabaseSQLite.h>
#endif
namespace fs = std::filesystem;
namespace DB
@ -100,7 +104,7 @@ DatabasePtr DatabaseFactory::getImpl(const ASTCreateQuery & create, const String
const UUID & uuid = create.uuid;
bool engine_may_have_arguments = engine_name == "MySQL" || engine_name == "MaterializeMySQL" || engine_name == "Lazy" ||
engine_name == "Replicated" || engine_name == "PostgreSQL" || engine_name == "MaterializedPostgreSQL";
engine_name == "Replicated" || engine_name == "PostgreSQL" || engine_name == "MaterializedPostgreSQL" || engine_name == "SQLite";
if (engine_define->engine->arguments && !engine_may_have_arguments)
throw Exception("Database engine " + engine_name + " cannot have arguments", ErrorCodes::BAD_ARGUMENTS);
@ -299,6 +303,22 @@ DatabasePtr DatabaseFactory::getImpl(const ASTCreateQuery & create, const String
}
#endif
#if USE_SQLITE
else if (engine_name == "SQLite")
{
const ASTFunction * engine = engine_define->engine;
if (!engine->arguments || engine->arguments->children.size() != 1)
throw Exception("SQLite database requires 1 argument: database path", ErrorCodes::BAD_ARGUMENTS);
const auto & arguments = engine->arguments->children;
String database_path = safeGetLiteralValue<String>(arguments[0], "SQLite");
return std::make_shared<DatabaseSQLite>(context, engine_define, database_path);
}
#endif
throw Exception("Unknown database engine: " + engine_name, ErrorCodes::UNKNOWN_DATABASE_ENGINE);

View File

@ -0,0 +1,224 @@
#include "DatabaseSQLite.h"
#if USE_SQLITE
#include <common/logger_useful.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeNullable.h>
#include <Databases/SQLite/fetchSQLiteTableStructure.h>
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/ASTColumnDeclaration.h>
#include <Interpreters/Context.h>
#include <Storages/StorageSQLite.h>
namespace DB
{
namespace ErrorCodes
{
extern const int SQLITE_ENGINE_ERROR;
extern const int UNKNOWN_TABLE;
}
DatabaseSQLite::DatabaseSQLite(
ContextPtr context_,
const ASTStorage * database_engine_define_,
const String & database_path_)
: IDatabase("SQLite")
, WithContext(context_->getGlobalContext())
, database_engine_define(database_engine_define_->clone())
, log(&Poco::Logger::get("DatabaseSQLite"))
{
sqlite3 * tmp_sqlite_db = nullptr;
int status = sqlite3_open(database_path_.c_str(), &tmp_sqlite_db);
if (status != SQLITE_OK)
{
throw Exception(ErrorCodes::SQLITE_ENGINE_ERROR,
"Cannot access sqlite database. Error status: {}. Message: {}",
status, sqlite3_errstr(status));
}
sqlite_db = std::shared_ptr<sqlite3>(tmp_sqlite_db, sqlite3_close);
}
bool DatabaseSQLite::empty() const
{
std::lock_guard<std::mutex> lock(mutex);
return fetchTablesList().empty();
}
DatabaseTablesIteratorPtr DatabaseSQLite::getTablesIterator(ContextPtr local_context, const IDatabase::FilterByNameFunction &)
{
std::lock_guard<std::mutex> lock(mutex);
Tables tables;
auto table_names = fetchTablesList();
for (const auto & table_name : table_names)
tables[table_name] = fetchTable(table_name, local_context, true);
return std::make_unique<DatabaseTablesSnapshotIterator>(tables, database_name);
}
std::unordered_set<std::string> DatabaseSQLite::fetchTablesList() const
{
std::unordered_set<String> tables;
std::string query = "SELECT name FROM sqlite_master "
"WHERE type = 'table' AND name NOT LIKE 'sqlite_%'";
auto callback_get_data = [](void * res, int col_num, char ** data_by_col, char ** /* col_names */) -> int
{
for (int i = 0; i < col_num; ++i)
static_cast<std::unordered_set<std::string> *>(res)->insert(data_by_col[i]);
return 0;
};
char * err_message = nullptr;
int status = sqlite3_exec(sqlite_db.get(), query.c_str(), callback_get_data, &tables, &err_message);
if (status != SQLITE_OK)
{
String err_msg(err_message);
sqlite3_free(err_message);
throw Exception(ErrorCodes::SQLITE_ENGINE_ERROR,
"Cannot fetch sqlite database tables. Error status: {}. Message: {}",
status, err_msg);
}
return tables;
}
bool DatabaseSQLite::checkSQLiteTable(const String & table_name) const
{
const String query = fmt::format("SELECT name FROM sqlite_master WHERE type='table' AND name='{table_name}';", table_name);
auto callback_get_data = [](void * res, int, char **, char **) -> int
{
*(static_cast<int *>(res)) += 1;
return 0;
};
int count = 0;
char * err_message = nullptr;
int status = sqlite3_exec(sqlite_db.get(), query.c_str(), callback_get_data, &count, &err_message);
if (status != SQLITE_OK)
{
String err_msg(err_message);
sqlite3_free(err_message);
throw Exception(ErrorCodes::SQLITE_ENGINE_ERROR,
"Cannot check sqlite table. Error status: {}. Message: {}",
status, err_msg);
}
return (count != 0);
}
bool DatabaseSQLite::isTableExist(const String & table_name, ContextPtr) const
{
std::lock_guard<std::mutex> lock(mutex);
return checkSQLiteTable(table_name);
}
StoragePtr DatabaseSQLite::tryGetTable(const String & table_name, ContextPtr local_context) const
{
std::lock_guard<std::mutex> lock(mutex);
return fetchTable(table_name, local_context, false);
}
StoragePtr DatabaseSQLite::fetchTable(const String & table_name, ContextPtr local_context, bool table_checked) const
{
if (!table_checked && !checkSQLiteTable(table_name))
return StoragePtr{};
auto columns = fetchSQLiteTableStructure(sqlite_db.get(), table_name);
if (!columns)
return StoragePtr{};
auto storage = StorageSQLite::create(
StorageID(database_name, table_name),
sqlite_db,
table_name,
ColumnsDescription{*columns},
ConstraintsDescription{},
local_context);
return storage;
}
ASTPtr DatabaseSQLite::getCreateDatabaseQuery() const
{
const auto & create_query = std::make_shared<ASTCreateQuery>();
create_query->database = getDatabaseName();
create_query->set(create_query->storage, database_engine_define);
return create_query;
}
ASTPtr DatabaseSQLite::getCreateTableQueryImpl(const String & table_name, ContextPtr local_context, bool throw_on_error) const
{
auto storage = fetchTable(table_name, local_context, false);
if (!storage)
{
if (throw_on_error)
throw Exception(ErrorCodes::UNKNOWN_TABLE, "SQLite table {}.{} does not exist",
database_name, table_name);
return nullptr;
}
auto create_table_query = std::make_shared<ASTCreateQuery>();
auto table_storage_define = database_engine_define->clone();
create_table_query->set(create_table_query->storage, table_storage_define);
auto columns_declare_list = std::make_shared<ASTColumns>();
auto columns_expression_list = std::make_shared<ASTExpressionList>();
columns_declare_list->set(columns_declare_list->columns, columns_expression_list);
create_table_query->set(create_table_query->columns_list, columns_declare_list);
/// init create query.
auto table_id = storage->getStorageID();
create_table_query->table = table_id.table_name;
create_table_query->database = table_id.database_name;
auto metadata_snapshot = storage->getInMemoryMetadataPtr();
for (const auto & column_type_and_name : metadata_snapshot->getColumns().getOrdinary())
{
const auto & column_declaration = std::make_shared<ASTColumnDeclaration>();
column_declaration->name = column_type_and_name.name;
column_declaration->type = getColumnDeclaration(column_type_and_name.type);
columns_expression_list->children.emplace_back(column_declaration);
}
ASTStorage * ast_storage = table_storage_define->as<ASTStorage>();
ASTs storage_children = ast_storage->children;
auto storage_engine_arguments = ast_storage->engine->arguments;
/// Add table_name to engine arguments
storage_engine_arguments->children.insert(storage_engine_arguments->children.begin() + 1, std::make_shared<ASTLiteral>(table_id.table_name));
return create_table_query;
}
ASTPtr DatabaseSQLite::getColumnDeclaration(const DataTypePtr & data_type) const
{
WhichDataType which(data_type);
if (which.isNullable())
return makeASTFunction("Nullable", getColumnDeclaration(typeid_cast<const DataTypeNullable *>(data_type.get())->getNestedType()));
return std::make_shared<ASTIdentifier>(data_type->getName());
}
}
#endif

View File

@ -0,0 +1,65 @@
#pragma once
#if !defined(ARCADIA_BUILD)
#include "config_core.h"
#endif
#if USE_SQLITE
#include <Core/Names.h>
#include <Databases/DatabasesCommon.h>
#include <Parsers/ASTCreateQuery.h>
#include <sqlite3.h>
namespace DB
{
class DatabaseSQLite final : public IDatabase, protected WithContext
{
public:
using SQLitePtr = std::shared_ptr<sqlite3>;
DatabaseSQLite(ContextPtr context_, const ASTStorage * database_engine_define_, const String & database_path_);
String getEngineName() const override { return "SQLite"; }
bool canContainMergeTreeTables() const override { return false; }
bool canContainDistributedTables() const override { return false; }
bool shouldBeEmptyOnDetach() const override { return false; }
bool isTableExist(const String & name, ContextPtr context) const override;
StoragePtr tryGetTable(const String & name, ContextPtr context) const override;
DatabaseTablesIteratorPtr getTablesIterator(ContextPtr context, const FilterByNameFunction & filter_by_table_name) override;
bool empty() const override;
ASTPtr getCreateDatabaseQuery() const override;
void shutdown() override {}
protected:
ASTPtr getCreateTableQueryImpl(const String & table_name, ContextPtr context, bool throw_on_error) const override;
private:
ASTPtr database_engine_define;
SQLitePtr sqlite_db;
Poco::Logger * log;
bool checkSQLiteTable(const String & table_name) const;
NameSet fetchTablesList() const;
StoragePtr fetchTable(const String & table_name, ContextPtr context, bool table_checked) const;
ASTPtr getColumnDeclaration(const DataTypePtr & data_type) const;
};
}
#endif

View File

@ -0,0 +1,104 @@
#include <Databases/SQLite/fetchSQLiteTableStructure.h>
#if USE_SQLITE
#include <Common/quoteString.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeDate.h>
#include <DataTypes/DataTypeDateTime.h>
#include <DataTypes/DataTypeFactory.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesDecimal.h>
#include <DataTypes/DataTypesNumber.h>
#include <Poco/String.h>
namespace DB
{
namespace ErrorCodes
{
extern const int SQLITE_ENGINE_ERROR;
}
static DataTypePtr convertSQLiteDataType(String type)
{
DataTypePtr res;
type = Poco::toLower(type);
if (type == "tinyint")
res = std::make_shared<DataTypeInt8>();
else if (type == "smallint")
res = std::make_shared<DataTypeInt16>();
else if (type.starts_with("int") || type == "mediumint")
res = std::make_shared<DataTypeInt32>();
else if (type == "bigint")
res = std::make_shared<DataTypeInt64>();
else if (type == "float")
res = std::make_shared<DataTypeFloat32>();
else if (type.starts_with("double") || type == "real")
res = std::make_shared<DataTypeFloat64>();
else
res = std::make_shared<DataTypeString>(); // No decimal when fetching data through API
return res;
}
std::shared_ptr<NamesAndTypesList> fetchSQLiteTableStructure(sqlite3 * connection, const String & sqlite_table_name)
{
auto columns = NamesAndTypesList();
auto query = fmt::format("pragma table_info({});", quoteString(sqlite_table_name));
auto callback_get_data = [](void * res, int col_num, char ** data_by_col, char ** col_names) -> int
{
NameAndTypePair name_and_type;
bool is_nullable = false;
for (int i = 0; i < col_num; ++i)
{
if (strcmp(col_names[i], "name") == 0)
{
name_and_type.name = data_by_col[i];
}
else if (strcmp(col_names[i], "type") == 0)
{
name_and_type.type = convertSQLiteDataType(data_by_col[i]);
}
else if (strcmp(col_names[i], "notnull") == 0)
{
is_nullable = (data_by_col[i][0] == '0');
}
}
if (is_nullable)
name_and_type.type = std::make_shared<DataTypeNullable>(name_and_type.type);
static_cast<NamesAndTypesList *>(res)->push_back(name_and_type);
return 0;
};
char * err_message = nullptr;
int status = sqlite3_exec(connection, query.c_str(), callback_get_data, &columns, &err_message);
if (status != SQLITE_OK)
{
String err_msg(err_message);
sqlite3_free(err_message);
throw Exception(ErrorCodes::SQLITE_ENGINE_ERROR,
"Failed to fetch SQLite data. Status: {}. Message: {}",
status, err_msg);
}
if (columns.empty())
return nullptr;
return std::make_shared<NamesAndTypesList>(columns);
}
}
#endif

View File

@ -0,0 +1,19 @@
#pragma once
#if !defined(ARCADIA_BUILD)
#include "config_core.h"
#endif
#if USE_SQLITE
#include <Storages/StorageSQLite.h>
#include <sqlite3.h>
namespace DB
{
std::shared_ptr<NamesAndTypesList> fetchSQLiteTableStructure(sqlite3 * connection,
const String & sqlite_table_name);
}
#endif

View File

@ -27,6 +27,8 @@ SRCS(
MySQL/MaterializeMetadata.cpp
MySQL/MaterializeMySQLSettings.cpp
MySQL/MaterializeMySQLSyncThread.cpp
SQLite/DatabaseSQLite.cpp
SQLite/fetchSQLiteTableStructure.cpp
)

View File

@ -42,6 +42,11 @@ struct ToYearWeekImpl
YearWeek yw = time_zone.toYearWeek(time_zone.toDayNum(t), week_mode | static_cast<UInt32>(WeekModeFlag::YEAR));
return yw.first * 100 + yw.second;
}
static inline UInt32 execute(Int32 d, UInt8 week_mode, const DateLUTImpl & time_zone)
{
YearWeek yw = time_zone.toYearWeek(ExtendedDayNum (d), week_mode | static_cast<UInt32>(WeekModeFlag::YEAR));
return yw.first * 100 + yw.second;
}
static inline UInt32 execute(UInt16 d, UInt8 week_mode, const DateLUTImpl & time_zone)
{
YearWeek yw = time_zone.toYearWeek(DayNum(d), week_mode | static_cast<UInt32>(WeekModeFlag::YEAR));
@ -65,6 +70,10 @@ struct ToStartOfWeekImpl
return time_zone.toFirstDayNumOfWeek(time_zone.toDayNum(t), week_mode);
// return time_zone.toFirstDayNumOfWeek(t, week_mode);
}
static inline UInt16 execute(Int32 d, UInt8 week_mode, const DateLUTImpl & time_zone)
{
return time_zone.toFirstDayNumOfWeek(ExtendedDayNum(d), week_mode);
}
static inline UInt16 execute(UInt16 d, UInt8 week_mode, const DateLUTImpl & time_zone)
{
return time_zone.toFirstDayNumOfWeek(ExtendedDayNum(d), week_mode);
@ -88,6 +97,11 @@ struct ToWeekImpl
YearWeek yw = time_zone.toYearWeek(time_zone.toDayNum(t), week_mode);
return yw.second;
}
static inline UInt8 execute(Int32 d, UInt8 week_mode, const DateLUTImpl & time_zone)
{
YearWeek yw = time_zone.toYearWeek(ExtendedDayNum(d), week_mode);
return yw.second;
}
static inline UInt8 execute(UInt16 d, UInt8 week_mode, const DateLUTImpl & time_zone)
{
YearWeek yw = time_zone.toYearWeek(DayNum(d), week_mode);

View File

@ -46,6 +46,7 @@ struct ZeroTransform
{
static inline UInt16 execute(Int64, const DateLUTImpl &) { return 0; }
static inline UInt16 execute(UInt32, const DateLUTImpl &) { return 0; }
static inline UInt16 execute(Int32, const DateLUTImpl &) { return 0; }
static inline UInt16 execute(UInt16, const DateLUTImpl &) { return 0; }
};
@ -61,6 +62,10 @@ struct ToDateImpl
{
return UInt16(time_zone.toDayNum(t));
}
static inline UInt16 execute(Int32, const DateLUTImpl &)
{
return dateIsNotSupported(name);
}
static inline UInt16 execute(UInt16 d, const DateLUTImpl &)
{
return d;
@ -82,6 +87,10 @@ struct ToStartOfDayImpl
{
return time_zone.toDate(t);
}
static inline UInt32 execute(Int32 d, const DateLUTImpl & time_zone)
{
return time_zone.toDate(ExtendedDayNum(d));
}
static inline UInt32 execute(UInt16 d, const DateLUTImpl & time_zone)
{
return time_zone.toDate(ExtendedDayNum(d));
@ -104,6 +113,10 @@ struct ToMondayImpl
//return time_zone.toFirstDayNumOfWeek(time_zone.toDayNum(t));
return time_zone.toFirstDayNumOfWeek(t);
}
static inline UInt16 execute(Int32 d, const DateLUTImpl & time_zone)
{
return time_zone.toFirstDayNumOfWeek(ExtendedDayNum(d));
}
static inline UInt16 execute(UInt16 d, const DateLUTImpl & time_zone)
{
return time_zone.toFirstDayNumOfWeek(ExtendedDayNum(d));
@ -124,6 +137,10 @@ struct ToStartOfMonthImpl
{
return time_zone.toFirstDayNumOfMonth(time_zone.toDayNum(t));
}
static inline UInt16 execute(Int32 d, const DateLUTImpl & time_zone)
{
return time_zone.toFirstDayNumOfMonth(ExtendedDayNum(d));
}
static inline UInt16 execute(UInt16 d, const DateLUTImpl & time_zone)
{
return time_zone.toFirstDayNumOfMonth(ExtendedDayNum(d));
@ -144,6 +161,10 @@ struct ToStartOfQuarterImpl
{
return time_zone.toFirstDayNumOfQuarter(time_zone.toDayNum(t));
}
static inline UInt16 execute(Int32 d, const DateLUTImpl & time_zone)
{
return time_zone.toFirstDayNumOfQuarter(ExtendedDayNum(d));
}
static inline UInt16 execute(UInt16 d, const DateLUTImpl & time_zone)
{
return time_zone.toFirstDayNumOfQuarter(ExtendedDayNum(d));
@ -164,6 +185,10 @@ struct ToStartOfYearImpl
{
return time_zone.toFirstDayNumOfYear(time_zone.toDayNum(t));
}
static inline UInt16 execute(Int32 d, const DateLUTImpl & time_zone)
{
return time_zone.toFirstDayNumOfYear(ExtendedDayNum(d));
}
static inline UInt16 execute(UInt16 d, const DateLUTImpl & time_zone)
{
return time_zone.toFirstDayNumOfYear(ExtendedDayNum(d));
@ -186,7 +211,10 @@ struct ToTimeImpl
{
return time_zone.toTime(t) + 86400;
}
static inline UInt32 execute(Int32, const DateLUTImpl &)
{
return dateIsNotSupported(name);
}
static inline UInt32 execute(UInt16, const DateLUTImpl &)
{
return dateIsNotSupported(name);
@ -207,6 +235,10 @@ struct ToStartOfMinuteImpl
{
return time_zone.toStartOfMinute(t);
}
static inline UInt32 execute(Int32, const DateLUTImpl &)
{
return dateIsNotSupported(name);
}
static inline UInt32 execute(UInt16, const DateLUTImpl &)
{
return dateIsNotSupported(name);
@ -242,6 +274,10 @@ struct ToStartOfSecondImpl
{
throw Exception("Illegal type DateTime of argument for function " + std::string(name), ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
}
static inline UInt32 execute(Int32, const DateLUTImpl &)
{
return dateIsNotSupported(name);
}
static inline UInt32 execute(UInt16, const DateLUTImpl &)
{
return dateIsNotSupported(name);
@ -262,6 +298,10 @@ struct ToStartOfFiveMinuteImpl
{
return time_zone.toStartOfFiveMinute(t);
}
static inline UInt32 execute(Int32, const DateLUTImpl &)
{
return dateIsNotSupported(name);
}
static inline UInt32 execute(UInt16, const DateLUTImpl &)
{
return dateIsNotSupported(name);
@ -282,6 +322,10 @@ struct ToStartOfTenMinutesImpl
{
return time_zone.toStartOfTenMinutes(t);
}
static inline UInt32 execute(Int32, const DateLUTImpl &)
{
return dateIsNotSupported(name);
}
static inline UInt32 execute(UInt16, const DateLUTImpl &)
{
return dateIsNotSupported(name);
@ -302,6 +346,10 @@ struct ToStartOfFifteenMinutesImpl
{
return time_zone.toStartOfFifteenMinutes(t);
}
static inline UInt32 execute(Int32, const DateLUTImpl &)
{
return dateIsNotSupported(name);
}
static inline UInt32 execute(UInt16, const DateLUTImpl &)
{
return dateIsNotSupported(name);
@ -326,6 +374,11 @@ struct TimeSlotImpl
return t / 1800 * 1800;
}
static inline UInt32 execute(Int32, const DateLUTImpl &)
{
return dateIsNotSupported(name);
}
static inline UInt32 execute(UInt16, const DateLUTImpl &)
{
return dateIsNotSupported(name);
@ -348,6 +401,11 @@ struct ToStartOfHourImpl
return time_zone.toStartOfHour(t);
}
static inline UInt32 execute(Int32, const DateLUTImpl &)
{
return dateIsNotSupported(name);
}
static inline UInt32 execute(UInt16, const DateLUTImpl &)
{
return dateIsNotSupported(name);
@ -368,6 +426,10 @@ struct ToYearImpl
{
return time_zone.toYear(t);
}
static inline UInt16 execute(Int32 d, const DateLUTImpl & time_zone)
{
return time_zone.toYear(ExtendedDayNum(d));
}
static inline UInt16 execute(UInt16 d, const DateLUTImpl & time_zone)
{
return time_zone.toYear(ExtendedDayNum(d));
@ -388,6 +450,10 @@ struct ToQuarterImpl
{
return time_zone.toQuarter(t);
}
static inline UInt8 execute(Int32 d, const DateLUTImpl & time_zone)
{
return time_zone.toQuarter(ExtendedDayNum(d));
}
static inline UInt8 execute(UInt16 d, const DateLUTImpl & time_zone)
{
return time_zone.toQuarter(ExtendedDayNum(d));
@ -408,6 +474,10 @@ struct ToMonthImpl
{
return time_zone.toMonth(t);
}
static inline UInt8 execute(Int32 d, const DateLUTImpl & time_zone)
{
return time_zone.toMonth(ExtendedDayNum(d));
}
static inline UInt8 execute(UInt16 d, const DateLUTImpl & time_zone)
{
return time_zone.toMonth(ExtendedDayNum(d));
@ -428,6 +498,10 @@ struct ToDayOfMonthImpl
{
return time_zone.toDayOfMonth(t);
}
static inline UInt8 execute(Int32 d, const DateLUTImpl & time_zone)
{
return time_zone.toDayOfMonth(ExtendedDayNum(d));
}
static inline UInt8 execute(UInt16 d, const DateLUTImpl & time_zone)
{
return time_zone.toDayOfMonth(ExtendedDayNum(d));
@ -448,6 +522,10 @@ struct ToDayOfWeekImpl
{
return time_zone.toDayOfWeek(t);
}
static inline UInt8 execute(Int32 d, const DateLUTImpl & time_zone)
{
return time_zone.toDayOfWeek(ExtendedDayNum(d));
}
static inline UInt8 execute(UInt16 d, const DateLUTImpl & time_zone)
{
return time_zone.toDayOfWeek(ExtendedDayNum(d));
@ -468,6 +546,10 @@ struct ToDayOfYearImpl
{
return time_zone.toDayOfYear(t);
}
static inline UInt16 execute(Int32 d, const DateLUTImpl & time_zone)
{
return time_zone.toDayOfYear(ExtendedDayNum(d));
}
static inline UInt16 execute(UInt16 d, const DateLUTImpl & time_zone)
{
return time_zone.toDayOfYear(ExtendedDayNum(d));
@ -488,7 +570,10 @@ struct ToHourImpl
{
return time_zone.toHour(t);
}
static inline UInt8 execute(Int32, const DateLUTImpl &)
{
return dateIsNotSupported(name);
}
static inline UInt8 execute(UInt16, const DateLUTImpl &)
{
return dateIsNotSupported(name);
@ -511,6 +596,11 @@ struct TimezoneOffsetImpl
return time_zone.timezoneOffset(t);
}
static inline time_t execute(Int32, const DateLUTImpl &)
{
return dateIsNotSupported(name);
}
static inline time_t execute(UInt16, const DateLUTImpl &)
{
return dateIsNotSupported(name);
@ -531,6 +621,10 @@ struct ToMinuteImpl
{
return time_zone.toMinute(t);
}
static inline UInt8 execute(Int32, const DateLUTImpl &)
{
return dateIsNotSupported(name);
}
static inline UInt8 execute(UInt16, const DateLUTImpl &)
{
return dateIsNotSupported(name);
@ -551,6 +645,10 @@ struct ToSecondImpl
{
return time_zone.toSecond(t);
}
static inline UInt8 execute(Int32, const DateLUTImpl &)
{
return dateIsNotSupported(name);
}
static inline UInt8 execute(UInt16, const DateLUTImpl &)
{
return dateIsNotSupported(name);
@ -571,6 +669,10 @@ struct ToISOYearImpl
{
return time_zone.toISOYear(time_zone.toDayNum(t));
}
static inline UInt16 execute(Int32 d, const DateLUTImpl & time_zone)
{
return time_zone.toISOYear(ExtendedDayNum(d));
}
static inline UInt16 execute(UInt16 d, const DateLUTImpl & time_zone)
{
return time_zone.toISOYear(ExtendedDayNum(d));
@ -591,6 +693,10 @@ struct ToStartOfISOYearImpl
{
return time_zone.toFirstDayNumOfISOYear(time_zone.toDayNum(t));
}
static inline UInt16 execute(Int32 d, const DateLUTImpl & time_zone)
{
return time_zone.toFirstDayNumOfISOYear(ExtendedDayNum(d));
}
static inline UInt16 execute(UInt16 d, const DateLUTImpl & time_zone)
{
return time_zone.toFirstDayNumOfISOYear(ExtendedDayNum(d));
@ -611,6 +717,10 @@ struct ToISOWeekImpl
{
return time_zone.toISOWeek(time_zone.toDayNum(t));
}
static inline UInt8 execute(Int32 d, const DateLUTImpl & time_zone)
{
return time_zone.toISOWeek(ExtendedDayNum(d));
}
static inline UInt8 execute(UInt16 d, const DateLUTImpl & time_zone)
{
return time_zone.toISOWeek(ExtendedDayNum(d));
@ -631,6 +741,10 @@ struct ToRelativeYearNumImpl
{
return time_zone.toYear(static_cast<time_t>(t));
}
static inline UInt16 execute(Int32 d, const DateLUTImpl & time_zone)
{
return time_zone.toYear(ExtendedDayNum(d));
}
static inline UInt16 execute(UInt16 d, const DateLUTImpl & time_zone)
{
return time_zone.toYear(ExtendedDayNum(d));
@ -651,6 +765,10 @@ struct ToRelativeQuarterNumImpl
{
return time_zone.toRelativeQuarterNum(static_cast<time_t>(t));
}
static inline UInt16 execute(Int32 d, const DateLUTImpl & time_zone)
{
return time_zone.toRelativeQuarterNum(ExtendedDayNum(d));
}
static inline UInt16 execute(UInt16 d, const DateLUTImpl & time_zone)
{
return time_zone.toRelativeQuarterNum(ExtendedDayNum(d));
@ -671,6 +789,10 @@ struct ToRelativeMonthNumImpl
{
return time_zone.toRelativeMonthNum(static_cast<time_t>(t));
}
static inline UInt16 execute(Int32 d, const DateLUTImpl & time_zone)
{
return time_zone.toRelativeMonthNum(ExtendedDayNum(d));
}
static inline UInt16 execute(UInt16 d, const DateLUTImpl & time_zone)
{
return time_zone.toRelativeMonthNum(ExtendedDayNum(d));
@ -691,6 +813,10 @@ struct ToRelativeWeekNumImpl
{
return time_zone.toRelativeWeekNum(static_cast<time_t>(t));
}
static inline UInt16 execute(Int32 d, const DateLUTImpl & time_zone)
{
return time_zone.toRelativeWeekNum(ExtendedDayNum(d));
}
static inline UInt16 execute(UInt16 d, const DateLUTImpl & time_zone)
{
return time_zone.toRelativeWeekNum(ExtendedDayNum(d));
@ -711,6 +837,10 @@ struct ToRelativeDayNumImpl
{
return time_zone.toDayNum(static_cast<time_t>(t));
}
static inline UInt16 execute(Int32 d, const DateLUTImpl &)
{
return static_cast<ExtendedDayNum>(d);
}
static inline UInt16 execute(UInt16 d, const DateLUTImpl &)
{
return static_cast<DayNum>(d);
@ -732,6 +862,10 @@ struct ToRelativeHourNumImpl
{
return time_zone.toRelativeHourNum(static_cast<time_t>(t));
}
static inline UInt32 execute(Int32 d, const DateLUTImpl & time_zone)
{
return time_zone.toRelativeHourNum(ExtendedDayNum(d));
}
static inline UInt32 execute(UInt16 d, const DateLUTImpl & time_zone)
{
return time_zone.toRelativeHourNum(ExtendedDayNum(d));
@ -752,6 +886,10 @@ struct ToRelativeMinuteNumImpl
{
return time_zone.toRelativeMinuteNum(static_cast<time_t>(t));
}
static inline UInt32 execute(Int32 d, const DateLUTImpl & time_zone)
{
return time_zone.toRelativeMinuteNum(ExtendedDayNum(d));
}
static inline UInt32 execute(UInt16 d, const DateLUTImpl & time_zone)
{
return time_zone.toRelativeMinuteNum(ExtendedDayNum(d));
@ -772,6 +910,10 @@ struct ToRelativeSecondNumImpl
{
return t;
}
static inline UInt32 execute(Int32 d, const DateLUTImpl & time_zone)
{
return time_zone.fromDayNum(ExtendedDayNum(d));
}
static inline UInt32 execute(UInt16 d, const DateLUTImpl & time_zone)
{
return time_zone.fromDayNum(ExtendedDayNum(d));
@ -792,6 +934,10 @@ struct ToYYYYMMImpl
{
return time_zone.toNumYYYYMM(t);
}
static inline UInt32 execute(Int32 d, const DateLUTImpl & time_zone)
{
return time_zone.toNumYYYYMM(static_cast<ExtendedDayNum>(d));
}
static inline UInt32 execute(UInt16 d, const DateLUTImpl & time_zone)
{
return time_zone.toNumYYYYMM(static_cast<DayNum>(d));
@ -812,6 +958,10 @@ struct ToYYYYMMDDImpl
{
return time_zone.toNumYYYYMMDD(t);
}
static inline UInt32 execute(Int32 d, const DateLUTImpl & time_zone)
{
return time_zone.toNumYYYYMMDD(static_cast<ExtendedDayNum>(d));
}
static inline UInt32 execute(UInt16 d, const DateLUTImpl & time_zone)
{
return time_zone.toNumYYYYMMDD(static_cast<DayNum>(d));
@ -832,6 +982,10 @@ struct ToYYYYMMDDhhmmssImpl
{
return time_zone.toNumYYYYMMDDhhmmss(t);
}
static inline UInt64 execute(Int32 d, const DateLUTImpl & time_zone)
{
return time_zone.toNumYYYYMMDDhhmmss(time_zone.toDate(static_cast<ExtendedDayNum>(d)));
}
static inline UInt64 execute(UInt16 d, const DateLUTImpl & time_zone)
{
return time_zone.toNumYYYYMMDDhhmmss(time_zone.toDate(static_cast<DayNum>(d)));

View File

@ -1,5 +1,6 @@
#pragma once
#include <DataTypes/DataTypeDate.h>
#include <DataTypes/DataTypeDate32.h>
#include <DataTypes/DataTypeDateTime.h>
#include <DataTypes/DataTypeDateTime64.h>
#include <Functions/CustomWeekTransforms.h>
@ -35,7 +36,7 @@ public:
{
if (arguments.size() == 1)
{
if (!isDate(arguments[0].type) && !isDateTime(arguments[0].type) && !isDateTime64(arguments[0].type))
if (!isDate(arguments[0].type) && !isDate32(arguments[0].type) && !isDateTime(arguments[0].type) && !isDateTime64(arguments[0].type))
throw Exception(
"Illegal type " + arguments[0].type->getName() + " of argument of function " + getName()
+ ". Should be a date or a date with time",
@ -43,7 +44,7 @@ public:
}
else if (arguments.size() == 2)
{
if (!isDate(arguments[0].type) && !isDateTime(arguments[0].type) && !isDateTime64(arguments[0].type))
if (!isDate(arguments[0].type) && !isDate32(arguments[0].type) && !isDateTime(arguments[0].type) && !isDateTime64(arguments[0].type))
throw Exception(
"Illegal type " + arguments[0].type->getName() + " of argument of function " + getName()
+ ". Should be a date or a date with time",
@ -59,7 +60,7 @@ public:
}
else if (arguments.size() == 3)
{
if (!isDate(arguments[0].type) && !isDateTime(arguments[0].type) && !isDateTime64(arguments[0].type))
if (!isDate(arguments[0].type) && !isDate32(arguments[0].type) && !isDateTime(arguments[0].type) && !isDateTime64(arguments[0].type))
throw Exception(
"Illegal type " + arguments[0].type->getName() + " of argument of function " + getName()
+ ". Should be a date or a date with time",
@ -105,6 +106,9 @@ public:
if (which.isDate())
return CustomWeekTransformImpl<DataTypeDate, ToDataType>::execute(
arguments, result_type, input_rows_count, Transform{});
else if (which.isDate32())
return CustomWeekTransformImpl<DataTypeDate32, ToDataType>::execute(
arguments, result_type, input_rows_count, Transform{});
else if (which.isDateTime())
return CustomWeekTransformImpl<DataTypeDateTime, ToDataType>::execute(
arguments, result_type, input_rows_count, Transform{});

View File

@ -2,6 +2,7 @@
#include <common/DateLUTImpl.h>
#include <DataTypes/DataTypeDate.h>
#include <DataTypes/DataTypeDate32.h>
#include <DataTypes/DataTypeDateTime.h>
#include <DataTypes/DataTypeDateTime64.h>
@ -50,7 +51,11 @@ struct AddSecondsImpl
{
return t + delta;
}
static inline NO_SANITIZE_UNDEFINED Int64 execute(Int32 d, Int64 delta, const DateLUTImpl & time_zone)
{
// use default datetime64 scale
return (time_zone.fromDayNum(ExtendedDayNum(d)) + delta) * 1000;
}
static inline NO_SANITIZE_UNDEFINED UInt32 execute(UInt16 d, Int64 delta, const DateLUTImpl & time_zone)
{
return time_zone.fromDayNum(ExtendedDayNum(d)) + delta;
@ -71,7 +76,11 @@ struct AddMinutesImpl
{
return t + delta * 60;
}
static inline NO_SANITIZE_UNDEFINED Int64 execute(Int32 d, Int64 delta, const DateLUTImpl & time_zone)
{
// use default datetime64 scale
return (time_zone.fromDayNum(ExtendedDayNum(d)) + delta * 60) * 1000;
}
static inline NO_SANITIZE_UNDEFINED UInt32 execute(UInt16 d, Int64 delta, const DateLUTImpl & time_zone)
{
return time_zone.fromDayNum(ExtendedDayNum(d)) + delta * 60;
@ -91,7 +100,11 @@ struct AddHoursImpl
{
return t + delta * 3600;
}
static inline NO_SANITIZE_UNDEFINED Int64 execute(Int32 d, Int64 delta, const DateLUTImpl & time_zone)
{
// use default datetime64 scale
return (time_zone.fromDayNum(ExtendedDayNum(d)) + delta * 3600) * 1000;
}
static inline NO_SANITIZE_UNDEFINED UInt32 execute(UInt16 d, Int64 delta, const DateLUTImpl & time_zone)
{
return time_zone.fromDayNum(ExtendedDayNum(d)) + delta * 3600;
@ -117,6 +130,11 @@ struct AddDaysImpl
{
return d + delta;
}
static inline NO_SANITIZE_UNDEFINED Int32 execute(Int32 d, Int64 delta, const DateLUTImpl &)
{
return d + delta;
}
};
struct AddWeeksImpl
@ -124,17 +142,22 @@ struct AddWeeksImpl
static constexpr auto name = "addWeeks";
static inline NO_SANITIZE_UNDEFINED DecimalUtils::DecimalComponents<DateTime64>
execute(DecimalUtils::DecimalComponents<DateTime64> t, Int64 delta, const DateLUTImpl & time_zone)
execute(DecimalUtils::DecimalComponents<DateTime64> t, Int32 delta, const DateLUTImpl & time_zone)
{
return {time_zone.addWeeks(t.whole, delta), t.fractional};
}
static inline NO_SANITIZE_UNDEFINED UInt32 execute(UInt32 t, Int64 delta, const DateLUTImpl & time_zone)
static inline NO_SANITIZE_UNDEFINED UInt32 execute(UInt32 t, Int32 delta, const DateLUTImpl & time_zone)
{
return time_zone.addWeeks(t, delta);
}
static inline NO_SANITIZE_UNDEFINED UInt16 execute(UInt16 d, Int64 delta, const DateLUTImpl &)
static inline NO_SANITIZE_UNDEFINED UInt16 execute(UInt16 d, Int32 delta, const DateLUTImpl &)
{
return d + delta * 7;
}
static inline NO_SANITIZE_UNDEFINED Int32 execute(Int32 d, Int32 delta, const DateLUTImpl &)
{
return d + delta * 7;
}
@ -159,6 +182,11 @@ struct AddMonthsImpl
{
return time_zone.addMonths(ExtendedDayNum(d), delta);
}
static inline Int32 execute(Int32 d, Int64 delta, const DateLUTImpl & time_zone)
{
return time_zone.addMonths(ExtendedDayNum(d), delta);
}
};
struct AddQuartersImpl
@ -166,17 +194,22 @@ struct AddQuartersImpl
static constexpr auto name = "addQuarters";
static inline DecimalUtils::DecimalComponents<DateTime64>
execute(DecimalUtils::DecimalComponents<DateTime64> t, Int64 delta, const DateLUTImpl & time_zone)
execute(DecimalUtils::DecimalComponents<DateTime64> t, Int32 delta, const DateLUTImpl & time_zone)
{
return {time_zone.addQuarters(t.whole, delta), t.fractional};
}
static inline UInt32 execute(UInt32 t, Int64 delta, const DateLUTImpl & time_zone)
static inline UInt32 execute(UInt32 t, Int32 delta, const DateLUTImpl & time_zone)
{
return time_zone.addQuarters(t, delta);
}
static inline UInt16 execute(UInt16 d, Int64 delta, const DateLUTImpl & time_zone)
static inline UInt16 execute(UInt16 d, Int32 delta, const DateLUTImpl & time_zone)
{
return time_zone.addQuarters(ExtendedDayNum(d), delta);
}
static inline Int32 execute(Int32 d, Int32 delta, const DateLUTImpl & time_zone)
{
return time_zone.addQuarters(ExtendedDayNum(d), delta);
}
@ -201,6 +234,11 @@ struct AddYearsImpl
{
return time_zone.addYears(ExtendedDayNum(d), delta);
}
static inline Int32 execute(Int32 d, Int64 delta, const DateLUTImpl & time_zone)
{
return time_zone.addYears(ExtendedDayNum(d), delta);
}
};
template <typename Transform>
@ -342,7 +380,7 @@ template <typename FieldType> struct ResultDataTypeMap {};
template <> struct ResultDataTypeMap<UInt16> { using ResultDataType = DataTypeDate; };
template <> struct ResultDataTypeMap<Int16> { using ResultDataType = DataTypeDate; };
template <> struct ResultDataTypeMap<UInt32> { using ResultDataType = DataTypeDateTime; };
template <> struct ResultDataTypeMap<Int32> { using ResultDataType = DataTypeDateTime; };
template <> struct ResultDataTypeMap<Int32> { using ResultDataType = DataTypeDate32; };
template <> struct ResultDataTypeMap<DateTime64> { using ResultDataType = DataTypeDateTime64; };
template <> struct ResultDataTypeMap<Int64> { using ResultDataType = DataTypeDateTime64; };
}
@ -375,7 +413,7 @@ public:
if (arguments.size() == 2)
{
if (!isDate(arguments[0].type) && !isDateTime(arguments[0].type) && !isDateTime64(arguments[0].type))
if (!isDate(arguments[0].type) && !isDate32(arguments[0].type) && !isDateTime(arguments[0].type) && !isDateTime64(arguments[0].type))
throw Exception{"Illegal type " + arguments[0].type->getName() + " of first argument of function " + getName() +
". Should be a date or a date with time", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT};
}
@ -398,6 +436,8 @@ public:
{
case TypeIndex::Date:
return resolveReturnType<DataTypeDate>(arguments);
case TypeIndex::Date32:
return resolveReturnType<DataTypeDate32>(arguments);
case TypeIndex::DateTime:
return resolveReturnType<DataTypeDateTime>(arguments);
case TypeIndex::DateTime64:
@ -437,16 +477,23 @@ public:
if constexpr (std::is_same_v<ResultDataType, DataTypeDate>)
return std::make_shared<DataTypeDate>();
else if constexpr (std::is_same_v<ResultDataType, DataTypeDate32>)
return std::make_shared<DataTypeDate32>();
else if constexpr (std::is_same_v<ResultDataType, DataTypeDateTime>)
{
return std::make_shared<DataTypeDateTime>(extractTimeZoneNameFromFunctionArguments(arguments, 2, 0));
}
else if constexpr (std::is_same_v<ResultDataType, DataTypeDateTime64>)
{
// TODO (vnemkov): what if there is an overload of Transform::execute() that returns DateTime64 from DateTime or Date ?
// Shall we use the default scale or one from optional argument ?
const auto & datetime64_type = assert_cast<const DataTypeDateTime64 &>(*arguments[0].type);
return std::make_shared<DataTypeDateTime64>(datetime64_type.getScale(), extractTimeZoneNameFromFunctionArguments(arguments, 2, 0));
if (typeid_cast<const DataTypeDateTime64 *>(arguments[0].type.get()))
{
const auto & datetime64_type = assert_cast<const DataTypeDateTime64 &>(*arguments[0].type);
return std::make_shared<DataTypeDateTime64>(datetime64_type.getScale(), extractTimeZoneNameFromFunctionArguments(arguments, 2, 0));
}
else
{
return std::make_shared<DataTypeDateTime64>(DataTypeDateTime64::default_scale, extractTimeZoneNameFromFunctionArguments(arguments, 2, 0));
}
}
else
{
@ -470,6 +517,11 @@ public:
return DateTimeAddIntervalImpl<DataTypeDate, TransformResultDataType<DataTypeDate>, Transform>::execute(
Transform{}, arguments, result_type);
}
else if (which.isDate32())
{
return DateTimeAddIntervalImpl<DataTypeDate32, TransformResultDataType<DataTypeDate32>, Transform>::execute(
Transform{}, arguments, result_type);
}
else if (which.isDateTime())
{
return DateTimeAddIntervalImpl<DataTypeDateTime, TransformResultDataType<DataTypeDateTime>, Transform>::execute(

View File

@ -1,5 +1,6 @@
#pragma once
#include <DataTypes/DataTypeDate.h>
#include <DataTypes/DataTypeDate32.h>
#include <DataTypes/DataTypeDateTime.h>
#include <Functions/IFunction.h>
#include <DataTypes/DataTypeDateTime64.h>
@ -38,7 +39,7 @@ public:
{
if (arguments.size() == 1)
{
if (!isDate(arguments[0].type) && !isDateTime(arguments[0].type) && !isDateTime64(arguments[0].type))
if (!isDate(arguments[0].type) && !isDate32(arguments[0].type) && !isDateTime(arguments[0].type) && !isDateTime64(arguments[0].type))
throw Exception(
"Illegal type " + arguments[0].type->getName() + " of argument of function " + getName()
+ ". Should be a date or a date with time",
@ -46,7 +47,7 @@ public:
}
else if (arguments.size() == 2)
{
if (!isDate(arguments[0].type) && !isDateTime(arguments[0].type) && !isDateTime64(arguments[0].type))
if (!isDate(arguments[0].type) && !isDate32(arguments[0].type) && !isDateTime(arguments[0].type) && !isDateTime64(arguments[0].type))
throw Exception(
"Illegal type " + arguments[0].type->getName() + " of argument of function " + getName()
+ ". Should be a date or a date with time",
@ -57,7 +58,7 @@ public:
"must be of type Date or DateTime. The 2nd argument (optional) must be "
"a constant string with timezone name",
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
if (isDate(arguments[0].type) && std::is_same_v<ToDataType, DataTypeDate>)
if ((isDate(arguments[0].type) || isDate32(arguments[0].type)) && (std::is_same_v<ToDataType, DataTypeDate> || std::is_same_v<ToDataType, DataTypeDate32>))
throw Exception(
"The timezone argument of function " + getName() + " is allowed only when the 1st argument has the type DateTime",
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
@ -103,6 +104,8 @@ public:
if (which.isDate())
return DateTimeTransformImpl<DataTypeDate, ToDataType, Transform>::execute(arguments, result_type, input_rows_count);
else if (which.isDate32())
return DateTimeTransformImpl<DataTypeDate32, ToDataType, Transform>::execute(arguments, result_type, input_rows_count);
else if (which.isDateTime())
return DateTimeTransformImpl<DataTypeDateTime, ToDataType, Transform>::execute(arguments, result_type, input_rows_count);
else if (which.isDateTime64())
@ -146,6 +149,12 @@ public:
== Transform::FactorTransform::execute(UInt16(right.get<UInt64>()), date_lut)
? is_monotonic : is_not_monotonic;
}
else if (checkAndGetDataType<DataTypeDate32>(&type))
{
return Transform::FactorTransform::execute(Int32(left.get<UInt64>()), date_lut)
== Transform::FactorTransform::execute(Int32(right.get<UInt64>()), date_lut)
? is_monotonic : is_not_monotonic;
}
else
{
return Transform::FactorTransform::execute(UInt32(left.get<UInt64>()), date_lut)

View File

@ -1081,7 +1081,7 @@ public:
const DataTypeTuple * right_tuple = checkAndGetDataType<DataTypeTuple>(arguments[1].get());
bool both_represented_by_number = arguments[0]->isValueRepresentedByNumber() && arguments[1]->isValueRepresentedByNumber();
bool has_date = left.isDate() || right.isDate();
bool has_date = left.isDateOrDate32() || right.isDateOrDate32();
if (!((both_represented_by_number && !has_date) /// Do not allow to compare date and number.
|| (left.isStringOrFixedString() || right.isStringOrFixedString()) /// Everything can be compared with string by conversion.

View File

@ -32,7 +32,7 @@ void registerFunctionsConversion(FunctionFactory & factory)
factory.registerFunction<FunctionToDate>();
/// MysQL compatibility alias.
factory.registerFunction<FunctionToDate>("DATE", FunctionFactory::CaseInsensitive);
factory.registerFunction<FunctionToDate32>();
factory.registerFunction<FunctionToDateTime>();
factory.registerFunction<FunctionToDateTime32>();
factory.registerFunction<FunctionToDateTime64>();
@ -62,6 +62,7 @@ void registerFunctionsConversion(FunctionFactory & factory)
factory.registerFunction<FunctionToFloat32OrZero>();
factory.registerFunction<FunctionToFloat64OrZero>();
factory.registerFunction<FunctionToDateOrZero>();
factory.registerFunction<FunctionToDate32OrZero>();
factory.registerFunction<FunctionToDateTimeOrZero>();
factory.registerFunction<FunctionToDateTime64OrZero>();
@ -87,6 +88,7 @@ void registerFunctionsConversion(FunctionFactory & factory)
factory.registerFunction<FunctionToFloat32OrNull>();
factory.registerFunction<FunctionToFloat64OrNull>();
factory.registerFunction<FunctionToDateOrNull>();
factory.registerFunction<FunctionToDate32OrNull>();
factory.registerFunction<FunctionToDateTimeOrNull>();
factory.registerFunction<FunctionToDateTime64OrNull>();

View File

@ -12,6 +12,7 @@
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypeFixedString.h>
#include <DataTypes/DataTypeDate.h>
#include <DataTypes/DataTypeDate32.h>
#include <DataTypes/DataTypeDateTime.h>
#include <DataTypes/DataTypeDateTime64.h>
#include <DataTypes/DataTypeEnum.h>
@ -670,6 +671,8 @@ struct ConvertImpl<FromDataType, std::enable_if_t<!std::is_same_v<FromDataType,
if constexpr (std::is_same_v<FromDataType, DataTypeDate>)
data_to.resize(size * (strlen("YYYY-MM-DD") + 1));
else if constexpr (std::is_same_v<FromDataType, DataTypeDate32>)
data_to.resize(size * (strlen("YYYY-MM-DD") + 1));
else if constexpr (std::is_same_v<FromDataType, DataTypeDateTime>)
data_to.resize(size * (strlen("YYYY-MM-DD hh:mm:ss") + 1));
else if constexpr (std::is_same_v<FromDataType, DataTypeDateTime64>)
@ -751,6 +754,14 @@ inline void parseImpl<DataTypeDate>(DataTypeDate::FieldType & x, ReadBuffer & rb
x = tmp;
}
template <>
inline void parseImpl<DataTypeDate32>(DataTypeDate32::FieldType & x, ReadBuffer & rb, const DateLUTImpl *)
{
ExtendedDayNum tmp(0);
readDateText(tmp, rb);
x = tmp;
}
// NOTE: no need of extra overload of DateTime64, since readDateTimeText64 has different signature and that case is explicitly handled in the calling code.
template <>
inline void parseImpl<DataTypeDateTime>(DataTypeDateTime::FieldType & x, ReadBuffer & rb, const DateLUTImpl * time_zone)
@ -791,6 +802,16 @@ inline bool tryParseImpl<DataTypeDate>(DataTypeDate::FieldType & x, ReadBuffer &
return true;
}
template <>
inline bool tryParseImpl<DataTypeDate32>(DataTypeDate32::FieldType & x, ReadBuffer & rb, const DateLUTImpl *)
{
ExtendedDayNum tmp(0);
if (!tryReadDateText(tmp, rb))
return false;
x = tmp;
return true;
}
template <>
inline bool tryParseImpl<DataTypeDateTime>(DataTypeDateTime::FieldType & x, ReadBuffer & rb, const DateLUTImpl * time_zone)
{
@ -1215,6 +1236,7 @@ struct ConvertImpl<DataTypeFixedString, DataTypeString, Name, ConvertDefaultBeha
/// Declared early because used below.
struct NameToDate { static constexpr auto name = "toDate"; };
struct NameToDate32 { static constexpr auto name = "toDate32"; };
struct NameToDateTime { static constexpr auto name = "toDateTime"; };
struct NameToDateTime32 { static constexpr auto name = "toDateTime32"; };
struct NameToDateTime64 { static constexpr auto name = "toDateTime64"; };
@ -1890,7 +1912,7 @@ struct ToDateMonotonicity
static IFunction::Monotonicity get(const IDataType & type, const Field & left, const Field & right)
{
auto which = WhichDataType(type);
if (which.isDate() || which.isDateTime() || which.isDateTime64() || which.isInt8() || which.isInt16() || which.isUInt8() || which.isUInt16())
if (which.isDate() || which.isDate32() || which.isDateTime() || which.isDateTime64() || which.isInt8() || which.isInt16() || which.isUInt8() || which.isUInt16())
return {true, true, true};
else if (
(which.isUInt() && ((left.isNull() || left.get<UInt64>() < 0xFFFF) && (right.isNull() || right.get<UInt64>() >= 0xFFFF)))
@ -1991,6 +2013,7 @@ using FunctionToInt256 = FunctionConvert<DataTypeInt256, NameToInt256, ToNumberM
using FunctionToFloat32 = FunctionConvert<DataTypeFloat32, NameToFloat32, ToNumberMonotonicity<Float32>>;
using FunctionToFloat64 = FunctionConvert<DataTypeFloat64, NameToFloat64, ToNumberMonotonicity<Float64>>;
using FunctionToDate = FunctionConvert<DataTypeDate, NameToDate, ToDateMonotonicity>;
using FunctionToDate32 = FunctionConvert<DataTypeDate32, NameToDate32, ToDateMonotonicity>;
using FunctionToDateTime = FunctionConvert<DataTypeDateTime, NameToDateTime, ToDateTimeMonotonicity>;
using FunctionToDateTime32 = FunctionConvert<DataTypeDateTime, NameToDateTime32, ToDateTimeMonotonicity>;
using FunctionToDateTime64 = FunctionConvert<DataTypeDateTime64, NameToDateTime64, UnknownMonotonicity>;
@ -2050,6 +2073,7 @@ struct NameToInt256OrZero { static constexpr auto name = "toInt256OrZero"; };
struct NameToFloat32OrZero { static constexpr auto name = "toFloat32OrZero"; };
struct NameToFloat64OrZero { static constexpr auto name = "toFloat64OrZero"; };
struct NameToDateOrZero { static constexpr auto name = "toDateOrZero"; };
struct NameToDate32OrZero { static constexpr auto name = "toDate32OrZero"; };
struct NameToDateTimeOrZero { static constexpr auto name = "toDateTimeOrZero"; };
struct NameToDateTime64OrZero { static constexpr auto name = "toDateTime64OrZero"; };
struct NameToDecimal32OrZero { static constexpr auto name = "toDecimal32OrZero"; };
@ -2073,6 +2097,7 @@ using FunctionToInt256OrZero = FunctionConvertFromString<DataTypeInt256, NameToI
using FunctionToFloat32OrZero = FunctionConvertFromString<DataTypeFloat32, NameToFloat32OrZero, ConvertFromStringExceptionMode::Zero>;
using FunctionToFloat64OrZero = FunctionConvertFromString<DataTypeFloat64, NameToFloat64OrZero, ConvertFromStringExceptionMode::Zero>;
using FunctionToDateOrZero = FunctionConvertFromString<DataTypeDate, NameToDateOrZero, ConvertFromStringExceptionMode::Zero>;
using FunctionToDate32OrZero = FunctionConvertFromString<DataTypeDate32, NameToDate32OrZero, ConvertFromStringExceptionMode::Zero>;
using FunctionToDateTimeOrZero = FunctionConvertFromString<DataTypeDateTime, NameToDateTimeOrZero, ConvertFromStringExceptionMode::Zero>;
using FunctionToDateTime64OrZero = FunctionConvertFromString<DataTypeDateTime64, NameToDateTime64OrZero, ConvertFromStringExceptionMode::Zero>;
using FunctionToDecimal32OrZero = FunctionConvertFromString<DataTypeDecimal<Decimal32>, NameToDecimal32OrZero, ConvertFromStringExceptionMode::Zero>;
@ -2096,6 +2121,7 @@ struct NameToInt256OrNull { static constexpr auto name = "toInt256OrNull"; };
struct NameToFloat32OrNull { static constexpr auto name = "toFloat32OrNull"; };
struct NameToFloat64OrNull { static constexpr auto name = "toFloat64OrNull"; };
struct NameToDateOrNull { static constexpr auto name = "toDateOrNull"; };
struct NameToDate32OrNull { static constexpr auto name = "toDate32OrNull"; };
struct NameToDateTimeOrNull { static constexpr auto name = "toDateTimeOrNull"; };
struct NameToDateTime64OrNull { static constexpr auto name = "toDateTime64OrNull"; };
struct NameToDecimal32OrNull { static constexpr auto name = "toDecimal32OrNull"; };
@ -2119,6 +2145,7 @@ using FunctionToInt256OrNull = FunctionConvertFromString<DataTypeInt256, NameToI
using FunctionToFloat32OrNull = FunctionConvertFromString<DataTypeFloat32, NameToFloat32OrNull, ConvertFromStringExceptionMode::Null>;
using FunctionToFloat64OrNull = FunctionConvertFromString<DataTypeFloat64, NameToFloat64OrNull, ConvertFromStringExceptionMode::Null>;
using FunctionToDateOrNull = FunctionConvertFromString<DataTypeDate, NameToDateOrNull, ConvertFromStringExceptionMode::Null>;
using FunctionToDate32OrNull = FunctionConvertFromString<DataTypeDate32, NameToDate32OrNull, ConvertFromStringExceptionMode::Null>;
using FunctionToDateTimeOrNull = FunctionConvertFromString<DataTypeDateTime, NameToDateTimeOrNull, ConvertFromStringExceptionMode::Null>;
using FunctionToDateTime64OrNull = FunctionConvertFromString<DataTypeDateTime64, NameToDateTime64OrNull, ConvertFromStringExceptionMode::Null>;
using FunctionToDecimal32OrNull = FunctionConvertFromString<DataTypeDecimal<Decimal32>, NameToDecimal32OrNull, ConvertFromStringExceptionMode::Null>;

View File

@ -683,6 +683,8 @@ public:
return executeType<Int64>(arguments);
else if (which.isDate())
return executeType<UInt16>(arguments);
else if (which.isDate32())
return executeType<Int32>(arguments);
else if (which.isDateTime())
return executeType<UInt32>(arguments);
else if (which.isDecimal32())
@ -986,6 +988,7 @@ private:
else if (which.isEnum8()) executeIntType<Int8, first>(icolumn, vec_to);
else if (which.isEnum16()) executeIntType<Int16, first>(icolumn, vec_to);
else if (which.isDate()) executeIntType<UInt16, first>(icolumn, vec_to);
else if (which.isDate32()) executeIntType<Int32, first>(icolumn, vec_to);
else if (which.isDateTime()) executeIntType<UInt32, first>(icolumn, vec_to);
/// TODO: executeIntType() for Decimal32/64 leads to incompatible result
else if (which.isDecimal32()) executeBigIntType<Decimal32, first>(icolumn, vec_to);

View File

@ -113,7 +113,8 @@ public:
virtual ~IFunctionBase() = default;
virtual ColumnPtr execute(const ColumnsWithTypeAndName & arguments, const DataTypePtr & result_type, size_t input_rows_count, bool dry_run = false) const
virtual ColumnPtr execute(
const ColumnsWithTypeAndName & arguments, const DataTypePtr & result_type, size_t input_rows_count, bool dry_run = false) const
{
return prepare(arguments)->execute(arguments, result_type, input_rows_count, dry_run);
}
@ -161,7 +162,8 @@ public:
* Arguments are passed without modifications, useDefaultImplementationForNulls, useDefaultImplementationForConstants,
* useDefaultImplementationForLowCardinality are not applied.
*/
virtual ColumnPtr getConstantResultForNonConstArguments(const ColumnsWithTypeAndName & /* arguments */, const DataTypePtr & /* result_type */) const { return nullptr; }
virtual ColumnPtr getConstantResultForNonConstArguments(
const ColumnsWithTypeAndName & /* arguments */, const DataTypePtr & /* result_type */) const { return nullptr; }
/** Function is called "injective" if it returns different result for different values of arguments.
* Example: hex, negate, tuple...

View File

@ -6,6 +6,7 @@
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypesDecimal.h>
#include <DataTypes/DataTypeDate.h>
#include <DataTypes/DataTypeDate32.h>
#include <DataTypes/DataTypeDateTime.h>
#include <DataTypes/DataTypeDateTime64.h>
#include <DataTypes/DataTypeNullable.h>
@ -407,6 +408,9 @@ ColumnPtr FunctionArrayIntersect::executeImpl(const ColumnsWithTypeAndName & arg
using DateMap = ClearableHashMapWithStackMemory<DataTypeDate::FieldType,
size_t, DefaultHash<DataTypeDate::FieldType>, INITIAL_SIZE_DEGREE>;
using Date32Map = ClearableHashMapWithStackMemory<DataTypeDate32::FieldType,
size_t, DefaultHash<DataTypeDate32::FieldType>, INITIAL_SIZE_DEGREE>;
using DateTimeMap = ClearableHashMapWithStackMemory<
DataTypeDateTime::FieldType, size_t,
DefaultHash<DataTypeDateTime::FieldType>, INITIAL_SIZE_DEGREE>;
@ -421,6 +425,8 @@ ColumnPtr FunctionArrayIntersect::executeImpl(const ColumnsWithTypeAndName & arg
if (which.isDate())
result_column = execute<DateMap, ColumnVector<DataTypeDate::FieldType>, true>(arrays, std::move(column));
else if (which.isDate32())
result_column = execute<Date32Map, ColumnVector<DataTypeDate32::FieldType>, true>(arrays, std::move(column));
else if (which.isDateTime())
result_column = execute<DateTimeMap, ColumnVector<DataTypeDateTime::FieldType>, true>(arrays, std::move(column));
else if (which.isString())

View File

@ -5,6 +5,7 @@
#include <Columns/ColumnNullable.h>
#include <Common/assert_cast.h>
#include <DataTypes/DataTypeDate.h>
#include <DataTypes/DataTypeDate32.h>
#include <DataTypes/DataTypeDateTime.h>
#include <DataTypes/DataTypeDateTime64.h>
#include <DataTypes/DataTypesNumber.h>
@ -115,6 +116,8 @@ private:
f(Float64());
else if (which.isDate())
f(DataTypeDate::FieldType());
else if (which.isDate32())
f(DataTypeDate::FieldType());
else if (which.isDateTime())
f(DataTypeDateTime::FieldType());
else

View File

@ -1,6 +1,7 @@
#include <common/DateLUTImpl.h>
#include <Columns/ColumnsNumber.h>
#include <DataTypes/DataTypeDate.h>
#include <DataTypes/DataTypeDate32.h>
#include <DataTypes/DataTypeDateTime.h>
#include <DataTypes/DataTypeDateTime64.h>
#include <DataTypes/DataTypeInterval.h>
@ -39,6 +40,11 @@ namespace
return time_zone.toStartOfYearInterval(ExtendedDayNum(d), years);
}
static UInt16 execute(Int32 d, UInt64 years, const DateLUTImpl & time_zone)
{
return time_zone.toStartOfYearInterval(ExtendedDayNum(d), years);
}
static UInt16 execute(UInt32 t, UInt64 years, const DateLUTImpl & time_zone)
{
return time_zone.toStartOfYearInterval(time_zone.toDayNum(t), years);
@ -60,6 +66,11 @@ namespace
return time_zone.toStartOfQuarterInterval(ExtendedDayNum(d), quarters);
}
static UInt16 execute(Int32 d, UInt64 quarters, const DateLUTImpl & time_zone)
{
return time_zone.toStartOfQuarterInterval(ExtendedDayNum(d), quarters);
}
static UInt16 execute(UInt32 t, UInt64 quarters, const DateLUTImpl & time_zone)
{
return time_zone.toStartOfQuarterInterval(time_zone.toDayNum(t), quarters);
@ -81,6 +92,11 @@ namespace
return time_zone.toStartOfMonthInterval(ExtendedDayNum(d), months);
}
static UInt16 execute(Int32 d, UInt64 months, const DateLUTImpl & time_zone)
{
return time_zone.toStartOfMonthInterval(ExtendedDayNum(d), months);
}
static UInt16 execute(UInt32 t, UInt64 months, const DateLUTImpl & time_zone)
{
return time_zone.toStartOfMonthInterval(time_zone.toDayNum(t), months);
@ -102,6 +118,11 @@ namespace
return time_zone.toStartOfWeekInterval(ExtendedDayNum(d), weeks);
}
static UInt16 execute(Int32 d, UInt64 weeks, const DateLUTImpl & time_zone)
{
return time_zone.toStartOfWeekInterval(ExtendedDayNum(d), weeks);
}
static UInt16 execute(UInt32 t, UInt64 weeks, const DateLUTImpl & time_zone)
{
return time_zone.toStartOfWeekInterval(time_zone.toDayNum(t), weeks);
@ -123,6 +144,11 @@ namespace
return time_zone.toStartOfDayInterval(ExtendedDayNum(d), days);
}
static UInt32 execute(Int32 d, UInt64 days, const DateLUTImpl & time_zone)
{
return time_zone.toStartOfDayInterval(ExtendedDayNum(d), days);
}
static UInt32 execute(UInt32 t, UInt64 days, const DateLUTImpl & time_zone)
{
return time_zone.toStartOfDayInterval(time_zone.toDayNum(t), days);
@ -140,6 +166,7 @@ namespace
static constexpr auto name = function_name;
static UInt32 execute(UInt16, UInt64, const DateLUTImpl &) { return dateIsNotSupported(function_name); }
static UInt32 execute(Int32, UInt64, const DateLUTImpl &) { return dateIsNotSupported(function_name); }
static UInt32 execute(UInt32 t, UInt64 hours, const DateLUTImpl & time_zone) { return time_zone.toStartOfHourInterval(t, hours); }
static UInt32 execute(Int64 t, UInt64 hours, const DateLUTImpl & time_zone) { return time_zone.toStartOfHourInterval(t, hours); }
};
@ -151,6 +178,8 @@ namespace
static UInt32 execute(UInt16, UInt64, const DateLUTImpl &) { return dateIsNotSupported(function_name); }
static UInt32 execute(Int32, UInt64, const DateLUTImpl &) { return dateIsNotSupported(function_name); }
static UInt32 execute(UInt32 t, UInt64 minutes, const DateLUTImpl & time_zone)
{
return time_zone.toStartOfMinuteInterval(t, minutes);
@ -169,6 +198,8 @@ namespace
static UInt32 execute(UInt16, UInt64, const DateLUTImpl &) { return dateIsNotSupported(function_name); }
static UInt32 execute(Int32, UInt64, const DateLUTImpl &) { return dateIsNotSupported(function_name); }
static UInt32 execute(UInt32 t, UInt64 seconds, const DateLUTImpl & time_zone)
{
return time_zone.toStartOfSecondInterval(t, seconds);
@ -299,6 +330,12 @@ private:
if (time_column_vec)
return dispatchForIntervalColumn(assert_cast<const DataTypeDate&>(from_datatype), *time_column_vec, interval_column, time_zone);
}
if (which_type.isDate32())
{
const auto * time_column_vec = checkAndGetColumn<ColumnInt32>(time_column.column.get());
if (time_column_vec)
return dispatchForIntervalColumn(assert_cast<const DataTypeDate32&>(from_datatype), *time_column_vec, interval_column, time_zone);
}
if (which_type.isDateTime64())
{
const auto * time_column_vec = checkAndGetColumn<DataTypeDateTime64::ColumnType>(time_column.column.get());

View File

@ -632,6 +632,22 @@ inline ReturnType readDateTextImpl(DayNum & date, ReadBuffer & buf)
return ReturnType(true);
}
template <typename ReturnType = void>
inline ReturnType readDateTextImpl(ExtendedDayNum & date, ReadBuffer & buf)
{
static constexpr bool throw_exception = std::is_same_v<ReturnType, void>;
LocalDate local_date;
if constexpr (throw_exception)
readDateTextImpl<ReturnType>(local_date, buf);
else if (!readDateTextImpl<ReturnType>(local_date, buf))
return false;
/// When the parameter is out of rule or out of range, Date32 uses 1925-01-01 as the default value (-DateLUT::instance().getDayNumOffsetEpoch(), -16436) and Date uses 1970-01-01.
date = DateLUT::instance().makeDayNum(local_date.year(), local_date.month(), local_date.day(), -DateLUT::instance().getDayNumOffsetEpoch());
return ReturnType(true);
}
inline void readDateText(LocalDate & date, ReadBuffer & buf)
{
@ -643,6 +659,11 @@ inline void readDateText(DayNum & date, ReadBuffer & buf)
readDateTextImpl<void>(date, buf);
}
inline void readDateText(ExtendedDayNum & date, ReadBuffer & buf)
{
readDateTextImpl<void>(date, buf);
}
inline bool tryReadDateText(LocalDate & date, ReadBuffer & buf)
{
return readDateTextImpl<bool>(date, buf);
@ -653,6 +674,11 @@ inline bool tryReadDateText(DayNum & date, ReadBuffer & buf)
return readDateTextImpl<bool>(date, buf);
}
inline bool tryReadDateText(ExtendedDayNum & date, ReadBuffer & buf)
{
return readDateTextImpl<bool>(date, buf);
}
template <typename ReturnType = void>
inline ReturnType readUUIDTextImpl(UUID & uuid, ReadBuffer & buf)
{

View File

@ -17,7 +17,7 @@ public:
* Shifts buffer current position to given offset.
* @param off Offset.
* @param whence Seek mode (@see SEEK_SET, @see SEEK_CUR).
* @return New position from the begging of underlying buffer / file.
* @return New position from the beginning of underlying buffer / file.
*/
virtual off_t seek(off_t off, int whence) = 0;

View File

@ -728,6 +728,11 @@ inline void writeDateText(DayNum date, WriteBuffer & buf)
writeDateText<delimiter>(LocalDate(date), buf);
}
template <char delimiter = '-'>
inline void writeDateText(ExtendedDayNum date, WriteBuffer & buf)
{
writeDateText<delimiter>(LocalDate(date), buf);
}
/// In the format YYYY-MM-DD HH:MM:SS
template <char date_delimeter = '-', char time_delimeter = ':', char between_date_time_delimiter = ' '>

View File

@ -26,6 +26,7 @@ namespace ErrorCodes
extern const int THERE_IS_NO_COLUMN;
extern const int ILLEGAL_COLUMN;
extern const int NOT_FOUND_COLUMN_IN_BLOCK;
extern const int BAD_ARGUMENTS;
}
const char * ActionsDAG::typeToString(ActionsDAG::ActionType type)
@ -202,6 +203,7 @@ const ActionsDAG::Node & ActionsDAG::addFunction(
node.function_base = function->build(arguments);
node.result_type = node.function_base->getResultType();
node.function = node.function_base->prepare(arguments);
node.is_deterministic = node.function_base->isDeterministic();
/// If all arguments are constants, and function is suitable to be executed in 'prepare' stage - execute function.
if (node.function_base->isSuitableForConstantFolding())
@ -426,6 +428,16 @@ void ActionsDAG::removeUnusedActions(bool allow_remove_inputs)
{
/// Constant folding.
node->type = ActionsDAG::ActionType::COLUMN;
for (const auto & child : node->children)
{
if (!child->is_deterministic)
{
node->is_deterministic = false;
break;
}
}
node->children.clear();
}
@ -981,6 +993,14 @@ bool ActionsDAG::trivial() const
return true;
}
void ActionsDAG::assertDeterministic() const
{
for (const auto & node : nodes)
if (!node.is_deterministic)
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Expression must be deterministic but it contains non-deterministic part `{}`", node.result_name);
}
void ActionsDAG::addMaterializingOutputActions()
{
for (auto & node : index)

View File

@ -83,6 +83,9 @@ public:
ExecutableFunctionPtr function;
/// If function is a compiled statement.
bool is_function_compiled = false;
/// It is deterministic (See IFunction::isDeterministic).
/// This property is kept after constant folding of non-deterministic functions like 'now', 'today'.
bool is_deterministic = true;
/// For COLUMN node and propagated constants.
ColumnPtr column;
@ -175,6 +178,7 @@ public:
bool hasArrayJoin() const;
bool hasStatefulFunctions() const;
bool trivial() const; /// If actions has no functions or array join.
void assertDeterministic() const; /// Throw if not isDeterministic.
#if USE_EMBEDDED_COMPILER
void compileExpressions(size_t min_count_to_compile_expression);

View File

@ -90,6 +90,7 @@ struct BloomFilterHash
else if (which.isEnum8()) return build_hash_column(getNumberTypeHash<Int64, Int8>(field));
else if (which.isEnum16()) return build_hash_column(getNumberTypeHash<Int64, Int16>(field));
else if (which.isDate()) return build_hash_column(getNumberTypeHash<UInt64, UInt16>(field));
else if (which.isDate32()) return build_hash_column(getNumberTypeHash<UInt64, Int32>(field));
else if (which.isDateTime()) return build_hash_column(getNumberTypeHash<UInt64, UInt32>(field));
else if (which.isFloat32()) return build_hash_column(getNumberTypeHash<Float64, Float64>(field));
else if (which.isFloat64()) return build_hash_column(getNumberTypeHash<Float64, Float64>(field));
@ -151,6 +152,7 @@ struct BloomFilterHash
else if (which.isEnum8()) getNumberTypeHash<Int8, is_first>(column, vec, pos);
else if (which.isEnum16()) getNumberTypeHash<Int16, is_first>(column, vec, pos);
else if (which.isDate()) getNumberTypeHash<UInt16, is_first>(column, vec, pos);
else if (which.isDate32()) getNumberTypeHash<Int32, is_first>(column, vec, pos);
else if (which.isDateTime()) getNumberTypeHash<UInt32, is_first>(column, vec, pos);
else if (which.isFloat32()) getNumberTypeHash<Float32, is_first>(column, vec, pos);
else if (which.isFloat64()) getNumberTypeHash<Float64, is_first>(column, vec, pos);

View File

@ -149,6 +149,7 @@ void executeQuery(
OptimizeShardingKeyRewriteInVisitor::Data visitor_data{
sharding_key_expr,
sharding_key_expr->getSampleBlock().getByPosition(0).type,
sharding_key_column_name,
shard_info,
not_optimized_cluster->getSlotToShard(),

View File

@ -388,6 +388,7 @@ struct ContextSharedPart
ActionLocksManagerPtr action_locks_manager; /// Set of storages' action lockers
std::unique_ptr<SystemLogs> system_logs; /// Used to log queries and operations on parts
std::optional<StorageS3Settings> storage_s3_settings; /// Settings of S3 storage
std::vector<String> warnings; /// Store warning messages about server configuration.
RemoteHostFilter remote_host_filter; /// Allowed URL from config.xml
@ -518,6 +519,13 @@ struct ContextSharedPart
trace_collector.emplace(std::move(trace_log));
}
void addWarningMessage(const String & message)
{
/// A warning goes both: into server's log; stored to be placed in `system.warnings` table.
log->warning(message);
warnings.push_back(message);
}
};
@ -634,6 +642,12 @@ String Context::getDictionariesLibPath() const
return shared->dictionaries_lib_path;
}
std::vector<String> Context::getWarnings() const
{
auto lock = getLock();
return shared->warnings;
}
VolumePtr Context::getTemporaryVolume() const
{
auto lock = getLock();
@ -705,6 +719,12 @@ void Context::setDictionariesLibPath(const String & path)
shared->dictionaries_lib_path = path;
}
void Context::addWarningMessage(const String & msg)
{
auto lock = getLock();
shared->addWarningMessage(msg);
}
void Context::setConfig(const ConfigurationPtr & config)
{
auto lock = getLock();

View File

@ -319,6 +319,9 @@ public:
String getUserFilesPath() const;
String getDictionariesLibPath() const;
/// A list of warnings about server configuration to place in `system.warnings` table.
std::vector<String> getWarnings() const;
VolumePtr getTemporaryVolume() const;
void setPath(const String & path);
@ -326,6 +329,8 @@ public:
void setUserFilesPath(const String & path);
void setDictionariesLibPath(const String & path);
void addWarningMessage(const String & msg);
VolumePtr setTemporaryStorage(const String & path, const String & policy_name = "");
using ConfigurationPtr = Poco::AutoPtr<Poco::Util::AbstractConfiguration>;

View File

@ -531,11 +531,12 @@ Names ExpressionActions::getRequiredColumns() const
bool ExpressionActions::hasArrayJoin() const
{
for (const auto & action : actions)
if (action.node->type == ActionsDAG::ActionType::ARRAY_JOIN)
return true;
return getActionsDAG().hasArrayJoin();
}
return false;
void ExpressionActions::assertDeterministic() const
{
getActionsDAG().assertDeterministic();
}

View File

@ -103,6 +103,7 @@ public:
void execute(Block & block, bool dry_run = false) const;
bool hasArrayJoin() const;
void assertDeterministic() const;
/// Obtain a sample block that contains the names and types of result columns.
const Block & getSampleBlock() const { return sample_block; }

View File

@ -198,8 +198,9 @@ private:
{
ASTPtr & ast = func.arguments->children[1];
/// Literal can use regular IN
if (ast->as<ASTLiteral>())
/// Literal or function can use regular IN.
/// NOTE: We don't support passing table functions to IN.
if (ast->as<ASTLiteral>() || ast->as<ASTFunction>())
{
if (func.name == "globalIn")
func.name = "in";

View File

@ -283,6 +283,7 @@ InterpreterSelectQuery::InterpreterSelectQuery(
checkStackSize();
query_info.ignore_projections = options.ignore_projections;
query_info.is_projection_query = options.is_projection_query;
initSettings();
const Settings & settings = context->getSettingsRef();
@ -399,7 +400,7 @@ InterpreterSelectQuery::InterpreterSelectQuery(
view = nullptr;
}
if (try_move_to_prewhere && storage && query.where() && !query.prewhere())
if (try_move_to_prewhere && storage && storage->supportsPrewhere() && query.where() && !query.prewhere())
{
/// PREWHERE optimization: transfer some condition from WHERE to PREWHERE if enabled and viable
if (const auto & column_sizes = storage->getColumnSizes(); !column_sizes.empty())
@ -575,9 +576,9 @@ void InterpreterSelectQuery::buildQueryPlan(QueryPlan & query_plan)
/// We must guarantee that result structure is the same as in getSampleBlock()
///
/// But if we ignore aggregation, plan header does not match result_header.
/// But if it's a projection query, plan header does not match result_header.
/// TODO: add special stage for InterpreterSelectQuery?
if (!options.ignore_aggregation && !blocksHaveEqualStructure(query_plan.getCurrentDataStream().header, result_header))
if (!options.is_projection_query && !blocksHaveEqualStructure(query_plan.getCurrentDataStream().header, result_header))
{
auto convert_actions_dag = ActionsDAG::makeConvertingActions(
query_plan.getCurrentDataStream().header.getColumnsWithTypeAndName(),
@ -2013,7 +2014,7 @@ void InterpreterSelectQuery::executeAggregation(QueryPlan & query_plan, const Ac
expression_before_aggregation->setStepDescription("Before GROUP BY");
query_plan.addStep(std::move(expression_before_aggregation));
if (options.ignore_aggregation)
if (options.is_projection_query)
return;
const auto & header_before_aggregation = query_plan.getCurrentDataStream().header;

View File

@ -3,7 +3,6 @@
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/ASTIdentifier.h>
#include <DataTypes/FieldToDataType.h>
#include <DataTypes/DataTypesNumber.h>
#include <Interpreters/OptimizeShardingKeyRewriteInVisitor.h>
@ -13,12 +12,12 @@ namespace
using namespace DB;
Field executeFunctionOnField(
const Field & field, const std::string & name,
const Field & field,
const std::string & name,
const ExpressionActionsPtr & sharding_expr,
const DataTypePtr & type,
const std::string & sharding_key_column_name)
{
DataTypePtr type = applyVisitor(FieldToDataType{}, field);
ColumnWithTypeAndName column;
column.column = type->createColumnConst(1, field);
column.name = name;
@ -34,25 +33,26 @@ Field executeFunctionOnField(
/// @param sharding_column_value - one of values from IN
/// @param sharding_column_name - name of that column
/// @param sharding_expr - expression of sharding_key for the Distributed() table
/// @param sharding_key_column_name - name of the column for sharding_expr
/// @param shard_info - info for the current shard (to compare shard_num with calculated)
/// @param slots - weight -> shard mapping
/// @return true if shard may contain such value (or it is unknown), otherwise false.
bool shardContains(
const Field & sharding_column_value,
Field sharding_column_value,
const std::string & sharding_column_name,
const ExpressionActionsPtr & sharding_expr,
const std::string & sharding_key_column_name,
const Cluster::ShardInfo & shard_info,
const Cluster::SlotToShard & slots)
const OptimizeShardingKeyRewriteInMatcher::Data & data)
{
UInt64 field_value;
/// Convert value to numeric (if required).
if (!sharding_column_value.tryGet<UInt64>(field_value))
sharding_column_value = convertFieldToType(sharding_column_value, *data.sharding_key_type);
/// NULL is not allowed in sharding key,
/// so it should be safe to assume that shard cannot contain it.
if (sharding_column_value.isNull())
return false;
Field sharding_value = executeFunctionOnField(sharding_column_value, sharding_column_name, sharding_expr, sharding_key_column_name);
Field sharding_value = executeFunctionOnField(
sharding_column_value, sharding_column_name,
data.sharding_key_expr, data.sharding_key_type,
data.sharding_key_column_name);
/// The value from IN can be non-numeric,
/// but in this case it should be convertible to numeric type, let's try.
sharding_value = convertFieldToType(sharding_value, DataTypeUInt64());
@ -61,8 +61,8 @@ bool shardContains(
return false;
UInt64 value = sharding_value.get<UInt64>();
const auto shard_num = slots[value % slots.size()] + 1;
return shard_info.shard_num == shard_num;
const auto shard_num = data.slots[value % data.slots.size()] + 1;
return data.shard_info.shard_num == shard_num;
}
}
@ -92,10 +92,7 @@ void OptimizeShardingKeyRewriteInMatcher::visit(ASTFunction & function, Data & d
if (!identifier)
return;
const auto & sharding_expr = data.sharding_key_expr;
const auto & sharding_key_column_name = data.sharding_key_column_name;
if (!sharding_expr->getRequiredColumnsWithTypes().contains(identifier->name()))
if (!data.sharding_key_expr->getRequiredColumnsWithTypes().contains(identifier->name()))
return;
/// NOTE: that we should not take care about empty tuple,
@ -107,7 +104,7 @@ void OptimizeShardingKeyRewriteInMatcher::visit(ASTFunction & function, Data & d
std::erase_if(tuple_elements->children, [&](auto & child)
{
auto * literal = child->template as<ASTLiteral>();
return literal && !shardContains(literal->value, identifier->name(), sharding_expr, sharding_key_column_name, data.shard_info, data.slots);
return literal && !shardContains(literal->value, identifier->name(), data);
});
}
else if (auto * tuple_literal = right->as<ASTLiteral>();
@ -116,7 +113,7 @@ void OptimizeShardingKeyRewriteInMatcher::visit(ASTFunction & function, Data & d
auto & tuple = tuple_literal->value.get<Tuple &>();
std::erase_if(tuple, [&](auto & child)
{
return !shardContains(child, identifier->name(), sharding_expr, sharding_key_column_name, data.shard_info, data.slots);
return !shardContains(child, identifier->name(), data);
});
}
}

View File

@ -25,9 +25,15 @@ struct OptimizeShardingKeyRewriteInMatcher
struct Data
{
/// Expression of sharding_key for the Distributed() table
const ExpressionActionsPtr & sharding_key_expr;
/// Type of sharding_key column.
const DataTypePtr & sharding_key_type;
/// Name of the column for sharding_expr
const std::string & sharding_key_column_name;
/// Info for the current shard (to compare shard_num with calculated)
const Cluster::ShardInfo & shard_info;
/// weight -> shard mapping
const Cluster::SlotToShard & slots;
};

View File

@ -32,13 +32,14 @@ struct SelectQueryOptions
bool remove_duplicates = false;
bool ignore_quota = false;
bool ignore_limits = false;
/// This is a temporary flag to avoid adding aggregating step. Used for projections.
/// TODO: we need more stages for InterpreterSelectQuery
bool ignore_aggregation = false;
/// This flag is needed to analyze query ignoring table projections.
/// It is needed because we build another one InterpreterSelectQuery while analyzing projections.
/// It helps to avoid infinite recursion.
bool ignore_projections = false;
/// This flag is also used for projection analysis.
/// It is needed because lazy normal projections require special planning in FetchColumns stage, such as adding WHERE transform.
/// It is also used to avoid adding aggregating step when aggregate projection is chosen.
bool is_projection_query = false;
bool ignore_alias = false;
bool is_internal = false;
bool is_subquery = false; // non-subquery can also have subquery_depth > 0, e.g. insert select
@ -100,9 +101,9 @@ struct SelectQueryOptions
return *this;
}
SelectQueryOptions & ignoreAggregation(bool value = true)
SelectQueryOptions & projectionQuery(bool value = true)
{
ignore_aggregation = value;
is_projection_query = value;
return *this;
}

View File

@ -159,10 +159,18 @@ Field convertFieldToTypeImpl(const Field & src, const IDataType & type, const ID
{
return static_cast<UInt16>(static_cast<const DataTypeDateTime &>(*from_type_hint).getTimeZone().toDayNum(src.get<UInt64>()).toUnderType());
}
else if (which_type.isDate32() && which_from_type.isDateTime())
{
return static_cast<Int32>(static_cast<const DataTypeDateTime &>(*from_type_hint).getTimeZone().toDayNum(src.get<UInt64>()).toUnderType());
}
else if (which_type.isDateTime() && which_from_type.isDate())
{
return static_cast<const DataTypeDateTime &>(type).getTimeZone().fromDayNum(DayNum(src.get<UInt64>()));
}
else if (which_type.isDateTime() && which_from_type.isDate32())
{
return static_cast<const DataTypeDateTime &>(type).getTimeZone().fromDayNum(DayNum(src.get<Int32>()));
}
else if (type.isValueRepresentedByNumber() && src.getType() != Field::Types::String)
{
if (which_type.isUInt8()) return convertNumericType<UInt8>(src, type);

View File

@ -53,6 +53,7 @@ namespace DB
{arrow::Type::BOOL, "UInt8"},
{arrow::Type::DATE32, "Date"},
{arrow::Type::DATE32, "Date32"},
{arrow::Type::DATE64, "DateTime"},
{arrow::Type::TIMESTAMP, "DateTime"},
@ -145,9 +146,36 @@ namespace DB
}
/// Arrow stores Parquet::DATE in Int32, while ClickHouse stores Date in UInt16. Therefore, it should be checked before saving
static void fillColumnWithDate32Data(std::shared_ptr<arrow::ChunkedArray> & arrow_column, IColumn & internal_column)
static void fillColumnWithDate32Data(std::shared_ptr<arrow::ChunkedArray> & arrow_column, IColumn & internal_column)
{
PaddedPODArray<UInt16> & column_data = assert_cast<ColumnVector<UInt16> &>(internal_column).getData();
column_data.reserve(arrow_column->length());
for (size_t chunk_i = 0, num_chunks = static_cast<size_t>(arrow_column->num_chunks()); chunk_i < num_chunks; ++chunk_i)
{
PaddedPODArray<UInt16> & column_data = assert_cast<ColumnVector<UInt16> &>(internal_column).getData();
arrow::Date32Array & chunk = static_cast<arrow::Date32Array &>(*(arrow_column->chunk(chunk_i)));
for (size_t value_i = 0, length = static_cast<size_t>(chunk.length()); value_i < length; ++value_i)
{
UInt32 days_num = static_cast<UInt32>(chunk.Value(value_i));
if (days_num > DATE_LUT_MAX_DAY_NUM)
{
// TODO: will it rollback correctly?
throw Exception
{
fmt::format("Input value {} of a column \"{}\" is greater than max allowed Date value, which is {}", days_num, internal_column.getName(), DATE_LUT_MAX_DAY_NUM),
ErrorCodes::VALUE_IS_OUT_OF_RANGE_OF_DATA_TYPE
};
}
column_data.emplace_back(days_num);
}
}
}
static void fillDate32ColumnWithDate32Data(std::shared_ptr<arrow::ChunkedArray> & arrow_column, IColumn & internal_column)
{
PaddedPODArray<Int32> & column_data = assert_cast<ColumnVector<Int32> &>(internal_column).getData();
column_data.reserve(arrow_column->length());
for (size_t chunk_i = 0, num_chunks = static_cast<size_t>(arrow_column->num_chunks()); chunk_i < num_chunks; ++chunk_i)
@ -156,8 +184,8 @@ namespace DB
for (size_t value_i = 0, length = static_cast<size_t>(chunk.length()); value_i < length; ++value_i)
{
UInt32 days_num = static_cast<UInt32>(chunk.Value(value_i));
if (days_num > DATE_LUT_MAX_DAY_NUM)
Int32 days_num = static_cast<Int32>(chunk.Value(value_i));
if (days_num > DATE_LUT_MAX_EXTEND_DAY_NUM)
{
// TODO: will it rollback correctly?
throw Exception
@ -328,7 +356,14 @@ namespace DB
fillColumnWithBooleanData(arrow_column, internal_column);
break;
case arrow::Type::DATE32:
fillColumnWithDate32Data(arrow_column, internal_column);
if (WhichDataType(internal_column.getDataType()).isUInt16())
{
fillColumnWithDate32Data(arrow_column, internal_column);
}
else
{
fillDate32ColumnWithDate32Data(arrow_column, internal_column);
}
break;
case arrow::Type::DATE64:
fillColumnWithDate64Data(arrow_column, internal_column);
@ -520,8 +555,19 @@ namespace DB
);
}
if (const auto * internal_type_it = std::find_if(arrow_type_to_internal_type.begin(), arrow_type_to_internal_type.end(),
[=](auto && elem) { return elem.first == arrow_type->id(); });
auto filter = [=](auto && elem)
{
auto which = WhichDataType(column_type);
if (arrow_type->id() == arrow::Type::DATE32 && which.isDateOrDate32())
{
return (strcmp(elem.second, "Date") == 0 && which.isDate()) || (strcmp(elem.second, "Date32") == 0 && which.isDate32());
}
else
{
return elem.first == arrow_type->id();
}
};
if (const auto * internal_type_it = std::find_if(arrow_type_to_internal_type.begin(), arrow_type_to_internal_type.end(), filter);
internal_type_it != arrow_type_to_internal_type.end())
{
return DataTypeFactory::instance().get(internal_type_it->second);

View File

@ -47,7 +47,7 @@ FillingTransform::FillingTransform(
DataTypePtr to_type;
/// TODO Wrong results for big integers.
if (isInteger(type) || which.isDate() || which.isDateTime())
if (isInteger(type) || which.isDate() || which.isDate32() || which.isDateTime())
{
max_type = Field::Types::Int64;
to_type = std::make_shared<DataTypeInt64>();

View File

@ -111,6 +111,7 @@ DistributedBlockOutputStream::DistributedBlockOutputStream(
if (settings.max_distributed_depth && context->getClientInfo().distributed_depth > settings.max_distributed_depth)
throw Exception("Maximum distributed depth exceeded", ErrorCodes::TOO_LARGE_DISTRIBUTED_DEPTH);
context->getClientInfo().distributed_depth += 1;
random_shard_insert = settings.insert_distributed_one_random_shard && !storage.has_sharding_key;
}
@ -156,9 +157,6 @@ void DistributedBlockOutputStream::write(const Block & block)
void DistributedBlockOutputStream::writeAsync(const Block & block)
{
const Settings & settings = context->getSettingsRef();
bool random_shard_insert = settings.insert_distributed_one_random_shard && !storage.has_sharding_key;
if (random_shard_insert)
{
writeAsyncImpl(block, storage.getRandomShardIndex(cluster->getShardsInfo()));
@ -264,11 +262,19 @@ void DistributedBlockOutputStream::waitForJobs()
}
}
size_t jobs_count = remote_jobs_count + local_jobs_count;
size_t num_finished_jobs = finished_jobs_count;
if (random_shard_insert)
{
if (finished_jobs_count != 1)
LOG_WARNING(log, "Expected 1 writing jobs when doing random shard insert, but finished {}", num_finished_jobs);
}
else
{
size_t jobs_count = remote_jobs_count + local_jobs_count;
if (num_finished_jobs < jobs_count)
LOG_WARNING(log, "Expected {} writing jobs, but finished only {}", jobs_count, num_finished_jobs);
if (num_finished_jobs < jobs_count)
LOG_WARNING(log, "Expected {} writing jobs, but finished only {}", jobs_count, num_finished_jobs);
}
}
@ -401,7 +407,6 @@ void DistributedBlockOutputStream::writeSync(const Block & block)
{
const Settings & settings = context->getSettingsRef();
const auto & shards_info = cluster->getShardsInfo();
bool random_shard_insert = settings.insert_distributed_one_random_shard && !storage.has_sharding_key;
size_t start = 0;
size_t end = shards_info.size();
@ -410,20 +415,13 @@ void DistributedBlockOutputStream::writeSync(const Block & block)
start = settings.insert_shard_id - 1;
end = settings.insert_shard_id;
}
else if (random_shard_insert)
{
start = storage.getRandomShardIndex(shards_info);
end = start + 1;
}
size_t num_shards = end - start;
if (!pool)
{
/// Deferred initialization. Only for sync insertion.
initWritingJobs(block, start, end);
size_t jobs_count = remote_jobs_count + local_jobs_count;
size_t jobs_count = random_shard_insert ? 1 : (remote_jobs_count + local_jobs_count);
size_t max_threads = std::min<size_t>(settings.max_distributed_connections, jobs_count);
pool.emplace(/* max_threads_= */ max_threads,
/* max_free_threads_= */ max_threads,
@ -440,12 +438,20 @@ void DistributedBlockOutputStream::writeSync(const Block & block)
watch_current_block.restart();
if (random_shard_insert)
{
start = storage.getRandomShardIndex(shards_info);
end = start + 1;
}
size_t num_shards = end - start;
if (num_shards > 1)
{
auto current_selector = createSelector(block);
/// Prepare row numbers for each shard
for (size_t shard_index : collections::range(0, num_shards))
/// Prepare row numbers for needed shards
for (size_t shard_index : collections::range(start, end))
per_shard_jobs[shard_index].shard_current_block_permutation.resize(0);
for (size_t i = 0; i < block.rows(); ++i)
@ -456,7 +462,7 @@ void DistributedBlockOutputStream::writeSync(const Block & block)
{
/// Run jobs in parallel for each block and wait them
finished_jobs_count = 0;
for (size_t shard_index : collections::range(0, shards_info.size()))
for (size_t shard_index : collections::range(start, end))
for (JobReplica & job : per_shard_jobs[shard_index].replicas_jobs)
pool->scheduleOrThrowOnError(runWritingJob(job, block, num_shards));
}

View File

@ -94,6 +94,7 @@ private:
size_t inserted_rows = 0;
bool insert_sync;
bool random_shard_insert;
bool allow_materialized;
/// Sync-related stuff

View File

@ -374,7 +374,7 @@ public:
void loadProjections(bool require_columns_checksums, bool check_consistency);
/// Return set of metadat file names without checksums. For example,
/// Return set of metadata file names without checksums. For example,
/// columns.txt or checksums.txt itself.
NameSet getFileNamesWithoutChecksums() const;

View File

@ -270,19 +270,17 @@ StoragePolicyPtr MergeTreeData::getStoragePolicy() const
static void checkKeyExpression(const ExpressionActions & expr, const Block & sample_block, const String & key_name, bool allow_nullable_key)
{
for (const auto & action : expr.getActions())
{
if (action.node->type == ActionsDAG::ActionType::ARRAY_JOIN)
throw Exception(key_name + " key cannot contain array joins", ErrorCodes::ILLEGAL_COLUMN);
if (expr.hasArrayJoin())
throw Exception(key_name + " key cannot contain array joins", ErrorCodes::ILLEGAL_COLUMN);
if (action.node->type == ActionsDAG::ActionType::FUNCTION)
{
IFunctionBase & func = *action.node->function_base;
if (!func.isDeterministic())
throw Exception(key_name + " key cannot contain non-deterministic functions, "
"but contains function " + func.getName(),
ErrorCodes::BAD_ARGUMENTS);
}
try
{
expr.assertDeterministic();
}
catch (Exception & e)
{
e.addMessage(fmt::format("for {} key", key_name));
throw;
}
for (const ColumnWithTypeAndName & element : sample_block)
@ -418,7 +416,6 @@ void MergeTreeData::checkProperties(
}
checkKeyExpression(*new_sorting_key.expression, new_sorting_key.sample_block, "Sorting", allow_nullable_key);
}
void MergeTreeData::setProperties(const StorageInMemoryMetadata & new_metadata, const StorageInMemoryMetadata & old_metadata, bool attach)
@ -1515,6 +1512,7 @@ void checkVersionColumnTypesConversion(const IDataType * old_type, const IDataTy
if ((which_old_type.isInt() && !which_new_type.isInt())
|| (which_old_type.isUInt() && !which_new_type.isUInt())
|| (which_old_type.isDate() && !which_new_type.isDate())
|| (which_old_type.isDate32() && !which_new_type.isDate32())
|| (which_old_type.isDateTime() && !which_new_type.isDateTime())
|| (which_old_type.isFloat() && !which_new_type.isFloat()))
{
@ -3335,20 +3333,25 @@ MergeTreeData::getAllDataPartsVector(MergeTreeData::DataPartStateVector * out_st
return res;
}
std::vector<DetachedPartInfo>
MergeTreeData::getDetachedParts() const
std::vector<DetachedPartInfo> MergeTreeData::getDetachedParts() const
{
std::vector<DetachedPartInfo> res;
for (const auto & [path, disk] : getRelativeDataPathsWithDisks())
{
for (auto it = disk->iterateDirectory(fs::path(path) / MergeTreeData::DETACHED_DIR_NAME); it->isValid(); it->next())
{
res.emplace_back();
auto & part = res.back();
String detached_path = fs::path(path) / MergeTreeData::DETACHED_DIR_NAME;
DetachedPartInfo::tryParseDetachedPartName(it->name(), part, format_version);
part.disk = disk->getName();
/// Note: we don't care about TOCTOU issue here.
if (disk->exists(detached_path))
{
for (auto it = disk->iterateDirectory(detached_path); it->isValid(); it->next())
{
res.emplace_back();
auto & part = res.back();
DetachedPartInfo::tryParseDetachedPartName(it->name(), part, format_version);
part.disk = disk->getName();
}
}
}
return res;
@ -3924,7 +3927,7 @@ bool MergeTreeData::getQueryProcessingStageWithAggregateProjection(
ContextPtr query_context, const StorageMetadataPtr & metadata_snapshot, SelectQueryInfo & query_info) const
{
const auto & settings = query_context->getSettingsRef();
if (!settings.allow_experimental_projection_optimization || query_info.ignore_projections)
if (!settings.allow_experimental_projection_optimization || query_info.ignore_projections || query_info.is_projection_query)
return false;
const auto & query_ptr = query_info.query;

View File

@ -178,7 +178,6 @@ QueryPlanPtr MergeTreeDataSelectExecutor::read(
Pipe projection_pipe;
Pipe ordinary_pipe;
const auto & given_select = query_info.query->as<const ASTSelectQuery &>();
if (!projection_parts.empty())
{
LOG_DEBUG(log, "projection required columns: {}", fmt::join(query_info.projection->required_columns, ", "));
@ -226,22 +225,28 @@ QueryPlanPtr MergeTreeDataSelectExecutor::read(
if (!normal_parts.empty())
{
auto storage_from_base_parts_of_projection = StorageFromMergeTreeDataPart::create(std::move(normal_parts));
auto ast = query_info.projection->desc->query_ast->clone();
auto & select = ast->as<ASTSelectQuery &>();
if (given_select.where())
select.setExpression(ASTSelectQuery::Expression::WHERE, given_select.where()->clone());
if (given_select.prewhere())
select.setExpression(ASTSelectQuery::Expression::WHERE, given_select.prewhere()->clone());
// After overriding the group by clause, we finish the possible aggregations directly
if (processed_stage >= QueryProcessingStage::Enum::WithMergeableState && given_select.groupBy())
select.setExpression(ASTSelectQuery::Expression::GROUP_BY, given_select.groupBy()->clone());
auto interpreter = InterpreterSelectQuery(
ast,
query_info.query,
context,
storage_from_base_parts_of_projection,
nullptr,
SelectQueryOptions{processed_stage}.ignoreAggregation().ignoreProjections());
SelectQueryOptions{processed_stage}.projectionQuery());
QueryPlan ordinary_query_plan;
interpreter.buildQueryPlan(ordinary_query_plan);
const auto & expressions = interpreter.getAnalysisResult();
if (processed_stage == QueryProcessingStage::Enum::FetchColumns && expressions.before_where)
{
auto where_step = std::make_unique<FilterStep>(
ordinary_query_plan.getCurrentDataStream(),
expressions.before_where,
expressions.where_column_name,
expressions.remove_where_filter);
where_step->setStepDescription("WHERE");
ordinary_query_plan.addStep(std::move(where_step));
}
ordinary_pipe = QueryPipeline::getPipe(interpreter.execute().pipeline);
}

View File

@ -47,6 +47,7 @@ public:
QueryPlanOptimizationSettings::fromContext(context), BuildQueryPipelineSettings::fromContext(context));
}
bool supportsPrewhere() const override { return true; }
bool supportsIndexForIn() const override { return true; }

View File

@ -156,6 +156,7 @@ struct SelectQueryInfo
/// If not null, it means we choose a projection to execute current query.
std::optional<ProjectionCandidate> projection;
bool ignore_projections = false;
bool is_projection_query = false;
};
}

View File

@ -1093,7 +1093,7 @@ ClusterPtr StorageDistributed::skipUnusedShards(
size_t limit = local_context->getSettingsRef().optimize_skip_unused_shards_limit;
if (!limit || limit > SSIZE_MAX)
{
throw Exception("optimize_skip_unused_shards_limit out of range (0, {}]", ErrorCodes::ARGUMENT_OUT_OF_BOUND, SSIZE_MAX);
throw Exception(ErrorCodes::ARGUMENT_OUT_OF_BOUND, "optimize_skip_unused_shards_limit out of range (0, {}]", SSIZE_MAX);
}
// To interpret limit==0 as limit is reached
++limit;

View File

@ -28,6 +28,7 @@
#include <Processors/Pipe.h>
#include <cassert>
#include <chrono>
#define DBMS_STORAGE_LOG_DATA_FILE_EXTENSION ".bin"
@ -719,6 +720,34 @@ CheckResults StorageLog::checkData(const ASTPtr & /* query */, ContextPtr contex
}
IStorage::ColumnSizeByName StorageLog::getColumnSizes() const
{
std::shared_lock lock(rwlock, std::chrono::seconds(DBMS_DEFAULT_LOCK_ACQUIRE_TIMEOUT_SEC));
if (!lock)
throw Exception("Lock timeout exceeded", ErrorCodes::TIMEOUT_EXCEEDED);
ColumnSizeByName column_sizes;
FileChecker::Map file_sizes = file_checker.getFileSizes();
for (const auto & column : getInMemoryMetadata().getColumns().getAllPhysical())
{
ISerialization::StreamCallback stream_callback = [&, this] (const ISerialization::SubstreamPath & substream_path)
{
String stream_name = ISerialization::getFileNameForStream(column, substream_path);
ColumnSize & size = column_sizes[column.name];
auto it = files.find(stream_name);
if (it != files.end())
size.data_compressed += file_sizes[fileName(it->second.data_file_path)];
};
ISerialization::SubstreamPath substream_path;
auto serialization = column.type->getDefaultSerialization();
serialization->enumerateStreams(stream_callback, substream_path);
}
return column_sizes;
}
void registerStorageLog(StorageFactory & factory)
{
StorageFactory::StorageFeatures features{

View File

@ -45,6 +45,7 @@ public:
bool storesDataOnDisk() const override { return true; }
Strings getDataPaths() const override { return {DB::fullPath(disk, table_path)}; }
bool supportsSubcolumns() const override { return true; }
ColumnSizeByName getColumnSizes() const override;
protected:
/** Attach the table with the appropriate name, along the appropriate path (with / at the end),
@ -87,7 +88,7 @@ private:
DiskPtr disk;
String table_path;
std::shared_timed_mutex rwlock;
mutable std::shared_timed_mutex rwlock;
Files files;

View File

@ -0,0 +1,177 @@
#include "StorageSQLite.h"
#if USE_SQLITE
#include <common/range.h>
#include <DataStreams/SQLiteBlockInputStream.h>
#include <DataTypes/DataTypeString.h>
#include <Interpreters/Context.h>
#include <Formats/FormatFactory.h>
#include <IO/Operators.h>
#include <IO/WriteHelpers.h>
#include <Interpreters/evaluateConstantExpression.h>
#include <Parsers/ASTLiteral.h>
#include <Processors/Sources/SourceFromInputStream.h>
#include <Storages/StorageFactory.h>
#include <Storages/transformQueryForExternalDatabase.h>
namespace DB
{
namespace ErrorCodes
{
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
extern const int SQLITE_ENGINE_ERROR;
}
StorageSQLite::StorageSQLite(
const StorageID & table_id_,
SQLitePtr sqlite_db_,
const String & remote_table_name_,
const ColumnsDescription & columns_,
const ConstraintsDescription & constraints_,
ContextPtr context_)
: IStorage(table_id_)
, WithContext(context_->getGlobalContext())
, remote_table_name(remote_table_name_)
, global_context(context_)
, sqlite_db(sqlite_db_)
{
StorageInMemoryMetadata storage_metadata;
storage_metadata.setColumns(columns_);
storage_metadata.setConstraints(constraints_);
setInMemoryMetadata(storage_metadata);
}
Pipe StorageSQLite::read(
const Names & column_names,
const StorageMetadataPtr & metadata_snapshot,
SelectQueryInfo & query_info,
ContextPtr context_,
QueryProcessingStage::Enum,
size_t max_block_size,
unsigned int)
{
metadata_snapshot->check(column_names, getVirtuals(), getStorageID());
String query = transformQueryForExternalDatabase(
query_info,
metadata_snapshot->getColumns().getOrdinary(),
IdentifierQuotingStyle::DoubleQuotes,
"",
remote_table_name,
context_);
Block sample_block;
for (const String & column_name : column_names)
{
auto column_data = metadata_snapshot->getColumns().getPhysical(column_name);
sample_block.insert({column_data.type, column_data.name});
}
return Pipe(std::make_shared<SourceFromInputStream>(
std::make_shared<SQLiteBlockInputStream>(sqlite_db, query, sample_block, max_block_size)));
}
class SQLiteBlockOutputStream : public IBlockOutputStream
{
public:
explicit SQLiteBlockOutputStream(
const StorageSQLite & storage_,
const StorageMetadataPtr & metadata_snapshot_,
StorageSQLite::SQLitePtr sqlite_db_,
const String & remote_table_name_)
: storage{storage_}
, metadata_snapshot(metadata_snapshot_)
, sqlite_db(sqlite_db_)
, remote_table_name(remote_table_name_)
{
}
Block getHeader() const override { return metadata_snapshot->getSampleBlock(); }
void write(const Block & block) override
{
WriteBufferFromOwnString sqlbuf;
sqlbuf << "INSERT INTO ";
sqlbuf << doubleQuoteString(remote_table_name);
sqlbuf << " (";
for (auto it = block.begin(); it != block.end(); ++it)
{
if (it != block.begin())
sqlbuf << ", ";
sqlbuf << quoteString(it->name);
}
sqlbuf << ") VALUES ";
auto writer = FormatFactory::instance().getOutputStream("Values", sqlbuf, metadata_snapshot->getSampleBlock(), storage.getContext());
writer->write(block);
sqlbuf << ";";
char * err_message = nullptr;
int status = sqlite3_exec(sqlite_db.get(), sqlbuf.str().c_str(), nullptr, nullptr, &err_message);
if (status != SQLITE_OK)
{
String err_msg(err_message);
sqlite3_free(err_message);
throw Exception(ErrorCodes::SQLITE_ENGINE_ERROR,
"Failed to execute sqlite INSERT query. Status: {}. Message: {}",
status, err_msg);
}
}
private:
const StorageSQLite & storage;
StorageMetadataPtr metadata_snapshot;
StorageSQLite::SQLitePtr sqlite_db;
String remote_table_name;
};
BlockOutputStreamPtr StorageSQLite::write(const ASTPtr & /* query */, const StorageMetadataPtr & metadata_snapshot, ContextPtr)
{
return std::make_shared<SQLiteBlockOutputStream>(*this, metadata_snapshot, sqlite_db, remote_table_name);
}
void registerStorageSQLite(StorageFactory & factory)
{
factory.registerStorage("SQLite", [](const StorageFactory::Arguments & args) -> StoragePtr
{
ASTs & engine_args = args.engine_args;
if (engine_args.size() != 2)
throw Exception("SQLite database requires 2 arguments: database path, table name",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
for (auto & engine_arg : engine_args)
engine_arg = evaluateConstantExpressionOrIdentifierAsLiteral(engine_arg, args.getLocalContext());
const auto database_path = engine_args[0]->as<ASTLiteral &>().value.safeGet<String>();
const auto table_name = engine_args[1]->as<ASTLiteral &>().value.safeGet<String>();
sqlite3 * tmp_sqlite_db = nullptr;
int status = sqlite3_open(database_path.c_str(), &tmp_sqlite_db);
if (status != SQLITE_OK)
throw Exception(ErrorCodes::SQLITE_ENGINE_ERROR,
"Failed to open sqlite database. Status: {}. Message: {}",
status, sqlite3_errstr(status));
return StorageSQLite::create(args.table_id, std::shared_ptr<sqlite3>(tmp_sqlite_db, sqlite3_close),
table_name, args.columns, args.constraints, args.getContext());
},
{
.source_access_type = AccessType::SQLITE,
});
}
}
#endif

View File

@ -0,0 +1,53 @@
#pragma once
#if !defined(ARCADIA_BUILD)
#include "config_core.h"
#endif
#if USE_SQLITE
#include <common/shared_ptr_helper.h>
#include <Storages/IStorage.h>
#include <sqlite3.h>
namespace DB
{
class StorageSQLite final : public shared_ptr_helper<StorageSQLite>, public IStorage, public WithContext
{
friend struct shared_ptr_helper<StorageSQLite>;
public:
using SQLitePtr = std::shared_ptr<sqlite3>;
StorageSQLite(
const StorageID & table_id_,
SQLitePtr sqlite_db_,
const String & remote_table_name_,
const ColumnsDescription & columns_,
const ConstraintsDescription & constraints_,
ContextPtr context_);
std::string getName() const override { return "SQLite"; }
Pipe read(
const Names & column_names,
const StorageMetadataPtr & /*metadata_snapshot*/,
SelectQueryInfo & query_info,
ContextPtr context,
QueryProcessingStage::Enum processed_stage,
size_t max_block_size,
unsigned num_streams) override;
BlockOutputStreamPtr write(const ASTPtr & query, const StorageMetadataPtr & /*metadata_snapshot*/, ContextPtr context) override;
private:
String remote_table_name;
ContextPtr global_context;
SQLitePtr sqlite_db;
};
}
#endif

View File

@ -4,6 +4,7 @@
#include <map>
#include <cassert>
#include <chrono>
#include <Poco/Util/XMLConfiguration.h>
@ -523,6 +524,34 @@ CheckResults StorageTinyLog::checkData(const ASTPtr & /* query */, ContextPtr co
return file_checker.check();
}
IStorage::ColumnSizeByName StorageTinyLog::getColumnSizes() const
{
std::shared_lock lock(rwlock, std::chrono::seconds(DBMS_DEFAULT_LOCK_ACQUIRE_TIMEOUT_SEC));
if (!lock)
throw Exception("Lock timeout exceeded", ErrorCodes::TIMEOUT_EXCEEDED);
ColumnSizeByName column_sizes;
FileChecker::Map file_sizes = file_checker.getFileSizes();
for (const auto & column : getInMemoryMetadata().getColumns().getAllPhysical())
{
ISerialization::StreamCallback stream_callback = [&, this] (const ISerialization::SubstreamPath & substream_path)
{
String stream_name = ISerialization::getFileNameForStream(column, substream_path);
ColumnSize & size = column_sizes[column.name];
auto it = files.find(stream_name);
if (it != files.end())
size.data_compressed += file_sizes[fileName(it->second.data_file_path)];
};
ISerialization::SubstreamPath substream_path;
auto serialization = column.type->getDefaultSerialization();
serialization->enumerateStreams(stream_callback, substream_path);
}
return column_sizes;
}
void StorageTinyLog::truncate(
const ASTPtr &, const StorageMetadataPtr & metadata_snapshot, ContextPtr, TableExclusiveLockHolder &)
{

View File

@ -45,6 +45,7 @@ public:
void truncate(const ASTPtr &, const StorageMetadataPtr & metadata_snapshot, ContextPtr, TableExclusiveLockHolder &) override;
ColumnSizeByName getColumnSizes() const override;
protected:
StorageTinyLog(
DiskPtr disk_,
@ -71,7 +72,7 @@ private:
Files files;
FileChecker file_checker;
std::shared_timed_mutex rwlock;
mutable std::shared_timed_mutex rwlock;
Poco::Logger * log;

View File

@ -98,7 +98,7 @@ protected:
Names cols_required_for_sorting_key;
Names cols_required_for_primary_key;
Names cols_required_for_sampling;
MergeTreeData::ColumnSizeByName column_sizes;
IStorage::ColumnSizeByName column_sizes;
{
StoragePtr storage = storages.at(std::make_pair(database_name, table_name));

View File

@ -0,0 +1,21 @@
#include <Interpreters/Context.h>
#include <Storages/System/StorageSystemWarnings.h>
namespace DB
{
NamesAndTypesList StorageSystemWarnings::getNamesAndTypes()
{
return {
{"message", std::make_shared<DataTypeString>()},
};
}
void StorageSystemWarnings::fillData(MutableColumns & res_columns, ContextPtr context, const SelectQueryInfo &) const
{
for (const auto & warning : context->getWarnings())
res_columns[0]->insert(warning);
}
}

View File

@ -0,0 +1,27 @@
#pragma once
#include <Storages/System/IStorageSystemOneBlock.h>
namespace DB
{
class Context;
/** Implements system.warnings table that contains warnings about server configuration
* to be displayed in clickhouse-client.
*/
class StorageSystemWarnings final : public shared_ptr_helper<StorageSystemWarnings>,
public IStorageSystemOneBlock<StorageSystemWarnings> {
public:
std::string getName() const override { return "SystemWarnings"; }
static NamesAndTypesList getNamesAndTypes();
protected:
friend struct shared_ptr_helper<StorageSystemWarnings>;
using IStorageSystemOneBlock::IStorageSystemOneBlock;
void fillData(MutableColumns & res_columns, ContextPtr, const SelectQueryInfo &) const override;
};
}

View File

@ -43,6 +43,7 @@
#include <Storages/System/StorageSystemZooKeeper.h>
#include <Storages/System/StorageSystemContributors.h>
#include <Storages/System/StorageSystemErrors.h>
#include <Storages/System/StorageSystemWarnings.h>
#include <Storages/System/StorageSystemDDLWorkerQueue.h>
#if !defined(ARCADIA_BUILD)
@ -116,6 +117,7 @@ void attachSystemTablesLocal(IDatabase & system_database)
attach<StorageSystemUserDirectories>(system_database, "user_directories");
attach<StorageSystemPrivileges>(system_database, "privileges");
attach<StorageSystemErrors>(system_database, "errors");
attach<StorageSystemWarnings>(system_database, "warnings");
attach<StorageSystemDataSkippingIndices>(system_database, "data_skipping_indices");
#if !defined(ARCADIA_BUILD)
attach<StorageSystemLicenses>(system_database, "licenses");

View File

@ -67,6 +67,11 @@ void registerStorageMaterializedPostgreSQL(StorageFactory & factory);
void registerStorageExternalDistributed(StorageFactory & factory);
#endif
#if USE_SQLITE
void registerStorageSQLite(StorageFactory & factory);
#endif
void registerStorages()
{
auto & factory = StorageFactory::instance();
@ -128,6 +133,10 @@ void registerStorages()
#if USE_MYSQL || USE_LIBPQXX
registerStorageExternalDistributed(factory);
#endif
#if USE_SQLITE
registerStorageSQLite(factory);
#endif
}
}

View File

@ -118,6 +118,7 @@ SRCS(
MySQL/MySQLSettings.cpp
PartitionCommands.cpp
ProjectionsDescription.cpp
ReadFinalForExternalReplicaStorage.cpp
ReadInOrderOptimizer.cpp
SelectQueryDescription.cpp
SetSettings.cpp
@ -142,6 +143,7 @@ SRCS(
StorageMySQL.cpp
StorageNull.cpp
StorageReplicatedMergeTree.cpp
StorageSQLite.cpp
StorageSet.cpp
StorageStripeLog.cpp
StorageTinyLog.cpp

Some files were not shown because too many files have changed in this diff Show More