Merge branch 'master' into test-repeat-last-command

This commit is contained in:
Robert Schulze 2022-09-02 15:08:01 +02:00 committed by GitHub
commit 7bd2fea1ea
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
71 changed files with 1302 additions and 103 deletions

View File

@ -15,7 +15,7 @@
*
* Allow to search for next character from the set of 'symbols...' in a string.
* It is similar to 'strpbrk', 'strcspn' (and 'strchr', 'memchr' in the case of one symbol and '\0'),
* but with the following differencies:
* but with the following differences:
* - works with any memory ranges, including containing zero bytes;
* - doesn't require terminating zero byte: end of memory range is passed explicitly;
* - if not found, returns pointer to end instead of nullptr;

View File

@ -63,7 +63,7 @@
* Very large size of memcpy typically indicates suboptimal (not cache friendly) algorithms in code or unrealistic scenarios,
* so we don't pay attention to using non-temporary stores.
*
* On recent Intel CPUs, the presence of "erms" makes "rep movsb" the most benefitial,
* On recent Intel CPUs, the presence of "erms" makes "rep movsb" the most beneficial,
* even comparing to non-temporary aligned unrolled stores even with the most wide registers.
*
* memcpy can be written in asm, C or C++. The latter can also use inline asm.

View File

@ -101,7 +101,7 @@
#endif
/*
* The pcg_extras namespace contains some support code that is likley to
* The pcg_extras namespace contains some support code that is likely to
* be useful for a variety of RNGs, including:
* - 128-bit int support for platforms where it isn't available natively
* - bit twiddling operations

View File

@ -22,7 +22,7 @@
/*
* This code provides a a C++ class that can provide 128-bit (or higher)
* integers. To produce 2K-bit integers, it uses two K-bit integers,
* placed in a union that allowes the code to also see them as four K/2 bit
* placed in a union that allows the code to also see them as four K/2 bit
* integers (and access them either directly name, or by index).
*
* It may seem like we're reinventing the wheel here, because several

View File

@ -723,7 +723,7 @@ bool Client::processWithFuzzing(const String & full_query)
// queries, for lack of a better solution.
// There is also a problem that fuzzer substitutes positive Int64
// literals or Decimal literals, which are then parsed back as
// UInt64, and suddenly duplicate alias substitition starts or stops
// UInt64, and suddenly duplicate alias substitution starts or stops
// working (ASTWithAlias::formatImpl) or something like that.
// So we compare not even the first and second formatting of the
// query, but second and third.

View File

@ -67,7 +67,7 @@ Run this tool inside your git repository. It will create .tsv files that can be
The tool can process large enough repositories in a reasonable time.
It has been tested on:
- ClickHouse: 31 seconds; 3 million rows;
- LLVM: 8 minues; 62 million rows;
- LLVM: 8 minutes; 62 million rows;
- Linux - 12 minutes; 85 million rows;
- Chromium - 67 minutes; 343 million rows;
(the numbers as of Sep 2020)

View File

@ -557,7 +557,7 @@ void Connection::sendQuery(
/// Send correct hash only for !INITIAL_QUERY, due to:
/// - this will avoid extra protocol complexity for simplest cases
/// - there is no need in hash for the INITIAL_QUERY anyway
/// (since there is no secure/unsecure changes)
/// (since there is no secure/non-secure changes)
if (client_info && !cluster_secret.empty() && client_info->query_kind != ClientInfo::QueryKind::INITIAL_QUERY)
{
#if USE_SSL

View File

@ -41,7 +41,7 @@ HedgedConnectionsFactory::HedgedConnectionsFactory(
HedgedConnectionsFactory::~HedgedConnectionsFactory()
{
/// Stop anything that maybe in progress,
/// to avoid interfer with the subsequent connections.
/// to avoid interference with the subsequent connections.
///
/// I.e. some replcas may be in the establishing state,
/// this means that hedged connection is waiting for TablesStatusResponse,

View File

@ -15,8 +15,8 @@ namespace DB
static void callback(void * arg, int status, int, struct hostent * host)
{
auto * ptr_records = reinterpret_cast<std::unordered_set<std::string>*>(arg);
if (status == ARES_SUCCESS && host->h_aliases)
auto * ptr_records = static_cast<std::unordered_set<std::string>*>(arg);
if (ptr_records && status == ARES_SUCCESS)
{
/*
* In some cases (e.g /etc/hosts), hostent::h_name is filled and hostent::h_aliases is empty.
@ -28,11 +28,14 @@ namespace DB
ptr_records->insert(ptr_record);
}
int i = 0;
while (auto * ptr_record = host->h_aliases[i])
if (host->h_aliases)
{
ptr_records->insert(ptr_record);
i++;
int i = 0;
while (auto * ptr_record = host->h_aliases[i])
{
ptr_records->insert(ptr_record);
i++;
}
}
}
}

View File

@ -64,7 +64,7 @@ struct IntervalKind
const char * toNameOfFunctionExtractTimePart() const;
/// Converts the string representation of an interval kind to its IntervalKind equivalent.
/// Returns false if the conversion unsucceeded.
/// Returns false if the conversion did not succeed.
/// For example, `IntervalKind::tryParseString('second', result)` returns `result` equals `IntervalKind::Kind::Second`.
static bool tryParseString(const std::string & kind, IntervalKind::Kind & result);
};

View File

@ -33,7 +33,7 @@ public:
* max_protected_size shows how many of the most frequently used entries will not be evicted after a sequential scan.
* max_protected_size == 0 means that the default protected size is equal to half of the total max size.
*/
/// TODO: construct from special struct with cache policy parametrs (also with max_protected_size).
/// TODO: construct from special struct with cache policy parameters (also with max_protected_size).
SLRUCachePolicy(size_t max_size_, size_t max_elements_size_ = 0, double size_ratio = 0.5, OnWeightLossFunction on_weight_loss_function_ = {})
: max_protected_size(max_size_ * std::min(1.0, size_ratio))
, max_size(max_size_)

View File

@ -31,7 +31,7 @@ inline UInt64 clock_gettime_ns_adjusted(UInt64 prev_time, clockid_t clock_type =
}
/** Differs from Poco::Stopwatch only by using 'clock_gettime' instead of 'gettimeofday',
* returns nanoseconds instead of microseconds, and also by other minor differencies.
* returns nanoseconds instead of microseconds, and also by other minor differences.
*/
class Stopwatch
{

View File

@ -497,7 +497,7 @@ private:
/// last index of offsets that was not processed
size_t last;
/// limit for adding to hashtable. In worst case with case insentive search, the table will be filled at most as half
/// limit for adding to hashtable. In worst case with case insensitive search, the table will be filled at most as half
static constexpr size_t small_limit = VolnitskyTraits::hash_size / 8;
public:

View File

@ -58,7 +58,7 @@ Fuzzing data consists of:
else:
read_key()
if (7):
read_nonce (simillar to read_key)
read_nonce (similar to read_key)
if (8):
set current_key

View File

@ -27,7 +27,7 @@ enum SnapshotVersion : uint8_t
static constexpr auto CURRENT_SNAPSHOT_VERSION = SnapshotVersion::V5;
/// What is stored in binary shapsnot
/// What is stored in binary snapshot
struct SnapshotDeserializationResult
{
/// Storage

View File

@ -2192,7 +2192,7 @@ void KeeperStorage::rollbackRequest(int64_t rollback_zxid, bool allow_missing)
}
catch (...)
{
LOG_FATAL(&Poco::Logger::get("KeeperStorage"), "Failed to rollback log. Terminating to avoid incosistencies");
LOG_FATAL(&Poco::Logger::get("KeeperStorage"), "Failed to rollback log. Terminating to avoid inconsistencies");
std::terminate();
}
}

View File

@ -53,7 +53,7 @@ public:
/// Session was actually removed
bool remove(int64_t session_id);
/// Update session expiry time (must be called on hearbeats)
/// Update session expiry time (must be called on heartbeats)
void addNewSessionOrUpdate(int64_t session_id, int64_t timeout_ms);
/// Get all expired sessions

View File

@ -1339,7 +1339,7 @@ void testLogAndStateMachine(Coordination::CoordinationSettingsPtr settings, uint
nuraft::async_result<bool>::handler_type when_done = [&snapshot_created] (bool & ret, nuraft::ptr<std::exception> &/*exception*/)
{
snapshot_created = ret;
std::cerr << "Snapshot finised\n";
std::cerr << "Snapshot finished\n";
};
state_machine->create_snapshot(s, when_done);

View File

@ -149,7 +149,7 @@ static constexpr UInt64 operator""_GiB(unsigned long long value)
\
M(UInt64, parallel_distributed_insert_select, 0, "Process distributed INSERT SELECT query in the same cluster on local tables on every shard; if set to 1 - SELECT is executed on each shard; if set to 2 - SELECT and INSERT are executed on each shard", 0) \
M(UInt64, distributed_group_by_no_merge, 0, "If 1, Do not merge aggregation states from different servers for distributed queries (shards will process query up to the Complete stage, initiator just proxies the data from the shards). If 2 the initiator will apply ORDER BY and LIMIT stages (it is not in case when shard process query up to the Complete stage)", 0) \
M(UInt64, distributed_push_down_limit, 1, "If 1, LIMIT will be applied on each shard separatelly. Usually you don't need to use it, since this will be done automatically if it is possible, i.e. for simple query SELECT FROM LIMIT.", 0) \
M(UInt64, distributed_push_down_limit, 1, "If 1, LIMIT will be applied on each shard separately. Usually you don't need to use it, since this will be done automatically if it is possible, i.e. for simple query SELECT FROM LIMIT.", 0) \
M(Bool, optimize_distributed_group_by_sharding_key, true, "Optimize GROUP BY sharding_key queries (by avoiding costly aggregation on the initiator server).", 0) \
M(UInt64, optimize_skip_unused_shards_limit, 1000, "Limit for number of sharding key values, turns off optimize_skip_unused_shards if the limit is reached", 0) \
M(Bool, optimize_skip_unused_shards, false, "Assumes that data is distributed by sharding_key. Optimization to skip unused shards if SELECT query filters by sharding_key.", 0) \
@ -366,6 +366,8 @@ static constexpr UInt64 operator""_GiB(unsigned long long value)
M(UInt64, partial_merge_join_left_table_buffer_bytes, 0, "If not 0 group left table blocks in bigger ones for left-side table in partial merge join. It uses up to 2x of specified memory per joining thread.", 0) \
M(UInt64, partial_merge_join_rows_in_right_blocks, 65536, "Split right-hand joining data in blocks of specified size. It's a portion of data indexed by min-max values and possibly unloaded on disk.", 0) \
M(UInt64, join_on_disk_max_files_to_merge, 64, "For MergeJoin on disk set how much files it's allowed to sort simultaneously. Then this value bigger then more memory used and then less disk I/O needed. Minimum is 2.", 0) \
M(UInt64, max_rows_in_set_to_optimize_join, 100'000, "Maximal size of the set to filter joined tables by each other row sets before joining. 0 - disable.", 0) \
\
M(Bool, compatibility_ignore_collation_in_create_table, true, "Compatibility ignore collation in create table", 0) \
\
M(String, temporary_files_codec, "LZ4", "Set compression codec for temporary files (sort and join on disk). I.e. LZ4, NONE.", 0) \

View File

@ -89,7 +89,7 @@ static std::map<ClickHouseVersion, SettingsChangesHistory::SettingsChanges> sett
{"22.3", {{"cast_ipv4_ipv6_default_on_conversion_error", true, false, "Make functions cast(value, 'IPv4') and cast(value, 'IPv6') behave same as toIPv4 and toIPv6 functions"}}},
{"21.12", {{"stream_like_engine_allow_direct_select", true, false, "Do not allow direct select for Kafka/RabbitMQ/FileLog by default"}}},
{"21.9", {{"output_format_decimal_trailing_zeros", true, false, "Do not output trailing zeros in text representation of Decimal types by default for better looking output"},
{"use_hedged_requests", false, true, "Enable Hedged Requests feature bu default"}}},
{"use_hedged_requests", false, true, "Enable Hedged Requests feature by default"}}},
{"21.7", {{"legacy_column_name_of_tuple_literal", true, false, "Add this setting only for compatibility reasons. It makes sense to set to 'true', while doing rolling update of cluster from version lower than 21.7 to higher"}}},
{"21.5", {{"async_socket_for_remote", false, true, "Fix all problems and turn on asynchronous reads from socket for remote queries by default again"}}},
{"21.3", {{"async_socket_for_remote", true, false, "Turn off asynchronous reads from socket for remote queries because of some problems"},

View File

@ -153,7 +153,7 @@ enum class HandleKafkaErrorMode
{
DEFAULT = 0, // Ignore errors with threshold.
STREAM, // Put errors to stream in the virtual column named ``_error.
/*FIXED_SYSTEM_TABLE, Put errors to in a fixed system table likey system.kafka_errors. This is not implemented now. */
/*FIXED_SYSTEM_TABLE, Put errors to in a fixed system table likely system.kafka_errors. This is not implemented now. */
/*CUSTOM_SYSTEM_TABLE, Put errors to in a custom system table. This is not implemented now. */
};

View File

@ -180,7 +180,7 @@ namespace detail
/** Returns array with UInt8 represent if key from in_keys array is in hierarchy of key from keys column.
* If value in result array is 1 that means key from in_keys array is in hierarchy of key from
* keys array with same index, 0 therwise.
* keys array with same index, 0 otherwise.
* For getting hierarchy implementation uses getKeysHierarchy function.
*
* Not: keys size must be equal to in_keys_size.

View File

@ -118,7 +118,7 @@ MongoDBDictionarySource::MongoDBDictionarySource(
Poco::URI poco_uri(uri);
// Parse database from URI. This is required for correctness -- the
// cursor is created using database name and colleciton name, so we have
// cursor is created using database name and collection name, so we have
// to specify them properly.
db = poco_uri.getPath();
// getPath() may return a leading slash, remove it.

View File

@ -244,7 +244,7 @@ void buildAttributeExpressionIfNeeded(
root->appendChild(expression_element);
}
/** Transofrms single dictionary attribute to configuration
/** Transforms single dictionary attribute to configuration
* third_column UInt8 DEFAULT 2 EXPRESSION rand() % 100 * 77
* to
* <attribute>

View File

@ -124,7 +124,7 @@ public:
virtual ~IMetadataStorage() = default;
/// ==== More specefic methods. Previous were almost general purpose. ====
/// ==== More specific methods. Previous were almost general purpose. ====
/// Read multiple metadata files into strings and return mapping from file_path -> metadata
virtual std::unordered_map<std::string, std::string> getSerializedMetadata(const std::vector<String> & file_paths) const = 0;

View File

@ -134,7 +134,7 @@ using FunctionArgumentDescriptors = std::vector<FunctionArgumentDescriptor>;
* (e.g. depending on result type or other trait).
* First, checks that number of arguments is as expected (including optional arguments).
* Second, checks that mandatory args present and have valid type.
* Third, checks optional arguents types, skipping ones that are missing.
* Third, checks optional arguments types, skipping ones that are missing.
*
* Please note that if you have several optional arguments, like f([a, b, c]),
* only these calls are considered valid:

View File

@ -24,7 +24,7 @@
# include <aws/core/utils/UUID.h>
# include <aws/core/http/HttpClientFactory.h>
# include <aws/s3/S3Client.h>
# include <aws/s3/model/HeadObjectRequest.h> // Y_IGNORE
# include <aws/s3/model/HeadObjectRequest.h>
# include <IO/S3/PocoHTTPClientFactory.h>
# include <IO/S3/PocoHTTPClient.h>

View File

@ -39,6 +39,7 @@
#include <QueryPipeline/Pipe.h>
#include <Processors/QueryPlan/AggregatingStep.h>
#include <Processors/QueryPlan/ArrayJoinStep.h>
#include <Processors/QueryPlan/CreateSetAndFilterOnTheFlyStep.h>
#include <Processors/QueryPlan/CreatingSetsStep.h>
#include <Processors/QueryPlan/CubeStep.h>
#include <Processors/QueryPlan/DistinctStep.h>
@ -1436,7 +1437,7 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, std::optional<P
if (!joined_plan)
throw Exception(ErrorCodes::LOGICAL_ERROR, "There is no joined plan for query");
auto add_sorting = [&settings, this] (QueryPlan & plan, const Names & key_names, bool is_right)
auto add_sorting = [&settings, this] (QueryPlan & plan, const Names & key_names, JoinTableSide join_pos)
{
SortDescription order_descr;
order_descr.reserve(key_names.size());
@ -1455,15 +1456,43 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, std::optional<P
this->context->getTemporaryVolume(),
settings.min_free_disk_space_for_temporary_data,
settings.optimize_sorting_by_input_stream_properties);
sorting_step->setStepDescription(fmt::format("Sort {} before JOIN", is_right ? "right" : "left"));
sorting_step->setStepDescription(fmt::format("Sort {} before JOIN", join_pos));
plan.addStep(std::move(sorting_step));
};
auto crosswise_connection = CreateSetAndFilterOnTheFlyStep::createCrossConnection();
auto add_create_set = [&settings, crosswise_connection](QueryPlan & plan, const Names & key_names, JoinTableSide join_pos)
{
auto creating_set_step = std::make_unique<CreateSetAndFilterOnTheFlyStep>(
plan.getCurrentDataStream(), key_names, settings.max_rows_in_set_to_optimize_join, crosswise_connection, join_pos);
creating_set_step->setStepDescription(fmt::format("Create set and filter {} joined stream", join_pos));
auto * step_raw_ptr = creating_set_step.get();
plan.addStep(std::move(creating_set_step));
return step_raw_ptr;
};
if (expressions.join->pipelineType() == JoinPipelineType::YShaped)
{
const auto & join_clause = expressions.join->getTableJoin().getOnlyClause();
add_sorting(query_plan, join_clause.key_names_left, false);
add_sorting(*joined_plan, join_clause.key_names_right, true);
const auto & table_join = expressions.join->getTableJoin();
const auto & join_clause = table_join.getOnlyClause();
auto join_kind = table_join.kind();
bool kind_allows_filtering = isInner(join_kind) || isLeft(join_kind) || isRight(join_kind);
if (settings.max_rows_in_set_to_optimize_join > 0 && kind_allows_filtering)
{
auto * left_set = add_create_set(query_plan, join_clause.key_names_left, JoinTableSide::Left);
auto * right_set = add_create_set(*joined_plan, join_clause.key_names_right, JoinTableSide::Right);
if (isInnerOrLeft(join_kind))
right_set->setFiltering(left_set->getSet());
if (isInnerOrRight(join_kind))
left_set->setFiltering(right_set->getSet());
}
add_sorting(query_plan, join_clause.key_names_left, JoinTableSide::Left);
add_sorting(*joined_plan, join_clause.key_names_right, JoinTableSide::Right);
}
QueryPlanStepPtr join_step = std::make_unique<JoinStep>(

View File

@ -22,6 +22,8 @@
#include <Interpreters/castColumn.h>
#include <Interpreters/Context.h>
#include <Processors/Chunk.h>
#include <Storages/MergeTree/KeyCondition.h>
#include <base/range.h>
@ -162,8 +164,16 @@ void Set::setHeader(const ColumnsWithTypeAndName & header)
data.init(data.chooseMethod(key_columns, key_sizes));
}
bool Set::insertFromBlock(const ColumnsWithTypeAndName & columns)
{
Columns cols;
cols.reserve(columns.size());
for (const auto & column : columns)
cols.emplace_back(column.column);
return insertFromBlock(cols);
}
bool Set::insertFromBlock(const Columns & columns)
{
std::lock_guard<std::shared_mutex> lock(rwlock);
@ -179,11 +189,11 @@ bool Set::insertFromBlock(const ColumnsWithTypeAndName & columns)
/// Remember the columns we will work with
for (size_t i = 0; i < keys_size; ++i)
{
materialized_columns.emplace_back(columns.at(i).column->convertToFullIfNeeded());
materialized_columns.emplace_back(columns.at(i)->convertToFullIfNeeded());
key_columns.emplace_back(materialized_columns.back().get());
}
size_t rows = columns.at(0).column->size();
size_t rows = columns.at(0)->size();
/// We will insert to the Set only keys, where all components are not NULL.
ConstNullMapPtr null_map{};

View File

@ -20,6 +20,7 @@ class Context;
class IFunctionBase;
using FunctionBasePtr = std::shared_ptr<IFunctionBase>;
class Chunk;
/** Data structure for implementation of IN expression.
*/
@ -45,11 +46,14 @@ public:
void setHeader(const ColumnsWithTypeAndName & header);
/// Returns false, if some limit was exceeded and no need to insert more data.
bool insertFromBlock(const Columns & columns);
bool insertFromBlock(const ColumnsWithTypeAndName & columns);
/// Call after all blocks were inserted. To get the information that set is already created.
void finishInsert() { is_created = true; }
bool isCreated() const { return is_created; }
/// finishInsert and isCreated are thread-safe
bool isCreated() const { return is_created.load(); }
/** For columns of 'block', check belonging of corresponding rows to the set.
* Return UInt8 column with the result.
@ -111,7 +115,7 @@ private:
bool transform_null_in;
/// Check if set contains all the data.
bool is_created = false;
std::atomic<bool> is_created = false;
/// If in the left part columns contains the same types as the elements of the set.
void executeOrdinary(

View File

@ -73,16 +73,32 @@ public:
return key_names_right.size();
}
String formatDebug() const
String formatDebug(bool short_format = false) const
{
return fmt::format("Left keys: [{}] Right keys [{}] Condition columns: '{}', '{}'",
fmt::join(key_names_left, ", "), fmt::join(key_names_right, ", "),
condColumnNames().first, condColumnNames().second);
const auto & [left_cond, right_cond] = condColumnNames();
if (short_format)
{
return fmt::format("({}) = ({}){}{}", fmt::join(key_names_left, ", "), fmt::join(key_names_right, ", "),
!left_cond.empty() ? " AND " + left_cond : "", !right_cond.empty() ? " AND " + right_cond : "");
}
return fmt::format(
"Left keys: [{}] Right keys [{}] Condition columns: '{}', '{}'",
fmt::join(key_names_left, ", "), fmt::join(key_names_right, ", "), left_cond, right_cond);
}
};
using Clauses = std::vector<JoinOnClause>;
static std::string formatClauses(const Clauses & clauses, bool short_format = false)
{
std::vector<std::string> res;
for (const auto & clause : clauses)
res.push_back("[" + clause.formatDebug(short_format) + "]");
return fmt::format("{}", fmt::join(res, "; "));
}
private:
/** Query of the form `SELECT expr(x) AS k FROM t1 ANY LEFT JOIN (SELECT expr(x) AS k FROM t2) USING k`
* The join is made by column k.

View File

@ -453,7 +453,7 @@ void optimizeMonotonousFunctionsInOrderBy(ASTSelectQuery * select_query, Context
return;
/// Do not apply optimization for Distributed and Merge storages,
/// because we can't get the sorting key of their undelying tables
/// because we can't get the sorting key of their underlying tables
/// and we can break the matching of the sorting key for `read_in_order`
/// optimization by removing monotonous functions from the prefix of key.
if (result.is_remote_storage || (result.storage && result.storage->getName() == "Merge"))

View File

@ -55,7 +55,7 @@ void InsertQuerySettingsPushDownMatcher::visit(ASTSelectQuery & select_query, AS
insert_settings.push_back(setting);
else
{
/// Do not ovewrite setting that was passed for INSERT
/// Do not overwrite setting that was passed for INSERT
/// by settings that was passed for SELECT
}
}

View File

@ -11,7 +11,7 @@ struct SettingChange;
class SettingsChanges;
/// Pushdown SETTINGS clause that goes after FORMAT to the SELECT query:
/// (since settings after FORMAT parsed separatelly not in the ParserSelectQuery but in ParserQueryWithOutput)
/// (since settings after FORMAT parsed separately not in the ParserSelectQuery but in ParserQueryWithOutput)
///
/// SELECT 1 FORMAT Null SETTINGS max_block_size = 1 ->
/// SELECT 1 SETTINGS max_block_size = 1 FORMAT Null SETTINGS max_block_size = 1

View File

@ -0,0 +1,198 @@
#include <Processors/PingPongProcessor.h>
namespace DB
{
/// Create list with `num_ports` of regular ports and 1 auxiliary port with empty header.
template <typename T> requires std::is_same_v<T, InputPorts> || std::is_same_v<T, OutputPorts>
static T createPortsWithSpecial(const Block & header, size_t num_ports)
{
T res(num_ports, header);
res.emplace_back(Block());
return res;
}
PingPongProcessor::PingPongProcessor(const Block & header, size_t num_ports, Order order_)
: IProcessor(createPortsWithSpecial<InputPorts>(header, num_ports),
createPortsWithSpecial<OutputPorts>(header, num_ports))
, aux_in_port(inputs.back())
, aux_out_port(outputs.back())
, order(order_)
{
assert(order == First || order == Second);
port_pairs.resize(num_ports);
auto input_it = inputs.begin();
auto output_it = outputs.begin();
for (size_t i = 0; i < num_ports; ++i)
{
port_pairs[i].input_port = &*input_it;
++input_it;
port_pairs[i].output_port = &*output_it;
++output_it;
}
}
void PingPongProcessor::finishPair(PortsPair & pair)
{
if (!pair.is_finished)
{
pair.output_port->finish();
pair.input_port->close();
pair.is_finished = true;
++num_finished_pairs;
}
}
bool PingPongProcessor::processPair(PortsPair & pair)
{
if (pair.output_port->isFinished())
{
finishPair(pair);
return false;
}
if (pair.input_port->isFinished())
{
finishPair(pair);
return false;
}
if (!pair.output_port->canPush())
{
pair.input_port->setNotNeeded();
return false;
}
pair.input_port->setNeeded();
if (pair.input_port->hasData())
{
Chunk chunk = pair.input_port->pull(true);
ready_to_send |= consume(chunk);
pair.output_port->push(std::move(chunk));
}
return true;
}
bool PingPongProcessor::isPairsFinished() const
{
return num_finished_pairs == port_pairs.size();
}
IProcessor::Status PingPongProcessor::processRegularPorts()
{
if (isPairsFinished())
return Status::Finished;
bool need_data = false;
for (auto & pair : port_pairs)
need_data = processPair(pair) || need_data;
if (isPairsFinished())
return Status::Finished;
if (need_data)
return Status::NeedData;
return Status::PortFull;
}
bool PingPongProcessor::sendPing()
{
if (aux_out_port.canPush())
{
Chunk chunk(aux_out_port.getHeader().cloneEmpty().getColumns(), 0);
aux_out_port.push(std::move(chunk));
is_send = true;
aux_out_port.finish();
return true;
}
return false;
}
bool PingPongProcessor::recievePing()
{
if (aux_in_port.hasData())
{
aux_in_port.pull();
is_received = true;
aux_in_port.close();
return true;
}
return false;
}
bool PingPongProcessor::canSend() const
{
return !is_send && (ready_to_send || isPairsFinished());
}
IProcessor::Status PingPongProcessor::prepare()
{
if (!set_needed_once && !is_received && !aux_in_port.isFinished())
{
set_needed_once = true;
aux_in_port.setNeeded();
}
if (order == First || is_send)
{
if (!is_received)
{
bool received = recievePing();
if (!received)
{
return Status::NeedData;
}
}
}
if (order == Second || is_received)
{
if (!is_send && canSend())
{
bool sent = sendPing();
if (!sent)
return Status::PortFull;
}
}
auto status = processRegularPorts();
if (status == Status::Finished)
{
if (order == First || is_send)
{
if (!is_received)
{
bool received = recievePing();
if (!received)
{
return Status::NeedData;
}
}
}
if (order == Second || is_received)
{
if (!is_send && canSend())
{
bool sent = sendPing();
if (!sent)
return Status::PortFull;
}
}
}
return status;
}
std::pair<InputPort *, OutputPort *> PingPongProcessor::getAuxPorts()
{
return std::make_pair(&aux_in_port, &aux_out_port);
}
}

View File

@ -0,0 +1,105 @@
#pragma once
#include <Processors/IProcessor.h>
#include <base/unit.h>
#include <Processors/Chunk.h>
#include <Common/logger_useful.h>
namespace DB
{
/*
* Processor with N inputs and N outputs. Moves data from i-th input to i-th output as is.
* It has a pair of auxiliary ports to notify another instance by sending empty chunk after some condition holds.
* You should use this processor in pair of instances and connect auxiliary ports crosswise.
*
*
* aux
* PingPongProcessor PingPongProcessor
* aux
*
*
* One of the processors starts processing data, and another waits for notification.
* When `consume` returns true, the first stops processing, sends a ping to another and waits for notification.
* After that, the second one also processes data until `consume`, then send a notification back to the first one.
* After this roundtrip, processors bypass data from regular inputs to outputs.
*/
class PingPongProcessor : public IProcessor
{
public:
enum class Order : uint8_t
{
/// Processor that starts processing data.
First,
/// Processor that waits for notification.
Second,
};
using enum Order;
PingPongProcessor(const Block & header, size_t num_ports, Order order_);
Status prepare() override;
std::pair<InputPort *, OutputPort *> getAuxPorts();
/// Returns `true` when enough data consumed
virtual bool consume(const Chunk & chunk) = 0;
protected:
struct PortsPair
{
InputPort * input_port = nullptr;
OutputPort * output_port = nullptr;
bool is_finished = false;
};
bool sendPing();
bool recievePing();
bool canSend() const;
bool isPairsFinished() const;
bool processPair(PortsPair & pair);
void finishPair(PortsPair & pair);
Status processRegularPorts();
std::vector<PortsPair> port_pairs;
size_t num_finished_pairs = 0;
InputPort & aux_in_port;
OutputPort & aux_out_port;
bool is_send = false;
bool is_received = false;
bool ready_to_send = false;
/// Used to set 'needed' flag once for auxiliary input at first `prepare` call.
bool set_needed_once = false;
Order order;
};
/// Reads first N rows from two streams evenly.
class ReadHeadBalancedProcessor : public PingPongProcessor
{
public:
ReadHeadBalancedProcessor(const Block & header, size_t num_ports, size_t size_to_wait_, Order order_)
: PingPongProcessor(header, num_ports, order_) , data_consumed(0) , size_to_wait(size_to_wait_)
{
}
String getName() const override { return "ReadHeadBalancedProcessor"; }
bool consume(const Chunk & chunk) override
{
data_consumed += chunk.getNumRows();
return data_consumed > size_to_wait;
}
private:
size_t data_consumed;
size_t size_to_wait;
};
}

View File

@ -8,18 +8,18 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
}
void connect(OutputPort & output, InputPort & input)
void connect(OutputPort & output, InputPort & input, bool reconnect)
{
if (input.state)
if (!reconnect && input.state)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Port is already connected, (header: [{}])", input.header.dumpStructure());
if (output.state)
if (!reconnect && output.state)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Port is already connected, (header: [{}])", output.header.dumpStructure());
auto out_name = output.getProcessor().getName();
auto in_name = input.getProcessor().getName();
auto out_name = output.processor ? output.getProcessor().getName() : "null";
auto in_name = input.processor ? input.getProcessor().getName() : "null";
assertCompatibleHeader(output.getHeader(), input.getHeader(), fmt::format(" function connect between {} and {}", out_name, in_name));
assertCompatibleHeader(output.getHeader(), input.getHeader(), fmt::format("function connect between {} and {}", out_name, in_name));
input.output_port = &output;
output.input_port = &input;

View File

@ -25,7 +25,7 @@ namespace ErrorCodes
class Port
{
friend void connect(OutputPort &, InputPort &);
friend void connect(OutputPort &, InputPort &, bool);
friend class IProcessor;
public:
@ -267,7 +267,7 @@ protected:
/// * You can pull only if port hasData().
class InputPort : public Port
{
friend void connect(OutputPort &, InputPort &);
friend void connect(OutputPort &, InputPort &, bool);
private:
OutputPort * output_port = nullptr;
@ -390,7 +390,7 @@ public:
/// * You can push only if port doesn't hasData().
class OutputPort : public Port
{
friend void connect(OutputPort &, InputPort &);
friend void connect(OutputPort &, InputPort &, bool);
private:
InputPort * input_port = nullptr;
@ -483,6 +483,6 @@ using InputPorts = std::list<InputPort>;
using OutputPorts = std::list<OutputPort>;
void connect(OutputPort & output, InputPort & input);
void connect(OutputPort & output, InputPort & input, bool reconnect = false);
}

View File

@ -0,0 +1,205 @@
#include <Processors/QueryPlan/CreateSetAndFilterOnTheFlyStep.h>
#include <Processors/Transforms/CreateSetAndFilterOnTheFlyTransform.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
#include <IO/Operators.h>
#include <Common/JSONBuilder.h>
#include <Core/ColumnWithTypeAndName.h>
#include <Core/ColumnsWithTypeAndName.h>
#include <Processors/IProcessor.h>
#include <Processors/PingPongProcessor.h>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
static void connectAllInputs(OutputPortRawPtrs ports, InputPorts & inputs, size_t num_ports)
{
auto input_it = inputs.begin();
for (size_t i = 0; i < num_ports; ++i)
{
connect(*ports[i], *input_it);
input_it++;
}
}
static ColumnsWithTypeAndName getColumnSubset(const Block & block, const Names & column_names)
{
ColumnsWithTypeAndName result;
for (const auto & name : column_names)
result.emplace_back(block.getByName(name));
return result;
}
static ITransformingStep::Traits getTraits()
{
return ITransformingStep::Traits
{
{
.preserves_distinct_columns = true,
.returns_single_stream = false,
.preserves_number_of_streams = true,
.preserves_sorting = true,
},
{
.preserves_number_of_rows = false,
}
};
}
class CreateSetAndFilterOnTheFlyStep::CrosswiseConnection : public boost::noncopyable
{
public:
using PortPair = std::pair<InputPort *, OutputPort *>;
/// Remember ports passed on the first call and connect with ones from second call.
/// Thread-safe.
void connectPorts(PortPair rhs_ports, IProcessor * proc)
{
assert(!rhs_ports.first->isConnected() && !rhs_ports.second->isConnected());
std::lock_guard<std::mutex> lock(mux);
if (input_port || output_port)
{
assert(input_port && output_port);
assert(!input_port->isConnected());
connect(*rhs_ports.second, *input_port);
connect(*output_port, *rhs_ports.first, /* reconnect= */ true);
}
else
{
std::tie(input_port, output_port) = rhs_ports;
assert(input_port && output_port);
assert(!input_port->isConnected() && !output_port->isConnected());
dummy_input_port = std::make_unique<InputPort>(output_port->getHeader(), proc);
connect(*output_port, *dummy_input_port);
}
}
private:
std::mutex mux;
InputPort * input_port = nullptr;
OutputPort * output_port = nullptr;
/// Output ports should always be connected, and we can't add a step to the pipeline without them.
/// So, connect the port from the first processor to this dummy port and then reconnect to the second processor.
std::unique_ptr<InputPort> dummy_input_port;
};
CreateSetAndFilterOnTheFlyStep::CrosswiseConnectionPtr CreateSetAndFilterOnTheFlyStep::createCrossConnection()
{
return std::make_shared<CreateSetAndFilterOnTheFlyStep::CrosswiseConnection>();
}
CreateSetAndFilterOnTheFlyStep::CreateSetAndFilterOnTheFlyStep(
const DataStream & input_stream_,
const Names & column_names_,
size_t max_rows_in_set_,
CrosswiseConnectionPtr crosswise_connection_,
JoinTableSide position_)
: ITransformingStep(input_stream_, input_stream_.header, getTraits())
, column_names(column_names_)
, max_rows_in_set(max_rows_in_set_)
, own_set(std::make_shared<SetWithState>(SizeLimits(max_rows_in_set, 0, OverflowMode::BREAK), false, true))
, filtering_set(nullptr)
, crosswise_connection(crosswise_connection_)
, position(position_)
{
if (crosswise_connection == nullptr)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Crosswise connection is not initialized");
if (input_streams.size() != 1)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Step requires exactly one input stream, got {}", input_streams.size());
own_set->setHeader(getColumnSubset(input_streams[0].header, column_names));
}
void CreateSetAndFilterOnTheFlyStep::transformPipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &)
{
size_t num_streams = pipeline.getNumStreams();
pipeline.addSimpleTransform([this, num_streams](const Block & header, QueryPipelineBuilder::StreamType stream_type) -> ProcessorPtr
{
if (stream_type != QueryPipelineBuilder::StreamType::Main)
return nullptr;
auto res = std::make_shared<CreatingSetsOnTheFlyTransform>(header, column_names, num_streams, own_set);
res->setDescription(this->getStepDescription());
return res;
});
Block input_header = pipeline.getHeader();
auto pipeline_transform = [&input_header, this](OutputPortRawPtrs ports)
{
Processors result_transforms;
size_t num_ports = ports.size();
/// Add balancing transform
auto idx = position == JoinTableSide::Left ? PingPongProcessor::First : PingPongProcessor::Second;
auto stream_balancer = std::make_shared<ReadHeadBalancedProcessor>(input_header, num_ports, max_rows_in_set, idx);
stream_balancer->setDescription(getStepDescription());
/// Regular inputs just bypass data for respective ports
connectAllInputs(ports, stream_balancer->getInputs(), num_ports);
/// Connect auxiliary ports
crosswise_connection->connectPorts(stream_balancer->getAuxPorts(), stream_balancer.get());
if (!filtering_set)
{
LOG_DEBUG(log, "Skip filtering {} stream", position);
result_transforms.emplace_back(std::move(stream_balancer));
return result_transforms;
}
/// Add filtering transform, ports just connected respectively
auto & outputs = stream_balancer->getOutputs();
auto output_it = outputs.begin();
for (size_t i = 0; i < outputs.size() - 1; ++i)
{
auto & port = *output_it++;
auto transform = std::make_shared<FilterBySetOnTheFlyTransform>(port.getHeader(), column_names, filtering_set);
transform->setDescription(this->getStepDescription());
connect(port, transform->getInputPort());
result_transforms.emplace_back(std::move(transform));
}
assert(output_it == std::prev(outputs.end()));
result_transforms.emplace_back(std::move(stream_balancer));
return result_transforms;
};
/// Auxiliary port stream_balancer can be connected later (by crosswise_connection).
/// So, use unsafe `transform` with `check_ports = false` to avoid assertions
pipeline.transform(std::move(pipeline_transform), /* check_ports= */ false);
}
void CreateSetAndFilterOnTheFlyStep::describeActions(JSONBuilder::JSONMap & map) const
{
map.add(getName(), true);
}
void CreateSetAndFilterOnTheFlyStep::describeActions(FormatSettings & settings) const
{
String prefix(settings.offset, ' ');
settings.out << prefix << getName();
settings.out << '\n';
}
void CreateSetAndFilterOnTheFlyStep::updateOutputStream()
{
if (input_streams.size() != 1)
throw Exception(ErrorCodes::LOGICAL_ERROR, "{} requires exactly one input stream, got {}", getName(), input_streams.size());
own_set->setHeader(getColumnSubset(input_streams[0].header, column_names));
output_stream = input_streams[0];
}
}

View File

@ -0,0 +1,59 @@
#pragma once
#include <Processors/QueryPlan/ITransformingStep.h>
#include <Processors/Transforms/CreateSetAndFilterOnTheFlyTransform.h>
#include <Processors/DelayedPortsProcessor.h>
namespace DB
{
/*
* Used to optimize JOIN when joining a small table over a large table.
* Currently applied only for the full sorting join.
* It tries to build a set for each stream.
* Once one stream is finished, it starts to filter another stream with this set.
*/
class CreateSetAndFilterOnTheFlyStep : public ITransformingStep
{
public:
/// Two instances of step need some shared state to connect processors crosswise
class CrosswiseConnection;
using CrosswiseConnectionPtr = std::shared_ptr<CrosswiseConnection>;
static CrosswiseConnectionPtr createCrossConnection();
CreateSetAndFilterOnTheFlyStep(
const DataStream & input_stream_,
const Names & column_names_,
size_t max_rows_in_set_,
CrosswiseConnectionPtr crosswise_connection_,
JoinTableSide position_);
String getName() const override { return "CreateSetAndFilterOnTheFlyStep"; }
void transformPipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings & settings) override;
void describeActions(JSONBuilder::JSONMap & map) const override;
void describeActions(FormatSettings & settings) const override;
SetWithStatePtr getSet() const { return own_set; }
/// Set for another stream.
void setFiltering(SetWithStatePtr filtering_set_) { filtering_set = filtering_set_; }
private:
void updateOutputStream() override;
Names column_names;
size_t max_rows_in_set;
SetWithStatePtr own_set;
SetWithStatePtr filtering_set;
CrosswiseConnectionPtr crosswise_connection;
JoinTableSide position;
Poco::Logger * log = &Poco::Logger::get("CreateSetAndFilterOnTheFlyStep");
};
}

View File

@ -34,8 +34,12 @@ QueryPipelineBuilderPtr JoinStep::updatePipeline(QueryPipelineBuilders pipelines
throw Exception(ErrorCodes::LOGICAL_ERROR, "JoinStep expect two input steps");
if (join->pipelineType() == JoinPipelineType::YShaped)
return QueryPipelineBuilder::joinPipelinesYShaped(
{
auto joined_pipeline = QueryPipelineBuilder::joinPipelinesYShaped(
std::move(pipelines[0]), std::move(pipelines[1]), join, output_stream->header, max_block_size, &processors);
joined_pipeline->resize(max_streams);
return joined_pipeline;
}
return QueryPipelineBuilder::joinPipelinesRightLeft(
std::move(pipelines[0]),

View File

@ -8,6 +8,7 @@
#include <Processors/QueryPlan/Optimizations/Optimizations.h>
#include <Processors/QueryPlan/ITransformingStep.h>
#include <Processors/QueryPlan/FilterStep.h>
#include <Processors/QueryPlan/CreateSetAndFilterOnTheFlyStep.h>
#include <Processors/QueryPlan/AggregatingStep.h>
#include <Processors/QueryPlan/ExpressionStep.h>
#include <Processors/QueryPlan/JoinStep.h>
@ -22,6 +23,7 @@
#include <Interpreters/ActionsDAG.h>
#include <Interpreters/ArrayJoinAction.h>
#include <Interpreters/TableJoin.h>
#include <fmt/format.h>
namespace DB::ErrorCodes
{
@ -134,10 +136,24 @@ tryAddNewFilterStep(QueryPlan::Node * parent_node, QueryPlan::Nodes & nodes, con
static size_t
tryAddNewFilterStep(QueryPlan::Node * parent_node, QueryPlan::Nodes & nodes, const Names & allowed_inputs,
bool can_remove_filter = true)
bool can_remove_filter = true, size_t child_idx = 0)
{
if (auto split_filter = splitFilter(parent_node, allowed_inputs, 0))
return tryAddNewFilterStep(parent_node, nodes, split_filter, can_remove_filter, 0);
if (auto split_filter = splitFilter(parent_node, allowed_inputs, child_idx))
return tryAddNewFilterStep(parent_node, nodes, split_filter, can_remove_filter, child_idx);
return 0;
}
/// Push down filter through specified type of step
template <typename Step>
static size_t simplePushDownOverStep(QueryPlan::Node * parent_node, QueryPlan::Nodes & nodes, QueryPlanStepPtr & child)
{
if (typeid_cast<Step *>(child.get()))
{
Names allowed_inputs = child->getOutputStream().header.getNames();
if (auto updated_steps = tryAddNewFilterStep(parent_node, nodes, allowed_inputs))
return updated_steps;
}
return 0;
}
@ -234,12 +250,8 @@ size_t tryPushDownFilter(QueryPlan::Node * parent_node, QueryPlan::Nodes & nodes
return updated_steps;
}
if (auto * distinct = typeid_cast<DistinctStep *>(child.get()))
{
Names allowed_inputs = distinct->getOutputStream().header.getNames();
if (auto updated_steps = tryAddNewFilterStep(parent_node, nodes, allowed_inputs))
return updated_steps;
}
if (auto updated_steps = simplePushDownOverStep<DistinctStep>(parent_node, nodes, child))
return updated_steps;
if (auto * join = typeid_cast<JoinStep *>(child.get()))
{
@ -290,7 +302,7 @@ size_t tryPushDownFilter(QueryPlan::Node * parent_node, QueryPlan::Nodes & nodes
const size_t updated_steps = tryAddNewFilterStep(parent_node, nodes, split_filter, can_remove_filter, child_idx);
if (updated_steps > 0)
{
LOG_DEBUG(&Poco::Logger::get("QueryPlanOptimizations"), "Pushed down filter to {} side of join", kind);
LOG_DEBUG(&Poco::Logger::get("QueryPlanOptimizations"), "Pushed down filter {} to the {} side of join", split_filter_column_name, kind);
}
return updated_steps;
};
@ -321,12 +333,11 @@ size_t tryPushDownFilter(QueryPlan::Node * parent_node, QueryPlan::Nodes & nodes
// {
// }
if (typeid_cast<SortingStep *>(child.get()))
{
Names allowed_inputs = child->getOutputStream().header.getNames();
if (auto updated_steps = tryAddNewFilterStep(parent_node, nodes, allowed_inputs))
return updated_steps;
}
if (auto updated_steps = simplePushDownOverStep<SortingStep>(parent_node, nodes, child))
return updated_steps;
if (auto updated_steps = simplePushDownOverStep<CreateSetAndFilterOnTheFlyStep>(parent_node, nodes, child))
return updated_steps;
if (auto * union_step = typeid_cast<UnionStep *>(child.get()))
{

View File

@ -85,6 +85,13 @@ public:
{
}
StrictResizeProcessor(InputPorts inputs_, OutputPorts outputs_)
: IProcessor(inputs_, outputs_)
, current_input(inputs.begin())
, current_output(outputs.begin())
{
}
String getName() const override { return "StrictResize"; }
Status prepare(const PortNumbers &, const PortNumbers &) override;

View File

@ -39,7 +39,7 @@ SQLiteSource::SQLiteSource(
if (status != SQLITE_OK)
throw Exception(ErrorCodes::SQLITE_ENGINE_ERROR,
"Cannot prepate sqlite statement. Status: {}. Message: {}",
"Cannot prepare sqlite statement. Status: {}. Message: {}",
status, sqlite3_errstr(status));
compiled_statement = std::unique_ptr<sqlite3_stmt, StatementDeleter>(compiled_stmt, StatementDeleter());

View File

@ -0,0 +1,195 @@
#include <Processors/Transforms/CreateSetAndFilterOnTheFlyTransform.h>
#include <cstddef>
#include <mutex>
#include <Interpreters/Set.h>
#include <Common/Stopwatch.h>
#include <Common/formatReadable.h>
#include <Common/logger_useful.h>
#include <Columns/IColumn.h>
#include <Core/ColumnWithTypeAndName.h>
#include <base/types.h>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
namespace
{
std::vector<size_t> getColumnIndices(const Block & block, const Names & column_names)
{
std::vector<size_t> indices;
for (const auto & name : column_names)
indices.push_back(block.getPositionByName(name));
return indices;
}
Columns getColumnsByIndices(const Chunk & chunk, const std::vector<size_t> & indices)
{
Columns columns;
const Columns & all_cols = chunk.getColumns();
for (const auto & index : indices)
columns.push_back(all_cols.at(index));
return columns;
}
ColumnsWithTypeAndName getColumnsByIndices(const Block & sample_block, const Chunk & chunk, const std::vector<size_t> & indices)
{
Block block = sample_block.cloneEmpty();
block.setColumns(getColumnsByIndices(chunk, indices));
return block.getColumnsWithTypeAndName();
}
}
CreatingSetsOnTheFlyTransform::CreatingSetsOnTheFlyTransform(
const Block & header_, const Names & column_names_, size_t num_streams_, SetWithStatePtr set_)
: ISimpleTransform(header_, header_, true)
, column_names(column_names_)
, key_column_indices(getColumnIndices(inputs.front().getHeader(), column_names))
, num_streams(num_streams_)
, set(set_)
{
}
IProcessor::Status CreatingSetsOnTheFlyTransform::prepare()
{
IProcessor::Status status = ISimpleTransform::prepare();
if (!set || status != Status::Finished)
/// Nothing to do with set
return status;
/// Finalize set
if (set->state == SetWithState::State::Creating)
{
if (input.isFinished())
{
set->finished_count++;
if (set->finished_count != num_streams)
/// Not all instances of processor are finished
return status;
set->finishInsert();
set->state = SetWithState::State::Finished;
LOG_DEBUG(log, "{}: finish building set for [{}] with {} rows, set size is {}",
getDescription(), fmt::join(column_names, ", "), set->getTotalRowCount(),
formatReadableSizeWithBinarySuffix(set->getTotalByteCount()));
set.reset();
}
else
{
/// Should not happen because processor inserted before join that reads all the data
throw Exception(ErrorCodes::LOGICAL_ERROR, "Processor finished, but not all input was read");
}
}
return status;
}
void CreatingSetsOnTheFlyTransform::transform(Chunk & chunk)
{
if (!set || set->state != SetWithState::State::Creating)
{
/// If set building suspended by another processor, release pointer
if (set != nullptr)
set.reset();
return;
}
if (chunk.getNumRows())
{
Columns key_columns = getColumnsByIndices(chunk, key_column_indices);
bool limit_exceeded = !set->insertFromBlock(key_columns);
if (limit_exceeded)
{
auto prev_state = set->state.exchange(SetWithState::State::Suspended);
/// Print log only after first state switch
if (prev_state == SetWithState::State::Creating)
{
LOG_DEBUG(log, "{}: set limit exceeded, give up building set, after reading {} rows and using {}",
getDescription(), set->getTotalRowCount(), formatReadableSizeWithBinarySuffix(set->getTotalByteCount()));
}
/// Probaply we need to clear set here, because it's unneeded anymore
/// But now `Set` doesn't have such method, so reset pointer in all processors and then it should be freed
set.reset();
}
}
}
FilterBySetOnTheFlyTransform::FilterBySetOnTheFlyTransform(const Block & header_, const Names & column_names_, SetWithStatePtr set_)
: ISimpleTransform(header_, header_, true)
, column_names(column_names_)
, key_column_indices(getColumnIndices(inputs.front().getHeader(), column_names))
, set(set_)
{
const auto & header = inputs.front().getHeader();
for (size_t idx : key_column_indices)
key_sample_block.insert(header.getByPosition(idx));
}
IProcessor::Status FilterBySetOnTheFlyTransform::prepare()
{
auto status = ISimpleTransform::prepare();
if (set && set->state == SetWithState::State::Suspended)
set.reset();
if (status == Status::Finished)
{
bool has_filter = set && set->state == SetWithState::State::Finished;
if (has_filter)
{
LOG_DEBUG(log, "Finished {} by [{}]: consumed {} rows in total, {} rows bypassed, result {} rows, {:.2f}% filtered",
Poco::toLower(getDescription()), fmt::join(column_names, ", "),
stat.consumed_rows, stat.consumed_rows_before_set, stat.result_rows,
100 - 100.0 * stat.result_rows / stat.consumed_rows);
}
else
{
LOG_DEBUG(log, "Finished {}: bypass {} rows", Poco::toLower(getDescription()), stat.consumed_rows);
}
/// Release set to free memory
set = nullptr;
}
return status;
}
void FilterBySetOnTheFlyTransform::transform(Chunk & chunk)
{
stat.consumed_rows += chunk.getNumRows();
stat.result_rows += chunk.getNumRows();
bool can_filter = set && set->state == SetWithState::State::Finished;
if (!can_filter)
stat.consumed_rows_before_set += chunk.getNumRows();
if (can_filter && chunk.getNumRows())
{
auto key_columns = getColumnsByIndices(key_sample_block, chunk, key_column_indices);
ColumnPtr mask_col = set->execute(key_columns, false);
const auto & mask = assert_cast<const ColumnUInt8 *>(mask_col.get())->getData();
stat.result_rows -= chunk.getNumRows();
Columns columns = chunk.detachColumns();
size_t result_num_rows = 0;
for (auto & col : columns)
{
col = col->filter(mask, /* negative */ false);
result_num_rows = col->size();
}
stat.result_rows += result_num_rows;
chunk.setColumns(std::move(columns), result_num_rows);
}
}
}

View File

@ -0,0 +1,114 @@
#pragma once
#include <atomic>
#include <mutex>
#include <vector>
#include <Processors/ISimpleTransform.h>
#include <Poco/Logger.h>
#include <Interpreters/Set.h>
namespace DB
{
struct SetWithState : public Set
{
using Set::Set;
/// Flow: Creating -> Finished or Suspended
enum class State
{
/// Set is not yet created,
/// Creating processor continues to build set.
/// Filtering bypasses data.
Creating,
/// Set is finished.
/// Creating processor is finished.
/// Filtering filter stream with this set.
Finished,
/// Set building is canceled (due to limit exceeded).
/// Creating and filtering processors bypass data.
Suspended,
};
std::atomic<State> state = State::Creating;
/// Track number of processors that are currently working on this set.
/// Last one finalizes set.
std::atomic_size_t finished_count = 0;
};
using SetWithStatePtr = std::shared_ptr<SetWithState>;
/*
* Create a set on the fly for incoming stream.
* The set is created from the key columns of the input block.
* Data is not changed and returned as is.
* Can be executed in parallel, but blocks on operations with set.
*/
class CreatingSetsOnTheFlyTransform : public ISimpleTransform
{
public:
CreatingSetsOnTheFlyTransform(const Block & header_, const Names & column_names_, size_t num_streams_, SetWithStatePtr set_);
String getName() const override { return "CreatingSetsOnTheFlyTransform"; }
Status prepare() override;
void transform(Chunk & chunk) override;
private:
Names column_names;
std::vector<size_t> key_column_indices;
size_t num_streams;
/// Set to fill
SetWithStatePtr set;
Poco::Logger * log = &Poco::Logger::get("CreatingSetsOnTheFlyTransform");
};
/*
* Filter the input chunk by the set.
* When set building is not completed, just return the source data.
*/
class FilterBySetOnTheFlyTransform : public ISimpleTransform
{
public:
FilterBySetOnTheFlyTransform(const Block & header_, const Names & column_names_, SetWithStatePtr set_);
String getName() const override { return "FilterBySetOnTheFlyTransform"; }
Status prepare() override;
void transform(Chunk & chunk) override;
private:
/// Set::execute requires ColumnsWithTypesAndNames, so we need to convert Chunk to that format
Block key_sample_block;
Names column_names;
std::vector<size_t> key_column_indices;
/// Filter by this set when it's created
SetWithStatePtr set;
/// Statistics to log
struct Stat
{
/// Total number of rows
size_t consumed_rows = 0;
/// Number of bypassed rows (processed before set is created)
size_t consumed_rows_before_set = 0;
/// Number of rows that passed the filter
size_t result_rows = 0;
} stat;
Poco::Logger * log = &Poco::Logger::get("FilterBySetOnTheFlyTransform");
};
}

View File

@ -513,7 +513,7 @@ MergeJoinAlgorithm::Status MergeJoinAlgorithm::allJoin(JoinKind kind)
Columns lcols;
if (!left_to_right_key_remap.empty())
{
/// If we have remapped columns, then we need to get values from right columns insead of defaults
/// If we have remapped columns, then we need to get values from right columns instead of defaults
const auto & indices = idx_map[0];
const auto & left_src = cursors[0]->getCurrent().getColumns();

View File

@ -770,7 +770,7 @@ void Pipe::setSinks(const Pipe::ProcessorGetterWithStreamKind & getter)
header.clear();
}
void Pipe::transform(const Transformer & transformer)
void Pipe::transform(const Transformer & transformer, bool check_ports)
{
if (output_ports.empty())
throw Exception("Cannot transform empty Pipe", ErrorCodes::LOGICAL_ERROR);
@ -784,6 +784,9 @@ void Pipe::transform(const Transformer & transformer)
for (const auto & port : output_ports)
{
if (!check_ports)
break;
if (!port->isConnected())
throw Exception(
ErrorCodes::LOGICAL_ERROR,
@ -799,6 +802,9 @@ void Pipe::transform(const Transformer & transformer)
{
for (const auto & port : processor->getInputs())
{
if (!check_ports)
break;
if (!port.isConnected())
throw Exception(
ErrorCodes::LOGICAL_ERROR,
@ -806,7 +812,7 @@ void Pipe::transform(const Transformer & transformer)
processor->getName());
const auto * connected_processor = &port.getOutputPort().getProcessor();
if (!set.contains(connected_processor))
if (check_ports && !set.contains(connected_processor))
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Transformation of Pipe is not valid because processor {} has input port which is connected with unknown processor {}",
@ -823,7 +829,7 @@ void Pipe::transform(const Transformer & transformer)
}
const auto * connected_processor = &port.getInputPort().getProcessor();
if (!set.contains(connected_processor))
if (check_ports && !set.contains(connected_processor))
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Transformation of Pipe is not valid because processor {} has output port which is connected with unknown processor {}",

View File

@ -85,13 +85,13 @@ public:
/// Add chain to every output port.
void addChains(std::vector<Chain> chains);
/// Changes the number of output ports if needed. Adds ResizeTransform.
/// Changes the number of output ports if needed. Adds (Strict)ResizeProcessor.
void resize(size_t num_streams, bool force = false, bool strict = false);
using Transformer = std::function<Processors(OutputPortRawPtrs ports)>;
/// Transform Pipe in general way.
void transform(const Transformer & transformer);
void transform(const Transformer & transformer, bool check_ports = true);
/// Unite several pipes together. They should have same header.
static Pipe unitePipes(Pipes pipes);

View File

@ -159,10 +159,10 @@ void QueryPipelineBuilder::addChain(Chain chain)
pipe.addChains(std::move(chains));
}
void QueryPipelineBuilder::transform(const Transformer & transformer)
void QueryPipelineBuilder::transform(const Transformer & transformer, bool check_ports)
{
checkInitializedAndNotCompleted();
pipe.transform(transformer);
pipe.transform(transformer, check_ports);
}
void QueryPipelineBuilder::setSinks(const Pipe::ProcessorGetterWithStreamKind & getter)
@ -348,8 +348,7 @@ std::unique_ptr<QueryPipelineBuilder> QueryPipelineBuilder::joinPipelinesYShaped
left->pipe.dropExtremes();
right->pipe.dropExtremes();
if (left->pipe.output_ports.size() != 1 || right->pipe.output_ports.size() != 1)
if (left->getNumStreams() != 1 || right->getNumStreams() != 1)
throw Exception("Join is supported only for pipelines with one output port", ErrorCodes::LOGICAL_ERROR);
if (left->hasTotals() || right->hasTotals())
@ -359,8 +358,7 @@ std::unique_ptr<QueryPipelineBuilder> QueryPipelineBuilder::joinPipelinesYShaped
auto joining = std::make_shared<MergeJoinTransform>(join, inputs, out_header, max_block_size);
auto result = mergePipelines(std::move(left), std::move(right), std::move(joining), collected_processors);
return result;
return mergePipelines(std::move(left), std::move(right), std::move(joining), collected_processors);
}
std::unique_ptr<QueryPipelineBuilder> QueryPipelineBuilder::joinPipelinesRightLeft(

View File

@ -69,7 +69,7 @@ public:
using Transformer = std::function<Processors(OutputPortRawPtrs ports)>;
/// Transform pipeline in general way.
void transform(const Transformer & transformer);
void transform(const Transformer & transformer, bool check_ports = true);
/// Add TotalsHavingTransform. Resize pipeline to single input. Adds totals port.
void addTotalsHavingTransform(ProcessorPtr transform);

View File

@ -1274,7 +1274,7 @@ void AlterCommands::validate(const StoragePtr & table, ContextPtr context) const
throw Exception{"Table doesn't have SAMPLE BY, cannot remove", ErrorCodes::BAD_ARGUMENTS};
}
/// Collect default expressions for MODIFY and ADD comands
/// Collect default expressions for MODIFY and ADD commands
if (command.type == AlterCommand::MODIFY_COLUMN || command.type == AlterCommand::ADD_COLUMN)
{
if (command.default_expression)

View File

@ -810,7 +810,7 @@ void registerStorageKafka(StorageFactory & factory)
/** Arguments of engine is following:
* - Kafka broker list
* - List of topics
* - Group ID (may be a constaint expression with a string result)
* - Group ID (may be a constraint expression with a string result)
* - Message format (string)
* - Row delimiter
* - Schema (optional, if the format supports it)

View File

@ -382,7 +382,7 @@ bool StorageLiveView::getNewBlocks()
BlocksMetadataPtr new_blocks_metadata = std::make_shared<BlocksMetadata>();
/// can't set mergeable_blocks here or anywhere else outside the writeIntoLiveView function
/// as there could be a race codition when the new block has been inserted into
/// as there could be a race condition when the new block has been inserted into
/// the source table by the PushingToViews chain and this method
/// called before writeIntoLiveView function is called which can lead to
/// the same block added twice to the mergeable_blocks leading to

View File

@ -155,7 +155,7 @@ private:
* We use boost::circular_buffer as a container for queues not to do any allocations.
*
* Another nuisance that we faces with is than background operations always interact with an associated Storage.
* So, when a Storage want to shutdown, it must wait until all its background operaions are finished.
* So, when a Storage want to shutdown, it must wait until all its background operations are finished.
*/
template <class Queue>
class MergeTreeBackgroundExecutor final : boost::noncopyable

View File

@ -16,7 +16,7 @@ struct MergeTreeDataPartTTLInfo
time_t max = 0;
/// This TTL was computed on completely expired part. It doesn't make sense
/// to select such parts for TTL again. But make sense to recalcuate TTL
/// to select such parts for TTL again. But make sense to recalculate TTL
/// again for merge with multiple parts.
std::optional<bool> ttl_finished;
bool finished() const { return ttl_finished.value_or(false); }

View File

@ -279,7 +279,7 @@ private:
/// Very large queue entries may appear occasionally.
/// We cannot process MAX_MULTI_OPS at once because it will fail.
/// But we have to process more than one entry at once because otherwise lagged replicas keep up slowly.
/// Let's start with one entry per transaction and icrease it exponentially towards MAX_MULTI_OPS.
/// Let's start with one entry per transaction and increase it exponentially towards MAX_MULTI_OPS.
/// It will allow to make some progress before failing and remain operational even in extreme cases.
size_t current_multi_batch_size = 1;

View File

@ -104,7 +104,7 @@ struct PartitionCommandResultInfo
using PartitionCommandsResultInfo = std::vector<PartitionCommandResultInfo>;
/// Convert partition comands result to Source from single Chunk, which will be
/// Convert partition commands result to Source from single Chunk, which will be
/// used to print info to the user. Tries to create narrowest table for given
/// results. For example, if all commands were FREEZE commands, than
/// old_part_name column will be absent.

View File

@ -126,7 +126,7 @@ private:
static void assertCorrectInsertion(StorageData::Buffer & buffer, size_t column_idx);
/// lsn - log sequnce nuumber, like wal offset (64 bit).
/// lsn - log sequence number, like wal offset (64 bit).
static Int64 getLSNValue(const std::string & lsn)
{
UInt32 upper_half, lower_half;

View File

@ -963,7 +963,7 @@ bool StorageMergeTree::merge(
if (!merge_mutate_entry)
return false;
/// Copying a vector of columns `deduplicate bu columns.
/// Copying a vector of columns `deduplicate by columns.
IExecutableTask::TaskResultCallback f = [](bool) {};
auto task = std::make_shared<MergePlainMergeTreeTask>(
*this, metadata_snapshot, deduplicate, deduplicate_by_columns, merge_mutate_entry, table_lock_holder, f);

View File

@ -57,7 +57,7 @@ std::string maskDataPath(const std::string & path)
size_t user_pw_end = masked_path.find('@', node_pos);
if (user_pw_end == std::string::npos)
{
/// Likey new format (use_compact_format_in_distributed_parts_names=1)
/// Likely new format (use_compact_format_in_distributed_parts_names=1)
return path;
}

View File

@ -533,7 +533,7 @@ class TestCase:
else:
# If --database is not specified, we will create temporary database with
# unique name and we will recreate and drop it for each test
def random_str(length=6):
def random_str(length=8):
alphabet = string.ascii_lowercase + string.digits
# NOTE: it is important not to use default random generator, since it shares state.
return "".join(
@ -995,9 +995,12 @@ class TestCase:
args.timeout - (datetime.now() - start_time).total_seconds(), 20
)
try:
drop_database_query = "DROP DATABASE " + database
if args.replicated_database:
drop_database_query += " ON CLUSTER test_cluster_database_replicated"
clickhouse_execute(
args,
"DROP DATABASE " + database,
drop_database_query,
timeout=seconds_left,
settings={
"log_comment": args.testcase_basename,

View File

@ -0,0 +1,45 @@
<test>
<substitutions>
<substitution>
<name>table_size</name>
<values>
<value>100000000</value>
</values>
</substitution>
</substitutions>
<settings>
<join_algorithm>full_sorting_merge</join_algorithm>
</settings>
<create_query>
CREATE TABLE t1 (x UInt64, y UInt64) ENGINE = MergeTree ORDER BY y
AS SELECT
sipHash64(number, 't1_x') % {table_size} AS x,
sipHash64(number, 't1_y') % {table_size} AS y
FROM numbers({table_size})
</create_query>
<create_query>
CREATE TABLE t2 (x UInt64, y UInt64) ENGINE = MergeTree ORDER BY y
AS SELECT
sipHash64(number, 't2_x') % {table_size} AS x,
sipHash64(number, 't2_y') % {table_size} AS y
FROM numbers({table_size})
</create_query>
<query>SELECT * FROM t1 JOIN t2 ON t1.x = t2.x WHERE less(t1.y, 10000)</query>
<query>SELECT * FROM t2 JOIN t1 ON t1.x = t2.x WHERE less(t1.y, 10000)</query>
<query>SELECT * FROM t1 JOIN t2 ON t1.x = t2.x WHERE greater(t1.y, {table_size} - 10000)</query>
<query>SELECT * FROM t2 JOIN t1 ON t1.x = t2.x WHERE greater(t1.y, {table_size} - 10000)</query>
<query>SELECT * FROM t1 JOIN t2 ON t1.x = t2.x WHERE t1.y % 100 = 0</query>
<query>SELECT * FROM t2 JOIN t1 ON t1.x = t2.x WHERE t1.y % 100 = 0</query>
<query>SELECT * FROM t1 JOIN t2 ON t1.x = t2.x WHERE t1.y % 1000 = 0</query>
<query>SELECT * FROM t2 JOIN t1 ON t1.x = t2.x WHERE t1.y % 1000 = 0</query>
<drop_query>DROP TABLE IF EXISTS t1</drop_query>
<drop_query>DROP TABLE IF EXISTS t2</drop_query>
</test>

View File

@ -7,6 +7,8 @@ USING (key);
SET join_algorithm = 'full_sorting_merge';
SET max_rows_in_set_to_optimize_join = 0;
EXPLAIN actions=0, description=0, header=1
SELECT * FROM ( SELECT 'key2' AS key ) AS s1
JOIN ( SELECT 'key1' AS key, '1' AS value UNION ALL SELECT 'key2' AS key, '1' AS value ) AS s2

View File

@ -0,0 +1,7 @@
106
46
42
51
42
24
10

View File

@ -0,0 +1,20 @@
DROP TABLE IF EXISTS t1;
DROP TABLE IF EXISTS t2;
CREATE TABLE t1 (x UInt64, y UInt64) ENGINE = MergeTree ORDER BY y
AS SELECT sipHash64(number, 't1_x') % 100 AS x, sipHash64(number, 't1_y') % 100 AS y FROM numbers(100);
CREATE TABLE t2 (x UInt64, y UInt64) ENGINE = MergeTree ORDER BY y
AS SELECT sipHash64(number, 't2_x') % 100 AS x, sipHash64(number, 't2_y') % 100 AS y FROM numbers(100);
SET max_rows_in_set_to_optimize_join = 1000;
SET join_algorithm = 'full_sorting_merge';
-- different combinations of conditions on key/attribute columns for the left/right tables
SELECT count() FROM t1 JOIN t2 ON t1.x = t2.x;
SELECT count() FROM t1 JOIN t2 ON t1.x = t2.x WHERE t1.y % 2 == 0;
SELECT count() FROM t1 JOIN t2 ON t1.x = t2.x WHERE t1.x % 2 == 0;
SELECT count() FROM t1 JOIN t2 ON t1.x = t2.x WHERE t2.y % 2 == 0;
SELECT count() FROM t1 JOIN t2 ON t1.x = t2.x WHERE t2.x % 2 == 0;
SELECT count() FROM t1 JOIN t2 ON t1.x = t2.x WHERE t1.y % 2 == 0 AND t2.y % 2 == 0;
SELECT count() FROM t1 JOIN t2 ON t1.x = t2.x WHERE t1.x % 2 == 0 AND t2.x % 2 == 0 AND t1.y % 2 == 0 AND t2.y % 2 == 0;

View File

@ -0,0 +1,10 @@
Ok
Ok
Ok
Ok
Ok
Ok
Ok
Ok
Ok
Ok

View File

@ -0,0 +1,55 @@
#!/usr/bin/env bash
# Tags: no-asan,no-msan,no-tsan,no-ubsan
#
# Test doesn't run complex queries, just test the logic of setting, so no need to run with different builds.
# Also, we run similar queries in 02382_join_and_filtering_set.sql which is enabled for these builds.
#
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
$CLICKHOUSE_CLIENT -mn -q """
CREATE TABLE t1 (x UInt64, y UInt64) ENGINE = MergeTree ORDER BY y
AS SELECT sipHash64(number, 't1_x') % 100 AS x, sipHash64(number, 't1_y') % 100 AS y FROM numbers(100);
CREATE TABLE t2 (x UInt64, y UInt64) ENGINE = MergeTree ORDER BY y
AS SELECT sipHash64(number, 't2_x') % 100 AS x, sipHash64(number, 't2_y') % 100 AS y FROM numbers(100);
"""
# Arguments:
# - value of max_rows_in_set_to_optimize_join
# - join kind
# - expected number of steps in plan
# - expected number of steps in pipeline
function test() {
PARAM_VALUE=$1
JOIN_KIND=${2:-}
EXPECTED_PLAN_STEPS=$3
RES=$(
$CLICKHOUSE_CLIENT --max_rows_in_set_to_optimize_join=${PARAM_VALUE} --join_algorithm='full_sorting_merge' \
-q "EXPLAIN PLAN SELECT count() FROM t1 ${JOIN_KIND} JOIN t2 ON t1.x = t2.x" | grep -o 'CreateSetAndFilterOnTheFlyStep' | wc -l
)
[ "$RES" -eq "$EXPECTED_PLAN_STEPS" ] && echo "Ok" || echo "Fail: $RES != $EXPECTED_PLAN_STEPS"
EXPECTED_PIPELINE_STEPS=$4
RES=$(
$CLICKHOUSE_CLIENT --max_rows_in_set_to_optimize_join=${PARAM_VALUE} --join_algorithm='full_sorting_merge' \
-q "EXPLAIN PIPELINE SELECT count() FROM t1 ${JOIN_KIND} JOIN t2 ON t1.x = t2.x" \
| grep -o -e ReadHeadBalancedProcessor -e FilterBySetOnTheFlyTransform -e CreatingSetsOnTheFlyTransform | wc -l
)
[ "$RES" -eq "$EXPECTED_PIPELINE_STEPS" ] && echo "Ok" || echo "Fail: $RES != $EXPECTED_PIPELINE_STEPS"
}
test 1000 '' 2 6
# no filtering for left/right side
test 1000 'LEFT' 2 5
test 1000 'RIGHT' 2 5
# when disabled no extra steps should be created
test 1000 'FULL' 0 0
test 0 '' 0 0

View File

@ -5,7 +5,7 @@
ROOT_PATH=$(git rev-parse --show-toplevel)
codespell \
--skip "*generated*,*gperf*,*.bin,*.mrk*,*.idx,checksums.txt,*.dat,*.pyc,*.kate-swp,*obfuscateQueries.cpp,d3-*.js,*.min.js,${ROOT_PATH}/utils/check-style/aspell-ignore" \
--skip "*generated*,*gperf*,*.bin,*.mrk*,*.idx,checksums.txt,*.dat,*.pyc,*.kate-swp,*obfuscateQueries.cpp,d3-*.js,*.min.js,*.sum,${ROOT_PATH}/utils/check-style/aspell-ignore" \
--ignore-words "${ROOT_PATH}/utils/check-style/codespell-ignore-words.list" \
--exclude-file "${ROOT_PATH}/utils/check-style/codespell-ignore-lines.list" \
--quiet-level 2 \

View File

@ -16,3 +16,8 @@ ot
te
fo
ba
ro
rightt
iiterator
hastable
nam

View File

@ -13,6 +13,8 @@
#include <cstring>
#include <iostream>
#include <filesystem>
#include <fstream>
#include <sstream>
#if (defined(OS_DARWIN) || defined(OS_FREEBSD)) && defined(__GNUC__)
# include <machine/endian.h>
@ -359,6 +361,31 @@ int decompressFiles(int input_fd, char * path, char * name, bool & have_compress
#endif
uint32_t getInode(const char * self)
{
std::ifstream maps("/proc/self/maps");
if (maps.fail())
{
perror("open maps");
return 0;
}
/// Record example for /proc/self/maps:
/// address perms offset device inode pathname
/// 561a247de000-561a247e0000 r--p 00000000 103:01 1564 /usr/bin/cat
/// see "man 5 proc"
for (std::string line; std::getline(maps, line);)
{
std::stringstream ss(line); // STYLE_CHECK_ALLOW_STD_STRING_STREAM
std::string addr, mode, offset, id, path;
uint32_t inode = 0;
if (ss >> addr >> mode >> offset >> id >> inode >> path && path == self)
return inode;
}
return 0;
}
int main(int/* argc*/, char* argv[])
{
char self[4096] = {0};
@ -382,6 +409,58 @@ int main(int/* argc*/, char* argv[])
else
name = file_path;
/// get inode of this executable
uint32_t inode = getInode(self);
if (inode == 0)
{
std::cerr << "Unable to obtain inode." << std::endl;
return 1;
}
std::stringstream lock_path; // STYLE_CHECK_ALLOW_STD_STRING_STREAM
lock_path << "/tmp/" << name << ".decompression." << inode << ".lock";
int lock = open(lock_path.str().c_str(), O_CREAT | O_RDWR, 0666);
if (lock < 0)
{
perror("lock open");
return 1;
}
/// lock file should be closed on exec call
fcntl(lock, F_SETFD, FD_CLOEXEC);
if (lockf(lock, F_LOCK, 0))
{
perror("lockf");
return 1;
}
struct stat input_info;
if (0 != stat(self, &input_info))
{
perror("stat");
return 1;
}
/// if decompression was performed by another process since this copy was started
/// then file referred by path "self" is already pointing to different inode
if (input_info.st_ino != inode)
{
struct stat lock_info;
if (0 != fstat(lock, &lock_info))
{
perror("fstat lock");
return 1;
}
/// size 1 of lock file indicates that another decompressor has found active executable
if (lock_info.st_size == 1)
execv(self, argv);
printf("No target executable - decompression only was performed.\n");
return 0;
}
int input_fd = open(self, O_RDONLY);
if (input_fd == -1)
{
@ -443,6 +522,10 @@ int main(int/* argc*/, char* argv[])
if (has_exec)
{
/// write one byte to the lock in case other copies of compressed are running to indicate that
/// execution should be performed
write(lock, "1", 1);
execv(self, argv);
/// This part of code will be reached only if error happened
@ -450,6 +533,9 @@ int main(int/* argc*/, char* argv[])
return 1;
}
/// since inodes can be reused - it's a precaution if lock file already exists and have size of 1
ftruncate(lock, 0);
printf("No target executable - decompression only was performed.\n");
}
}