Merge branch 'master' into row-binary-with-defaults

Kruglov Pavel 2023-07-18 13:36:56 +02:00 committed by GitHub
commit 6985bf0cdb
18 changed files with 430 additions and 119 deletions

View File

@ -1,43 +1,38 @@
# Usage:
# set (MAX_COMPILER_MEMORY 2000 CACHE INTERNAL "") # In megabytes
# set (MAX_LINKER_MEMORY 3500 CACHE INTERNAL "")
# include (cmake/limit_jobs.cmake)
# Limit compiler/linker job concurrency to avoid OOMs on subtrees where compilation/linking is memory-intensive.
#
# Usage from CMake:
# set (MAX_COMPILER_MEMORY 2000 CACHE INTERNAL "") # megabyte
# set (MAX_LINKER_MEMORY 3500 CACHE INTERNAL "") # megabyte
# include (cmake/limit_jobs.cmake)
#
# (bigger values mean fewer jobs)
cmake_host_system_information(RESULT TOTAL_PHYSICAL_MEMORY QUERY TOTAL_PHYSICAL_MEMORY) # Not available under freebsd
cmake_host_system_information(RESULT TOTAL_PHYSICAL_MEMORY QUERY TOTAL_PHYSICAL_MEMORY)
cmake_host_system_information(RESULT NUMBER_OF_LOGICAL_CORES QUERY NUMBER_OF_LOGICAL_CORES)
# 1 if not set
option(PARALLEL_COMPILE_JOBS "Maximum number of concurrent compilation jobs" "")
# Set to disable the automatic job-limiting
option(PARALLEL_COMPILE_JOBS "Maximum number of concurrent compilation jobs" OFF)
option(PARALLEL_LINK_JOBS "Maximum number of concurrent link jobs" OFF)
# 1 if not set
option(PARALLEL_LINK_JOBS "Maximum number of concurrent link jobs" "")
if (NOT PARALLEL_COMPILE_JOBS AND TOTAL_PHYSICAL_MEMORY AND MAX_COMPILER_MEMORY)
if (NOT PARALLEL_COMPILE_JOBS AND MAX_COMPILER_MEMORY)
math(EXPR PARALLEL_COMPILE_JOBS ${TOTAL_PHYSICAL_MEMORY}/${MAX_COMPILER_MEMORY})
if (NOT PARALLEL_COMPILE_JOBS)
set (PARALLEL_COMPILE_JOBS 1)
endif ()
if (NOT NUMBER_OF_LOGICAL_CORES OR PARALLEL_COMPILE_JOBS LESS NUMBER_OF_LOGICAL_CORES)
set (PARALLEL_COMPILE_JOBS_LESS TRUE)
if (PARALLEL_COMPILE_JOBS LESS NUMBER_OF_LOGICAL_CORES)
message(WARNING "The auto-calculated compile jobs limit (${PARALLEL_COMPILE_JOBS}) underutilizes CPU cores (${NUMBER_OF_LOGICAL_CORES}). Set PARALLEL_COMPILE_JOBS to override.")
endif()
endif ()
if (PARALLEL_COMPILE_JOBS AND (NOT NUMBER_OF_LOGICAL_CORES OR PARALLEL_COMPILE_JOBS LESS NUMBER_OF_LOGICAL_CORES))
set(CMAKE_JOB_POOL_COMPILE compile_job_pool${CMAKE_CURRENT_SOURCE_DIR})
string (REGEX REPLACE "[^a-zA-Z0-9]+" "_" CMAKE_JOB_POOL_COMPILE ${CMAKE_JOB_POOL_COMPILE})
set_property(GLOBAL APPEND PROPERTY JOB_POOLS ${CMAKE_JOB_POOL_COMPILE}=${PARALLEL_COMPILE_JOBS})
endif ()
if (NOT PARALLEL_LINK_JOBS AND TOTAL_PHYSICAL_MEMORY AND MAX_LINKER_MEMORY)
if (NOT PARALLEL_LINK_JOBS AND MAX_LINKER_MEMORY)
math(EXPR PARALLEL_LINK_JOBS ${TOTAL_PHYSICAL_MEMORY}/${MAX_LINKER_MEMORY})
if (NOT PARALLEL_LINK_JOBS)
set (PARALLEL_LINK_JOBS 1)
endif ()
if (NOT NUMBER_OF_LOGICAL_CORES OR PARALLEL_LINK_JOBS LESS NUMBER_OF_LOGICAL_CORES)
set (PARALLEL_LINK_JOBS_LESS TRUE)
if (PARALLEL_LINK_JOBS LESS NUMBER_OF_LOGICAL_CORES)
message(WARNING "The auto-calculated link jobs limit (${PARALLEL_LINK_JOBS}) underutilizes CPU cores (${NUMBER_OF_LOGICAL_CORES}). Set PARALLEL_LINK_JOBS to override.")
endif()
endif ()
@ -52,20 +47,16 @@ if (CMAKE_BUILD_TYPE_UC STREQUAL "RELWITHDEBINFO" AND ENABLE_THINLTO AND PARALLE
set (PARALLEL_LINK_JOBS 2)
endif()
if (PARALLEL_LINK_JOBS AND (NOT NUMBER_OF_LOGICAL_CORES OR PARALLEL_LINK_JOBS LESS NUMBER_OF_LOGICAL_CORES))
message(STATUS "Building sub-tree with ${PARALLEL_COMPILE_JOBS} compile jobs and ${PARALLEL_LINK_JOBS} linker jobs (system: ${NUMBER_OF_LOGICAL_CORES} cores, ${TOTAL_PHYSICAL_MEMORY} MB DRAM, 'OFF' means the native core count).")
if (PARALLEL_COMPILE_JOBS LESS NUMBER_OF_LOGICAL_CORES)
set(CMAKE_JOB_POOL_COMPILE compile_job_pool${CMAKE_CURRENT_SOURCE_DIR})
string (REGEX REPLACE "[^a-zA-Z0-9]+" "_" CMAKE_JOB_POOL_COMPILE ${CMAKE_JOB_POOL_COMPILE})
set_property(GLOBAL APPEND PROPERTY JOB_POOLS ${CMAKE_JOB_POOL_COMPILE}=${PARALLEL_COMPILE_JOBS})
endif ()
if (PARALLEL_LINK_JOBS LESS NUMBER_OF_LOGICAL_CORES)
set(CMAKE_JOB_POOL_LINK link_job_pool${CMAKE_CURRENT_SOURCE_DIR})
string (REGEX REPLACE "[^a-zA-Z0-9]+" "_" CMAKE_JOB_POOL_LINK ${CMAKE_JOB_POOL_LINK})
set_property(GLOBAL APPEND PROPERTY JOB_POOLS ${CMAKE_JOB_POOL_LINK}=${PARALLEL_LINK_JOBS})
endif ()
if (PARALLEL_COMPILE_JOBS OR PARALLEL_LINK_JOBS)
message(STATUS
"${CMAKE_CURRENT_SOURCE_DIR}: Have ${TOTAL_PHYSICAL_MEMORY} megabytes of memory.
Limiting concurrent linkers jobs to ${PARALLEL_LINK_JOBS} and compiler jobs to ${PARALLEL_COMPILE_JOBS} (system has ${NUMBER_OF_LOGICAL_CORES} logical cores)")
if (PARALLEL_COMPILE_JOBS_LESS)
message(WARNING "The autocalculated compile jobs limit (${PARALLEL_COMPILE_JOBS}) underutilizes CPU cores (${NUMBER_OF_LOGICAL_CORES}). Set PARALLEL_COMPILE_JOBS to override.")
endif()
if (PARALLEL_LINK_JOBS_LESS)
message(WARNING "The autocalculated link jobs limit (${PARALLEL_LINK_JOBS}) underutilizes CPU cores (${NUMBER_OF_LOGICAL_CORES}). Set PARALLEL_LINK_JOBS to override.")
endif()
endif ()

contrib/cctz vendored

@ -1 +1 @@
Subproject commit 5e05432420f9692418e2e12aff09859e420b14a2
Subproject commit 8529bcef5cd996b7c0f4d7475286b76b5d126c4c

View File

@ -0,0 +1,32 @@
---
slug: /en/sql-reference/aggregate-functions/reference/array_concat_agg
sidebar_position: 110
---
# array_concat_agg
Alias of `groupArrayArray`. The function is case-insensitive.
**Example**

Input table `t`:

```text
SELECT *
FROM t

┌─a───────┐
│ [1,2,3] │
│ [4,5]   │
│ [6]     │
└─────────┘
```

Query:

```sql
SELECT array_concat_agg(a) AS a
FROM t
```

Result:

```text
┌─a─────────────┐
│ [1,2,3,4,5,6] │
└───────────────┘
```
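A minimal setup that reproduces the output above could be the following (hypothetical schema, not part of the committed doc):

```sql
-- Hypothetical setup for the example above; the doc itself does not define `t`.
CREATE TABLE t (a Array(UInt8)) ENGINE = Memory;
INSERT INTO t VALUES ([1,2,3]), ([4,5]), ([6]);
```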

View File

@ -222,7 +222,6 @@ AggregateFunctionPtr AggregateFunctionFactory::tryGet(
: nullptr;
}
std::optional<AggregateFunctionProperties> AggregateFunctionFactory::tryGetProperties(String name) const
{
if (name.size() > MAX_AGGREGATE_FUNCTION_NAME_LENGTH)

View File

@ -126,6 +126,7 @@ void registerAggregateFunctionGroupArray(AggregateFunctionFactory & factory)
factory.registerFunction("groupArray", { createAggregateFunctionGroupArray<false>, properties });
factory.registerAlias("array_agg", "groupArray", AggregateFunctionFactory::CaseInsensitive);
factory.registerAliasUnchecked("array_concat_agg", "groupArrayArray", AggregateFunctionFactory::CaseInsensitive);
factory.registerFunction("groupArraySample", { createAggregateFunctionGroupArraySample, properties });
factory.registerFunction("groupArrayLast", { createAggregateFunctionGroupArray<true>, properties });
}
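Because the alias is registered with `AggregateFunctionFactory::CaseInsensitive`, any capitalization of `array_concat_agg` resolves to `groupArrayArray`. The new test later in this diff exercises exactly that; a minimal illustration using the same statements:

```sql
-- Both spellings resolve to the same aggregate function, groupArrayArray.
SELECT array_concat_agg(a) FROM t;
SELECT ArrAy_cOncAt_aGg(a) FROM t;
```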

View File

@ -52,35 +52,38 @@ public:
{
const auto & creator_map = getMap();
const auto & case_insensitive_creator_map = getCaseInsensitiveMap();
const String factory_name = getFactoryName();
String real_dict_name;
if (creator_map.count(real_name))
real_dict_name = real_name;
else if (auto real_name_lowercase = Poco::toLower(real_name); case_insensitive_creator_map.count(real_name_lowercase))
real_dict_name = real_name_lowercase;
else
throw Exception(ErrorCodes::LOGICAL_ERROR, "{}: can't create alias '{}', the real name '{}' is not registered",
factory_name, alias_name, real_name);
auto real_name_lowercase = Poco::toLower(real_name);
if (!creator_map.contains(real_name) && !case_insensitive_creator_map.contains(real_name_lowercase))
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"{}: can't create alias '{}', the real name '{}' is not registered",
getFactoryName(),
alias_name,
real_name);
registerAliasUnchecked(alias_name, real_name, case_sensitiveness);
}
/// The caller must ensure that real_name is actually registered when calling this function directly.
void registerAliasUnchecked(const String & alias_name, const String & real_name, CaseSensitiveness case_sensitiveness = CaseSensitive)
{
String alias_name_lowercase = Poco::toLower(alias_name);
if (creator_map.count(alias_name) || case_insensitive_creator_map.count(alias_name_lowercase))
throw Exception(ErrorCodes::LOGICAL_ERROR, "{}: the alias name '{}' is already registered as real name",
factory_name, alias_name);
String real_name_lowercase = Poco::toLower(real_name);
const String factory_name = getFactoryName();
if (case_sensitiveness == CaseInsensitive)
{
if (!case_insensitive_aliases.emplace(alias_name_lowercase, real_dict_name).second)
throw Exception(ErrorCodes::LOGICAL_ERROR, "{}: case insensitive alias name '{}' is not unique",
factory_name, alias_name);
if (!case_insensitive_aliases.emplace(alias_name_lowercase, real_name).second)
throw Exception(ErrorCodes::LOGICAL_ERROR, "{}: case insensitive alias name '{}' is not unique", factory_name, alias_name);
case_insensitive_name_mapping[alias_name_lowercase] = real_name;
}
if (!aliases.emplace(alias_name, real_dict_name).second)
if (!aliases.emplace(alias_name, real_name).second)
throw Exception(ErrorCodes::LOGICAL_ERROR, "{}: alias name '{}' is not unique", factory_name, alias_name);
}
std::vector<String> getAllRegisteredNames() const override
{
std::vector<String> result;
@ -93,7 +96,7 @@ public:
bool isCaseInsensitive(const String & name) const
{
String name_lowercase = Poco::toLower(name);
return getCaseInsensitiveMap().count(name_lowercase) || case_insensitive_aliases.count(name_lowercase);
return getCaseInsensitiveMap().contains(name_lowercase) || case_insensitive_aliases.contains(name_lowercase);
}
const String & aliasTo(const String & name) const
@ -106,14 +109,11 @@ public:
throw Exception(ErrorCodes::LOGICAL_ERROR, "{}: name '{}' is not alias", getFactoryName(), name);
}
bool isAlias(const String & name) const
{
return aliases.count(name) || case_insensitive_aliases.contains(name);
}
bool isAlias(const String & name) const { return aliases.contains(name) || case_insensitive_aliases.contains(name); }
bool hasNameOrAlias(const String & name) const
{
return getMap().count(name) || getCaseInsensitiveMap().count(name) || isAlias(name);
return getMap().contains(name) || getCaseInsensitiveMap().contains(name) || isAlias(name);
}
/// Return the canonical name (the name used in registration) if it's different from `name`.
@ -129,7 +129,7 @@ public:
private:
using InnerMap = std::unordered_map<String, Value>; // name -> creator
using AliasMap = std::unordered_map<String, String>; // alias -> original type
using AliasMap = std::unordered_map<String, String>; // alias -> original name
virtual const InnerMap & getMap() const = 0;
virtual const InnerMap & getCaseInsensitiveMap() const = 0;

View File

@ -25,8 +25,6 @@ void Pool::Entry::incrementRefCount()
/// First reference, initialize thread
if (data->ref_count.fetch_add(1) == 0)
mysql_thread_init();
chassert(!data->removed_from_pool);
}
@ -43,7 +41,10 @@ void Pool::Entry::decrementRefCount()
/// In Pool::Entry::disconnect() we remove connection from the list of pool's connections.
/// So now we must deallocate the memory.
if (data->removed_from_pool)
{
data->conn.disconnect();
::delete data;
}
}
}
@ -230,8 +231,6 @@ void Pool::removeConnection(Connection* connection)
std::lock_guard lock(mutex);
if (connection)
{
if (!connection->removed_from_pool)
connection->conn.disconnect();
connections.remove(connection);
connection->removed_from_pool = true;
}
@ -240,6 +239,7 @@ void Pool::removeConnection(Connection* connection)
void Pool::Entry::disconnect()
{
// Remove the Entry from the Pool. Actual disconnection is delayed until refcount == 0.
pool->removeConnection(data);
}

View File

@ -4,6 +4,7 @@
#include <Processors/QueryPlan/Optimizations/Optimizations.h>
#include <Processors/QueryPlan/SortingStep.h>
#include <Common/Exception.h>
#include <DataTypes/IDataType.h>
namespace DB
{
@ -28,6 +29,20 @@ const DB::DataStream & getChildOutputStream(DB::QueryPlan::Node & node)
namespace DB::QueryPlanOptimizations
{
/// This is a check that output columns do not have the same name.
/// This is ok for a DAG, but may introduce a bug in a SortingStep, because columns there are selected by name.
static bool areOutputsConvertableToBlock(const ActionsDAG::NodeRawConstPtrs & outputs)
{
std::unordered_set<std::string_view> names;
for (const auto & output : outputs)
{
if (!names.emplace(output->result_name).second)
return false;
}
return true;
}
size_t tryExecuteFunctionsAfterSorting(QueryPlan::Node * parent_node, QueryPlan::Nodes & nodes)
{
if (parent_node->children.size() != 1)
@ -57,6 +72,9 @@ size_t tryExecuteFunctionsAfterSorting(QueryPlan::Node * parent_node, QueryPlan:
if (unneeded_for_sorting->trivial())
return 0;
if (!areOutputsConvertableToBlock(needed_for_sorting->getOutputs()))
return 0;
// Sorting (parent_node) -> Expression (child_node)
auto & node_with_needed = nodes.emplace_back();
std::swap(node_with_needed.children, child_node->children);

View File

@ -564,7 +564,17 @@ static const ActionsDAG::Node & cloneASTWithInversionPushDown(
}
case (ActionsDAG::ActionType::COLUMN):
{
res = &inverted_dag.addColumn({node.column, node.result_type, node.result_name});
String name;
if (const auto * column_const = typeid_cast<const ColumnConst *>(node.column.get()))
/// Re-generate column name for constant.
/// The DAG from the query (with the analyzer enabled) uses suffixes for constants, like 1_UInt8.
/// The DAG from the PK does not, which sometimes breaks matching by column name.
/// Ideally, we should compare DAG subtrees instead of names.
name = ASTLiteral(column_const->getDataColumn()[0]).getColumnName();
else
name = node.result_name;
res = &inverted_dag.addColumn({node.column, node.result_type, name});
break;
}
case (ActionsDAG::ActionType::ALIAS):
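The comment above concerns constant naming under the analyzer: in the query DAG a literal gets a typed suffix (e.g. `1_UInt8`), while the primary-key DAG uses the plain literal name, so the constant's name is re-derived to keep the two DAGs comparable by name. A rough illustration of the kind of query affected (hypothetical table, not taken from this PR):

```sql
-- With the analyzer enabled (allow_experimental_analyzer = 1), the literal 10
-- is named like '10_UInt8' in the query DAG but '10' in the primary-key DAG;
-- the change above re-derives the constant name so the two still match and
-- the primary-key index can be used.
SELECT count()
FROM events            -- hypothetical table with ORDER BY (counter_id)
WHERE counter_id = 10;
```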

View File

@ -1,4 +1,6 @@
-- Tags: long
-- Tags: long, no-upgrade-check
-- TODO(@vdimir): remove no-upgrade-check tag after https://github.com/ClickHouse/ClickHouse/pull/51737 is released
DROP TABLE IF EXISTS t1;
DROP TABLE IF EXISTS t2;

View File

@ -0,0 +1,133 @@
drop table if exists test;
drop table if exists test1;
CREATE TABLE test
(
`pt` String,
`count_distinct_exposure_uv` AggregateFunction(uniqHLL12, Int64)
)
ENGINE = AggregatingMergeTree
ORDER BY pt;
SELECT *
FROM
(
SELECT m0.pt AS pt
,m0.`exposure_uv` AS exposure_uv
,round(m2.exposure_uv,4) AS exposure_uv_hb_last_value
,if(m2.exposure_uv IS NULL OR m2.exposure_uv = 0,NULL,round((m0.exposure_uv - m2.exposure_uv) * 1.0 / m2.exposure_uv,4)) AS exposure_uv_hb_diff_percent
,round(m1.exposure_uv,4) AS exposure_uv_tb_last_value
,if(m1.exposure_uv IS NULL OR m1.exposure_uv = 0,NULL,round((m0.exposure_uv - m1.exposure_uv) * 1.0 / m1.exposure_uv,4)) AS exposure_uv_tb_diff_percent
FROM
(
SELECT m0.pt AS pt
,`exposure_uv` AS `exposure_uv`
FROM
(
SELECT pt AS pt
,CASE WHEN COUNT(`exposure_uv`) > 0 THEN AVG(`exposure_uv`) ELSE 0 END AS `exposure_uv`
FROM
(
SELECT pt AS pt
,uniqHLL12Merge(count_distinct_exposure_uv) AS `exposure_uv`
FROM test
GROUP BY pt
) m
GROUP BY pt
) m0
) m0
LEFT JOIN
(
SELECT m0.pt AS pt
,`exposure_uv` AS `exposure_uv`
FROM
(
SELECT formatDateTime(addYears(parseDateTimeBestEffort(pt),1),'%Y%m%d') AS pt
,CASE WHEN COUNT(`exposure_uv`) > 0 THEN AVG(`exposure_uv`) ELSE 0 END AS `exposure_uv`
FROM
(
SELECT pt AS pt
,uniqHLL12Merge(count_distinct_exposure_uv) AS `exposure_uv`
FROM test
GROUP BY pt
) m
GROUP BY pt
) m0
) m1
ON m0.pt = m1.pt
LEFT JOIN
(
SELECT m0.pt AS pt
,`exposure_uv` AS `exposure_uv`
FROM
(
SELECT formatDateTime(addDays(toDate(parseDateTimeBestEffort(pt)),1),'%Y%m%d') AS pt
,CASE WHEN COUNT(`exposure_uv`) > 0 THEN AVG(`exposure_uv`) ELSE 0 END AS `exposure_uv`
FROM
(
SELECT pt AS pt
,uniqHLL12Merge(count_distinct_exposure_uv) AS `exposure_uv`
FROM test
GROUP BY pt
) m
GROUP BY pt
) m0
) m2
ON m0.pt = m2.pt
) c0
ORDER BY pt ASC, exposure_uv DESC
settings join_use_nulls = 1;
CREATE TABLE test1
(
`pt` String,
`exposure_uv` Float64
)
ENGINE = Memory;
SELECT *
FROM
(
SELECT m0.pt
,m0.exposure_uv AS exposure_uv
,round(m2.exposure_uv,4)
FROM
(
SELECT pt
,exposure_uv
FROM test1
) m0
LEFT JOIN
(
SELECT pt
,exposure_uv
FROM test1
) m1
ON m0.pt = m1.pt
LEFT JOIN
(
SELECT pt
,exposure_uv
FROM test1
) m2
ON m0.pt = m2.pt
) c0
ORDER BY exposure_uv
settings join_use_nulls = 1;
SELECT
pt AS pt,
exposure_uv AS exposure_uv
FROM
(
SELECT
pt
FROM test1
) AS m0
FULL OUTER JOIN
(
SELECT
pt,
exposure_uv
FROM test1
) AS m1 ON m0.pt = m1.pt;

View File

@ -0,0 +1,3 @@
20230626 0.3156979034107179 \N \N
20230626 0.2624629016490004 \N \N
20230626 0.19390556368960468 \N \N

View File

@ -0,0 +1,107 @@
create table test1 (
`pt` String,
`brand_name` String,
`total_indirect_order_cnt` Float64,
`total_indirect_gmv` Float64
) ENGINE = Memory;
create table test2 (
`pt` String,
`brand_name` String,
`exposure_uv` Float64,
`click_uv` Float64
) ENGINE = Memory;
INSERT INTO test1 (`pt`, `brand_name`, `total_indirect_order_cnt`, `total_indirect_gmv`) VALUES ('20230625', 'LINING', 2232, 1008710), ('20230625', 'adidas', 125, 58820), ('20230625', 'Nike', 1291, 1033020), ('20230626', 'Nike', 1145, 938926), ('20230626', 'LINING', 1904, 853336), ('20230626', 'adidas', 133, 62546), ('20220626', 'LINING', 3747, 1855203), ('20220626', 'Nike', 2295, 1742665), ('20220626', 'adidas', 302, 122388);
INSERT INTO test2 (`pt`, `brand_name`, `exposure_uv`, `click_uv`) VALUES ('20230625', 'Nike', 2012913, 612831), ('20230625', 'adidas', 480277, 96176), ('20230625', 'LINING', 2474234, 627814), ('20230626', 'Nike', 1934666, 610770), ('20230626', 'adidas', 469904, 91117), ('20230626', 'LINING', 2285142, 599765), ('20220626', 'Nike', 2979656, 937166), ('20220626', 'adidas', 704751, 124250), ('20220626', 'LINING', 3163884, 1010221);
SELECT * FROM (
SELECT m0.pt AS pt
,m0.`uvctr` AS uvctr
,round(m1.uvctr,4) AS uvctr_hb_last_value
,round(m2.uvctr,4) AS uvctr_tb_last_value
FROM
(
SELECT m0.pt AS pt
,COALESCE(m0.brand_name,m1.brand_name) AS brand_name
,if(isNaN(`click_uv` / `exposure_uv`) OR isInfinite(`click_uv` / `exposure_uv`),NULL,`click_uv` / `exposure_uv`) AS `uvctr`
FROM
(
SELECT pt AS pt
,brand_name AS `brand_name`
,exposure_uv AS `exposure_uv`
,click_uv AS `click_uv`
FROM test2
WHERE pt = '20230626'
) m0
FULL JOIN
(
SELECT pt AS pt
,brand_name AS `brand_name`
,total_indirect_order_cnt AS `total_indirect_order_cnt`
,total_indirect_gmv AS `total_indirect_gmv`
FROM test1
WHERE pt = '20230626'
) m1
ON m0.brand_name = m1.brand_name AND m0.pt = m1.pt
) m0
LEFT JOIN
(
SELECT m0.pt AS pt
,if(isNaN(`click_uv` / `exposure_uv`) OR isInfinite(`click_uv` / `exposure_uv`),NULL,`click_uv` / `exposure_uv`) AS `uvctr`
,COALESCE(m0.brand_name,m1.brand_name) AS brand_name
,`exposure_uv` AS `exposure_uv`
,`click_uv`
FROM
(
SELECT pt AS pt
,brand_name AS `brand_name`
,exposure_uv AS `exposure_uv`
,click_uv AS `click_uv`
FROM test2
WHERE pt = '20230625'
) m0
FULL JOIN
(
SELECT pt AS pt
,brand_name AS `brand_name`
,total_indirect_order_cnt AS `total_indirect_order_cnt`
,total_indirect_gmv AS `total_indirect_gmv`
FROM test1
WHERE pt = '20230625'
) m1
ON m0.brand_name = m1.brand_name AND m0.pt = m1.pt
) m1
ON m0.brand_name = m1.brand_name AND m0.pt = m1.pt
LEFT JOIN
(
SELECT m0.pt AS pt
,if(isNaN(`click_uv` / `exposure_uv`) OR isInfinite(`click_uv` / `exposure_uv`),NULL,`click_uv` / `exposure_uv`) AS `uvctr`
,COALESCE(m0.brand_name,m1.brand_name) AS brand_name
,`exposure_uv` AS `exposure_uv`
,`click_uv`
FROM
(
SELECT pt AS pt
,brand_name AS `brand_name`
,exposure_uv AS `exposure_uv`
,click_uv AS `click_uv`
FROM test2
WHERE pt = '20220626'
) m0
FULL JOIN
(
SELECT pt AS pt
,brand_name AS `brand_name`
,total_indirect_order_cnt AS `total_indirect_order_cnt`
,total_indirect_gmv AS `total_indirect_gmv`
FROM test1
WHERE pt = '20220626'
) m1
ON m0.brand_name = m1.brand_name AND m0.pt = m1.pt
) m2
ON m0.brand_name = m2.brand_name AND m0.pt = m2.pt
) c0
ORDER BY pt ASC, uvctr DESC;

View File

@ -0,0 +1,5 @@
[1,2,3,4,5,6]
[1,2,3,4,5,6]
1 [1,2,3]
2 [4,5]
3 [6]

View File

@ -0,0 +1,9 @@
drop table if exists t;
create table t (n UInt32, a Array(Int32)) engine=Memory;
insert into t values (1, [1,2,3]), (2, [4,5]), (3, [6]);
select array_concat_agg(a) from t;
select ArrAy_cOncAt_aGg(a) from t;
select n, array_concat_agg(a) from t group by n order by n;
drop table t;

View File

@ -992,6 +992,7 @@ addressToLine
addressToLineWithInlines
addressToSymbol
adviced
agg
aggregatefunction
aggregatingmergetree
aggregatio

View File

@ -362,11 +362,12 @@ int decompressFiles(int input_fd, char * path, char * name, bool & have_compress
#else
int read_exe_path(char *exe, size_t/* buf_sz*/)
int read_exe_path(char *exe, size_t buf_sz)
{
if (realpath("/proc/self/exe", exe) == nullptr)
return 1;
return 0;
ssize_t n = readlink("/proc/self/exe", exe, buf_sz - 1);
if (n > 0)
exe[n] = '\0';
return n > 0 && n < static_cast<ssize_t>(buf_sz);
}
#endif
@ -430,58 +431,55 @@ int main(int/* argc*/, char* argv[])
return 1;
}
int lock = -1;
/// Protection from double decompression
#if !defined(OS_DARWIN) && !defined(OS_FREEBSD)
/// get inode of this executable
uint64_t inode = getInode(self);
/// In some cases /proc/self/maps may not contain the inode of /proc/self/exe.
/// One such example is running under qemu-*-static: maps is then proxied through
/// qemu, which removes information about itself from it.
if (inode != 0)
if (inode == 0)
{
std::stringstream lock_path; // STYLE_CHECK_ALLOW_STD_STRING_STREAM
lock_path << "/tmp/" << name << ".decompression." << inode << ".lock";
lock = open(lock_path.str().c_str(), O_CREAT | O_RDWR, 0666);
if (lock < 0)
std::cerr << "Unable to obtain inode for exe '" << self << "'." << std::endl;
return 1;
}
std::stringstream lock_path; // STYLE_CHECK_ALLOW_STD_STRING_STREAM
lock_path << "/tmp/" << name << ".decompression." << inode << ".lock";
int lock = open(lock_path.str().c_str(), O_CREAT | O_RDWR, 0666);
if (lock < 0)
{
perror("lock open");
return 1;
}
/// lock file should be closed on exec call
fcntl(lock, F_SETFD, FD_CLOEXEC);
if (lockf(lock, F_LOCK, 0))
{
perror("lockf");
return 1;
}
/// inconsistency in WSL1 Ubuntu - inode reported in /proc/self/maps is a 64bit to
/// 32bit conversion of input_info.st_ino
if (input_info.st_ino & 0xFFFFFFFF00000000 && !(inode & 0xFFFFFFFF00000000))
input_info.st_ino &= 0x00000000FFFFFFFF;
/// if decompression was performed by another process since this copy was started
/// then file referred by path "self" is already pointing to different inode
if (input_info.st_ino != inode)
{
struct stat lock_info;
if (0 != fstat(lock, &lock_info))
{
perror("lock open");
perror("fstat lock");
return 1;
}
/// lock file should be closed on exec call
fcntl(lock, F_SETFD, FD_CLOEXEC);
/// size 1 of lock file indicates that another decompressor has found active executable
if (lock_info.st_size == 1)
execv(self, argv);
if (lockf(lock, F_LOCK, 0))
{
perror("lockf");
return 1;
}
/// inconsistency in WSL1 Ubuntu - inode reported in /proc/self/maps is a 64bit to
/// 32bit conversion of input_info.st_ino
if (input_info.st_ino & 0xFFFFFFFF00000000 && !(inode & 0xFFFFFFFF00000000))
input_info.st_ino &= 0x00000000FFFFFFFF;
/// if decompression was performed by another process since this copy was started
/// then file referred by path "self" is already pointing to different inode
if (input_info.st_ino != inode)
{
struct stat lock_info;
if (0 != fstat(lock, &lock_info))
{
perror("fstat lock");
return 1;
}
/// size 1 of lock file indicates that another decompressor has found active executable
if (lock_info.st_size == 1)
execv(self, argv);
printf("No target executable - decompression only was performed.\n");
return 0;
}
printf("No target executable - decompression only was performed.\n");
return 0;
}
#endif
@ -549,19 +547,21 @@ int main(int/* argc*/, char* argv[])
if (has_exec)
{
#if !defined(OS_DARWIN) && !defined(OS_FREEBSD)
/// write one byte to the lock in case other copies of compressed are running to indicate that
/// execution should be performed
if (lock >= 0)
write(lock, "1", 1);
write(lock, "1", 1);
#endif
execv(self, argv);
/// This part of code will be reached only if error happened
perror("execv");
return 1;
}
#if !defined(OS_DARWIN) && !defined(OS_FREEBSD)
/// since inodes can be reused - it's a precaution if lock file already exists and have size of 1
if (lock >= 0)
ftruncate(lock, 0);
ftruncate(lock, 0);
#endif
printf("No target executable - decompression only was performed.\n");
}