merge master

This commit is contained in:
Ivan Blinkov 2018-10-17 11:28:40 +03:00
commit 9245c40c33
67 changed files with 1720 additions and 178 deletions

View File

@ -2,10 +2,10 @@
set(VERSION_REVISION 54409 CACHE STRING "")
set(VERSION_MAJOR 18 CACHE STRING "")
set(VERSION_MINOR 14 CACHE STRING "")
set(VERSION_PATCH 8 CACHE STRING "")
set(VERSION_GITHASH 058f86debd7e672497667890feea60d46e4398bf CACHE STRING "")
set(VERSION_DESCRIBE v18.14.8-testing CACHE STRING "")
set(VERSION_STRING 18.14.8 CACHE STRING "")
set(VERSION_PATCH 9 CACHE STRING "")
set(VERSION_GITHASH 457f8fd495b2812940e69c15ab5b499cd863aae4 CACHE STRING "")
set(VERSION_DESCRIBE v18.14.9-testing CACHE STRING "")
set(VERSION_STRING 18.14.9 CACHE STRING "")
# end of autochange
set(VERSION_EXTRA "" CACHE STRING "")

View File

@ -117,6 +117,14 @@ MutableColumnPtr ColumnDecimal<T>::cloneResized(size_t size) const
return std::move(res);
}
template <typename T>
void ColumnDecimal<T>::insertData(const char * src, size_t /*length*/)
{
T tmp;
memcpy(&tmp, src, sizeof(T));
data.emplace_back(tmp);
}
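
The `insertData` implementation added here copies the incoming bytes into a local value before appending it, instead of dereferencing the source pointer directly. A minimal standalone sketch of that memcpy-into-a-temporary pattern (illustrative names, not the ClickHouse API):

```cpp
#include <cstring>
#include <vector>

/// Copy a fixed-size value out of an arbitrary byte buffer through memcpy:
/// the local temporary is properly aligned, so the source may point anywhere.
template <typename T>
void appendFromBytes(std::vector<T> & data, const char * src)
{
    T tmp;
    std::memcpy(&tmp, src, sizeof(T));
    data.push_back(tmp);
}

int main()
{
    std::vector<int> values;
    const char bytes[sizeof(int)] = {1};   // remaining bytes are zero-initialized
    appendFromBytes(values, bytes);        // values.back() == 1 on a little-endian machine
}
```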
template <typename T>
void ColumnDecimal<T>::insertRangeFrom(const IColumn & src, size_t start, size_t length)
{

View File

@ -89,7 +89,7 @@ public:
void reserve(size_t n) override { data.reserve(n); }
void insertFrom(const IColumn & src, size_t n) override { data.push_back(static_cast<const Self &>(src).getData()[n]); }
void insertData(const char * pos, size_t /*length*/) override { data.push_back(*reinterpret_cast<const T *>(pos)); }
void insertData(const char * pos, size_t /*length*/) override;
void insertDefault() override { data.push_back(T()); }
void insert(const Field & x) override { data.push_back(DB::get<typename NearestFieldType<T>::Type>(x)); }
void insertRangeFrom(const IColumn & src, size_t start, size_t length) override;

View File

@ -254,7 +254,8 @@ struct ODBCBridgeMixin
static void startBridge(const Poco::Util::AbstractConfiguration & config, Poco::Logger * log, const Poco::Timespan & http_timeout)
{
Poco::Path path{config.getString("application.dir", "")};
/// Path to executable folder
Poco::Path path{config.getString("application.dir", "/usr/bin")};
path.setFileName(
#if CLICKHOUSE_SPLIT_BINARY
@ -264,9 +265,6 @@ struct ODBCBridgeMixin
#endif
);
if (!Poco::File(path).exists())
throw Exception("clickhouse binary (" + path.toString() + ") is not found", ErrorCodes::EXTERNAL_EXECUTABLE_NOT_FOUND);
std::stringstream command;
command << path.toString() <<
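
This hunk gives the bridge launcher a default executable directory (`/usr/bin`) and an explicit existence check before building the command line. A hedged, self-contained sketch of the same pattern, with `std::filesystem` standing in for `Poco::Path`/`Poco::File` and an illustrative binary name:

```cpp
#include <filesystem>
#include <iostream>
#include <stdexcept>
#include <string>

namespace fs = std::filesystem;

/// Resolve the bridge executable: use the configured directory if present,
/// otherwise fall back to /usr/bin, and fail early if the binary is missing.
fs::path resolveBridgeBinary(const std::string & configured_dir)
{
    const fs::path dir = configured_dir.empty() ? fs::path("/usr/bin") : fs::path(configured_dir);
    const fs::path binary = dir / "clickhouse-odbc-bridge";   // illustrative binary name
    if (!fs::exists(binary))
        throw std::runtime_error("clickhouse binary (" + binary.string() + ") is not found");
    return binary;
}

int main()
{
    try
    {
        std::cout << resolveBridgeBinary("") << '\n';
    }
    catch (const std::exception & e)
    {
        std::cout << e.what() << '\n';
    }
}
```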

View File

@ -1,5 +1,6 @@
#include <iomanip>
#include <Poco/Event.h>
#include <Poco/DirectoryIterator.h>
#include <common/logger_useful.h>
@ -41,7 +42,6 @@ namespace ErrorCodes
static constexpr size_t PRINT_MESSAGE_EACH_N_TABLES = 256;
static constexpr size_t PRINT_MESSAGE_EACH_N_SECONDS = 5;
static constexpr size_t METADATA_FILE_BUFFER_SIZE = 32768;
static constexpr size_t TABLES_PARALLEL_LOAD_BUNCH_SIZE = 100;
namespace detail
{
@ -149,6 +149,9 @@ void DatabaseOrdinary::loadTables(
ErrorCodes::INCORRECT_FILE_NAME);
}
if (file_names.empty())
return;
/** Tables load faster if they are loaded in sorted (by name) order.
* Otherwise (for the ext4 filesystem), `DirectoryIterator` iterates through them in some order,
* which corresponds neither to the order of table creation nor to the order of their location on disk.
@ -160,36 +163,27 @@ void DatabaseOrdinary::loadTables(
AtomicStopwatch watch;
std::atomic<size_t> tables_processed {0};
Poco::Event all_tables_processed;
auto task_function = [&](FileNames::const_iterator begin, FileNames::const_iterator end)
auto task_function = [&](const String & table)
{
for (auto it = begin; it != end; ++it)
/// Messages, so that it's not boring to wait for the server to load for a long time.
if ((tables_processed + 1) % PRINT_MESSAGE_EACH_N_TABLES == 0
|| watch.compareAndRestart(PRINT_MESSAGE_EACH_N_SECONDS))
{
const String & table = *it;
/// Messages, so that it's not boring to wait for the server to load for a long time.
if ((++tables_processed) % PRINT_MESSAGE_EACH_N_TABLES == 0
|| watch.compareAndRestart(PRINT_MESSAGE_EACH_N_SECONDS))
{
LOG_INFO(log, std::fixed << std::setprecision(2) << tables_processed * 100.0 / total_tables << "%");
watch.restart();
}
loadTable(context, metadata_path, *this, name, data_path, table, has_force_restore_data_flag);
LOG_INFO(log, std::fixed << std::setprecision(2) << tables_processed * 100.0 / total_tables << "%");
watch.restart();
}
loadTable(context, metadata_path, *this, name, data_path, table, has_force_restore_data_flag);
if (++tables_processed == total_tables)
all_tables_processed.set();
};
const size_t bunch_size = TABLES_PARALLEL_LOAD_BUNCH_SIZE;
size_t num_bunches = (total_tables + bunch_size - 1) / bunch_size;
for (size_t i = 0; i < num_bunches; ++i)
for (const auto & filename : file_names)
{
auto begin = file_names.begin() + i * bunch_size;
auto end = (i + 1 == num_bunches)
? file_names.end()
: (file_names.begin() + (i + 1) * bunch_size);
auto task = std::bind(task_function, begin, end);
auto task = std::bind(task_function, filename);
if (thread_pool)
thread_pool->schedule(task);
@ -198,7 +192,7 @@ void DatabaseOrdinary::loadTables(
}
if (thread_pool)
thread_pool->wait();
all_tables_processed.wait();
/// After all tables have been basically initialized, start them up.
startupTables(thread_pool);
@ -212,47 +206,38 @@ void DatabaseOrdinary::startupTables(ThreadPool * thread_pool)
AtomicStopwatch watch;
std::atomic<size_t> tables_processed {0};
size_t total_tables = tables.size();
Poco::Event all_tables_processed;
auto task_function = [&](Tables::iterator begin, Tables::iterator end)
if (!total_tables)
return;
auto task_function = [&](const StoragePtr & table)
{
for (auto it = begin; it != end; ++it)
if ((tables_processed + 1) % PRINT_MESSAGE_EACH_N_TABLES == 0
|| watch.compareAndRestart(PRINT_MESSAGE_EACH_N_SECONDS))
{
if ((++tables_processed) % PRINT_MESSAGE_EACH_N_TABLES == 0
|| watch.compareAndRestart(PRINT_MESSAGE_EACH_N_SECONDS))
{
LOG_INFO(log, std::fixed << std::setprecision(2) << tables_processed * 100.0 / total_tables << "%");
watch.restart();
}
it->second->startup();
LOG_INFO(log, std::fixed << std::setprecision(2) << tables_processed * 100.0 / total_tables << "%");
watch.restart();
}
table->startup();
if (++tables_processed == total_tables)
all_tables_processed.set();
};
const size_t bunch_size = TABLES_PARALLEL_LOAD_BUNCH_SIZE;
size_t num_bunches = (total_tables + bunch_size - 1) / bunch_size;
auto begin = tables.begin();
for (size_t i = 0; i < num_bunches; ++i)
for (const auto & name_storage : tables)
{
auto end = begin;
if (i + 1 == num_bunches)
end = tables.end();
else
std::advance(end, bunch_size);
auto task = std::bind(task_function, begin, end);
auto task = std::bind(task_function, name_storage.second);
if (thread_pool)
thread_pool->schedule(task);
else
task();
begin = end;
}
if (thread_pool)
thread_pool->wait();
all_tables_processed.wait();
}
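
Both loops above drop the fixed-size bunches in favour of one task per table, counting completions with an atomic and signalling a `Poco::Event` when the last task finishes. A self-contained sketch of that completion-signalling pattern, with `std::condition_variable` standing in for `Poco::Event` and plain `std::thread`s for the pool:

```cpp
#include <atomic>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <thread>
#include <vector>

int main()
{
    const size_t total_tasks = 8;
    std::atomic<size_t> processed{0};
    std::mutex mutex;
    std::condition_variable all_done;

    std::vector<std::thread> pool;
    for (size_t i = 0; i < total_tasks; ++i)
        pool.emplace_back([&]
        {
            /// ... load or start one table here ...
            if (++processed == total_tasks)      // the last task to finish
            {
                std::lock_guard<std::mutex> lock(mutex);
                all_done.notify_one();           // wakes the waiting thread
            }
        });

    {
        std::unique_lock<std::mutex> lock(mutex);
        all_done.wait(lock, [&] { return processed == total_tasks; });
    }

    for (auto & thread : pool)
        thread.join();
}
```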

View File

@ -155,7 +155,8 @@ DictionarySourcePtr DictionarySourceFactory::create(
else if ("odbc" == source_type)
{
#if USE_POCO_SQLODBC || USE_POCO_DATAODBC
BridgeHelperPtr bridge = std::make_shared<XDBCBridgeHelper<ODBCBridgeMixin>>(config, context.getSettings().http_connection_timeout, config.getString(config_prefix + ".odbc.connection_string"));
const auto & global_config = context.getConfigRef();
BridgeHelperPtr bridge = std::make_shared<XDBCBridgeHelper<ODBCBridgeMixin>>(global_config, context.getSettings().http_connection_timeout, config.getString(config_prefix + ".odbc.connection_string"));
return std::make_unique<XDBCDictionarySource>(dict_struct, config, config_prefix + ".odbc", sample_block, context, bridge);
#else
throw Exception{"Dictionary source of type `odbc` is disabled because poco library was built without ODBC support.",

View File

@ -14,17 +14,28 @@ namespace DB
namespace ErrorCodes
{
extern const int UNSUPPORTED_METHOD;
extern const int LOGICAL_ERROR;
}
ExternalQueryBuilder::ExternalQueryBuilder(
const DictionaryStructure & dict_struct,
const std::string & db,
const std::string & table,
const std::string & where,
IdentifierQuotingStyle quoting_style)
: dict_struct(dict_struct), db(db), table(table), where(where), quoting_style(quoting_style)
const DictionaryStructure & dict_struct_,
const std::string & db_,
const std::string & table_,
const std::string & where_,
IdentifierQuotingStyle quoting_style_)
: dict_struct(dict_struct_), db(db_), where(where_), quoting_style(quoting_style_)
{
if (auto pos = table_.find('.'); pos != std::string::npos)
{
schema = table_.substr(0, pos);
table = table_.substr(pos + 1);
}
else
{
schema = "";
table = table_;
}
}
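
The constructor now splits a qualified `schema.table` name at the first dot and falls back to an empty schema otherwise. A minimal standalone sketch of that split (illustrative function name):

```cpp
#include <iostream>
#include <string>
#include <utility>

/// Split "schema.table" at the first dot; a name without a dot is a bare table name.
std::pair<std::string, std::string> splitSchemaAndTable(const std::string & qualified)
{
    if (auto pos = qualified.find('.'); pos != std::string::npos)
        return {qualified.substr(0, pos), qualified.substr(pos + 1)};
    return {"", qualified};
}

int main()
{
    auto [schema, table] = splitSchemaAndTable("clickhouse.test_table");
    std::cout << "schema=" << schema << " table=" << table << '\n';   // schema=clickhouse table=test_table
}
```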
@ -124,6 +135,11 @@ std::string ExternalQueryBuilder::composeLoadAllQuery() const
writeQuoted(db, out);
writeChar('.', out);
}
if (!schema.empty())
{
writeQuoted(schema, out);
writeChar('.', out);
}
writeQuoted(table, out);
if (!where.empty())
@ -187,6 +203,12 @@ std::string ExternalQueryBuilder::composeLoadIdsQuery(const std::vector<UInt64>
writeQuoted(db, out);
writeChar('.', out);
}
if (!schema.empty())
{
writeQuoted(schema, out);
writeChar('.', out);
}
writeQuoted(table, out);
writeString(" WHERE ", out);
@ -250,6 +272,12 @@ std::string ExternalQueryBuilder::composeLoadKeysQuery(
writeQuoted(db, out);
writeChar('.', out);
}
if (!schema.empty())
{
writeQuoted(schema, out);
writeChar('.', out);
}
writeQuoted(table, out);
writeString(" WHERE ", out);

View File

@ -18,19 +18,20 @@ class WriteBuffer;
struct ExternalQueryBuilder
{
const DictionaryStructure & dict_struct;
const std::string & db;
const std::string & table;
std::string db;
std::string table;
std::string schema;
const std::string & where;
IdentifierQuotingStyle quoting_style;
ExternalQueryBuilder(
const DictionaryStructure & dict_struct,
const std::string & db,
const std::string & table,
const std::string & where,
IdentifierQuotingStyle quoting_style);
const DictionaryStructure & dict_struct_,
const std::string & db_,
const std::string & table_,
const std::string & where_,
IdentifierQuotingStyle quoting_style_);
/** Generate a query to load all data. */
std::string composeLoadAllQuery() const;

View File

@ -1132,6 +1132,9 @@ void Aggregator::convertToBlockImpl(
if (data.empty())
return;
if (key_columns.size() != params.keys_size)
throw Exception{"Aggregate. Unexpected key columns size.", ErrorCodes::LOGICAL_ERROR};
if (final)
convertToBlockImplFinal(method, data, key_columns, final_aggregate_columns);
else
@ -1151,7 +1154,7 @@ void NO_INLINE Aggregator::convertToBlockImplFinal(
{
for (const auto & value : data)
{
method.insertKeyIntoColumns(value, key_columns, params.keys_size, key_sizes);
method.insertKeyIntoColumns(value, key_columns, key_sizes);
for (size_t i = 0; i < params.aggregates_size; ++i)
aggregate_functions[i]->insertResultInto(
@ -1169,10 +1172,9 @@ void NO_INLINE Aggregator::convertToBlockImplNotFinal(
MutableColumns & key_columns,
AggregateColumnsData & aggregate_columns) const
{
for (auto & value : data)
{
method.insertKeyIntoColumns(value, key_columns, params.keys_size, key_sizes);
method.insertKeyIntoColumns(value, key_columns, key_sizes);
/// reserved, so push_back does not throw exceptions
for (size_t i = 0; i < params.aggregates_size; ++i)

View File

@ -166,7 +166,7 @@ struct AggregationMethodOneNumber
/** Insert the key from the hash table into columns.
*/
static void insertKeyIntoColumns(const typename Data::value_type & value, MutableColumns & key_columns, size_t /*keys_size*/, const Sizes & /*key_sizes*/)
static void insertKeyIntoColumns(const typename Data::value_type & value, MutableColumns & key_columns, const Sizes & /*key_sizes*/)
{
static_cast<ColumnVector<FieldType> *>(key_columns[0].get())->insertData(reinterpret_cast<const char *>(&value.first), sizeof(value.first));
}
@ -243,7 +243,7 @@ struct AggregationMethodString
return StringRef(value.first.data, value.first.size);
}
static void insertKeyIntoColumns(const typename Data::value_type & value, MutableColumns & key_columns, size_t, const Sizes &)
static void insertKeyIntoColumns(const typename Data::value_type & value, MutableColumns & key_columns, const Sizes &)
{
key_columns[0]->insertData(value.first.data, value.first.size);
}
@ -312,7 +312,7 @@ struct AggregationMethodFixedString
return StringRef(value.first.data, value.first.size);
}
static void insertKeyIntoColumns(const typename Data::value_type & value, MutableColumns & key_columns, size_t, const Sizes &)
static void insertKeyIntoColumns(const typename Data::value_type & value, MutableColumns & key_columns, const Sizes &)
{
key_columns[0]->insertData(value.first.data, value.first.size);
}
@ -580,7 +580,7 @@ struct AggregationMethodSingleLowCardinalityColumn : public SingleColumnMethod
static const bool no_consecutive_keys_optimization = true;
static const bool low_cardinality_optimization = true;
static void insertKeyIntoColumns(const typename Data::value_type & value, MutableColumns & key_columns, size_t /*keys_size*/, const Sizes & /*key_sizes*/)
static void insertKeyIntoColumns(const typename Data::value_type & value, MutableColumns & key_columns, const Sizes & /*key_sizes*/)
{
auto ref = Base::getValueRef(value);
static_cast<ColumnLowCardinality *>(key_columns[0].get())->insertData(ref.data, ref.size);
@ -783,8 +783,10 @@ struct AggregationMethodKeysFixed
static const bool no_consecutive_keys_optimization = false;
static const bool low_cardinality_optimization = false;
static void insertKeyIntoColumns(const typename Data::value_type & value, MutableColumns & key_columns, size_t keys_size, const Sizes & key_sizes)
static void insertKeyIntoColumns(const typename Data::value_type & value, MutableColumns & key_columns, const Sizes & key_sizes)
{
size_t keys_size = key_columns.size();
static constexpr auto bitmap_size = has_nullable_keys ? std::tuple_size<KeysNullMap<Key>>::value : 0;
/// In any hash key value, column values to be read start just after the bitmap, if it exists.
size_t pos = bitmap_size;
@ -891,10 +893,10 @@ struct AggregationMethodSerialized
static const bool no_consecutive_keys_optimization = true;
static const bool low_cardinality_optimization = false;
static void insertKeyIntoColumns(const typename Data::value_type & value, MutableColumns & key_columns, size_t keys_size, const Sizes &)
static void insertKeyIntoColumns(const typename Data::value_type & value, MutableColumns & key_columns, const Sizes &)
{
auto pos = value.first.data;
for (size_t i = 0; i < keys_size; ++i)
for (size_t i = 0; i < key_columns.size(); ++i)
pos = key_columns[i]->deserializeAndInsertFromArena(pos);
}
@ -1284,10 +1286,10 @@ public:
Block intermediate_header;
/// What to count.
ColumnNumbers keys;
AggregateDescriptions aggregates;
size_t keys_size;
size_t aggregates_size;
const ColumnNumbers keys;
const AggregateDescriptions aggregates;
const size_t keys_size;
const size_t aggregates_size;
/// The settings of approximate calculation of GROUP BY.
const bool overflow_row; /// Do we need to put into AggregatedDataVariants::without_key aggregates for keys that are not in max_rows_to_group_by.
@ -1344,9 +1346,6 @@ public:
{
intermediate_header = intermediate_header_;
}
/// Calculate the column numbers in `keys` and `aggregates`.
void calculateColumnNumbers(const Block & block);
};
Aggregator(const Params & params_);

View File

@ -277,7 +277,7 @@ struct Settings
M(SettingBool, log_query_settings, true, "Log query settings into the query_log.") \
M(SettingBool, log_query_threads, true, "Log query threads into system.query_thread_log table. This setting has an effect only when 'log_queries' is true.") \
M(SettingString, send_logs_level, "none", "Send server text logs with specified minimum level to client. Valid values: 'trace', 'debug', 'information', 'warning', 'error', 'none'") \
M(SettingBool, enable_optimize_predicate_expression, 1, "If it is set to true, optimize predicates to subqueries.") \
M(SettingBool, enable_optimize_predicate_expression, 0, "If it is set to true, optimize predicates to subqueries.") \
\
M(SettingUInt64, low_cardinality_max_dictionary_size, 8192, "Maximum size (in rows) of shared global dictionary for LowCardinality type.") \
M(SettingBool, low_cardinality_use_single_dictionary_for_part, false, "LowCardinality type serialization setting. If is true, than will use additional keys when global dictionary overflows. Otherwise, will create several shared dictionaries.") \

View File

@ -20,7 +20,7 @@ namespace ErrorCodes
MergeTreeBaseBlockInputStream::MergeTreeBaseBlockInputStream(
MergeTreeData & storage,
const MergeTreeData & storage,
const PrewhereInfoPtr & prewhere_info,
UInt64 max_block_size_rows,
UInt64 preferred_block_size_bytes,

View File

@ -18,7 +18,7 @@ class MergeTreeBaseBlockInputStream : public IProfilingBlockInputStream
{
public:
MergeTreeBaseBlockInputStream(
MergeTreeData & storage,
const MergeTreeData & storage,
const PrewhereInfoPtr & prewhere_info,
UInt64 max_block_size_rows,
UInt64 preferred_block_size_bytes,
@ -47,7 +47,7 @@ protected:
void injectVirtualColumns(Block & block) const;
protected:
MergeTreeData & storage;
const MergeTreeData & storage;
PrewhereInfoPtr prewhere_info;

View File

@ -16,7 +16,7 @@ namespace ErrorCodes
MergeTreeBlockInputStream::MergeTreeBlockInputStream(
MergeTreeData & storage_,
const MergeTreeData & storage_,
const MergeTreeData::DataPartPtr & owned_data_part_,
size_t max_block_size_rows_,
size_t preferred_block_size_bytes_,

View File

@ -17,7 +17,7 @@ class MergeTreeBlockInputStream : public MergeTreeBaseBlockInputStream
{
public:
MergeTreeBlockInputStream(
MergeTreeData & storage,
const MergeTreeData & storage,
const MergeTreeData::DataPartPtr & owned_data_part,
size_t max_block_size_rows,
size_t preferred_block_size_bytes,

View File

@ -1223,7 +1223,9 @@ MergeTreeData::AlterDataPartTransactionPtr MergeTreeData::alterDataPart(
* temporary column name ('converting_column_name') created in the 'createConvertExpression' method
* will have the old name of the shared offsets for arrays.
*/
MergedColumnOnlyOutputStream out(*this, in.getHeader(), full_path + part->name + '/', true /* sync */, compression_settings, true /* skip_offsets */);
IMergedBlockOutputStream::WrittenOffsetColumns unused_written_offsets;
MergedColumnOnlyOutputStream out(
*this, in.getHeader(), full_path + part->name + '/', true /* sync */, compression_settings, true /* skip_offsets */, unused_written_offsets);
in.readPrefix();
out.writePrefix();

View File

@ -481,7 +481,7 @@ public:
bool skip_sanity_checks);
/// Should be called if part data is suspected to be corrupted.
void reportBrokenPart(const String & name)
void reportBrokenPart(const String & name) const
{
broken_part_callback(name);
}

View File

@ -22,8 +22,6 @@
#include <Interpreters/MutationsInterpreter.h>
#include <IO/CompressedWriteBuffer.h>
#include <IO/CompressedReadBufferFromFile.h>
#include <DataTypes/NestedUtils.h>
#include <DataTypes/DataTypeArray.h>
#include <Common/SimpleIncrement.h>
#include <Common/interpolate.h>
#include <Common/typeid_cast.h>
@ -750,7 +748,6 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor
merge_entry->progress.store(column_sizes.keyColumnsProgress(sum_input_rows_exact, sum_input_rows_exact), std::memory_order_relaxed);
BlockInputStreams column_part_streams(parts.size());
NameSet offset_columns_written;
auto it_name_and_type = gathering_columns.cbegin();
@ -767,22 +764,20 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor
+ "). It is a bug.", ErrorCodes::LOGICAL_ERROR);
CompressedReadBufferFromFile rows_sources_read_buf(rows_sources_file_path, 0, 0);
IMergedBlockOutputStream::WrittenOffsetColumns written_offset_columns;
for (size_t column_num = 0, gathering_column_names_size = gathering_column_names.size();
column_num < gathering_column_names_size;
++column_num, ++it_name_and_type)
{
const String & column_name = it_name_and_type->name;
const DataTypePtr & column_type = it_name_and_type->type;
const String offset_column_name = Nested::extractTableName(column_name);
Names column_name_{column_name};
Names column_names{column_name};
Float64 progress_before = merge_entry->progress.load(std::memory_order_relaxed);
bool offset_written = offset_columns_written.count(offset_column_name);
for (size_t part_num = 0; part_num < parts.size(); ++part_num)
{
auto column_part_stream = std::make_shared<MergeTreeBlockInputStream>(
data, parts[part_num], DEFAULT_MERGE_BLOCK_SIZE, 0, 0, column_name_, MarkRanges{MarkRange(0, parts[part_num]->marks_count)},
data, parts[part_num], DEFAULT_MERGE_BLOCK_SIZE, 0, 0, column_names, MarkRanges{MarkRange(0, parts[part_num]->marks_count)},
false, nullptr, true, min_bytes_when_use_direct_io, DBMS_DEFAULT_BUFFER_SIZE, false, Names{}, 0, true);
column_part_stream->setProgressCallback(MergeProgressCallbackVerticalStep(
@ -793,7 +788,8 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor
rows_sources_read_buf.seek(0, 0);
ColumnGathererStream column_gathered_stream(column_name, column_part_streams, rows_sources_read_buf);
MergedColumnOnlyOutputStream column_to(data, column_gathered_stream.getHeader(), new_part_tmp_path, false, compression_settings, offset_written);
MergedColumnOnlyOutputStream column_to(
data, column_gathered_stream.getHeader(), new_part_tmp_path, false, compression_settings, false, written_offset_columns);
size_t column_elems_written = 0;
column_to.writePrefix();
@ -811,9 +807,6 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor
", but " + toString(rows_written) + " rows of PK columns", ErrorCodes::LOGICAL_ERROR);
}
if (typeid_cast<const DataTypeArray *>(column_type.get()))
offset_columns_written.emplace(offset_column_name);
/// NOTE: 'progress' is modified by single thread, but it may be concurrently read from MergeListElement::getInfo() (StorageSystemMerges).
merge_entry->columns_written = merging_column_names.size() + column_num;
@ -971,7 +964,9 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor
createHardLink(dir_it.path().toString(), destination.toString());
}
MergedColumnOnlyOutputStream out(data, in_header, new_part_tmp_path, /* sync = */ false, compression_settings, /* skip_offsets = */ false);
IMergedBlockOutputStream::WrittenOffsetColumns unused_written_offsets;
MergedColumnOnlyOutputStream out(
data, in_header, new_part_tmp_path, /* sync = */ false, compression_settings, /* skip_offsets = */ false, unused_written_offsets);
in->readPrefix();
out.writePrefix();

View File

@ -27,7 +27,7 @@ struct MergeTreeDataPart
using Checksums = MergeTreeDataPartChecksums;
using Checksum = MergeTreeDataPartChecksums::Checksum;
MergeTreeDataPart(MergeTreeData & storage_, const String & name_, const MergeTreePartInfo & info_)
MergeTreeDataPart(const MergeTreeData & storage_, const String & name_, const MergeTreePartInfo & info_)
: storage(storage_), name(name_), info(info_)
{
}
@ -77,7 +77,7 @@ struct MergeTreeDataPart
bool isEmpty() const { return rows_count == 0; }
MergeTreeData & storage;
const MergeTreeData & storage;
String name;
MergeTreePartInfo info;

View File

@ -67,7 +67,7 @@ namespace ErrorCodes
}
MergeTreeDataSelectExecutor::MergeTreeDataSelectExecutor(MergeTreeData & data_)
MergeTreeDataSelectExecutor::MergeTreeDataSelectExecutor(const MergeTreeData & data_)
: data(data_), log(&Logger::get(data.getLogName() + " (SelectExecutor)"))
{
}

View File

@ -17,7 +17,7 @@ class KeyCondition;
class MergeTreeDataSelectExecutor
{
public:
MergeTreeDataSelectExecutor(MergeTreeData & data_);
MergeTreeDataSelectExecutor(const MergeTreeData & data_);
/** When reading, selects a set of parts that covers the desired range of the index.
* max_block_number_to_read - if not zero, do not read all the parts whose right border is greater than this threshold.
@ -40,7 +40,7 @@ public:
Int64 max_block_number_to_read) const;
private:
MergeTreeData & data;
const MergeTreeData & data;
Logger * log;

View File

@ -16,7 +16,7 @@ namespace DB
MergeTreeReadPool::MergeTreeReadPool(
const size_t threads, const size_t sum_marks, const size_t min_marks_for_concurrent_read,
RangesInDataParts parts, MergeTreeData & data, const PrewhereInfoPtr & prewhere_info,
RangesInDataParts parts, const MergeTreeData & data, const PrewhereInfoPtr & prewhere_info,
const bool check_columns, const Names & column_names,
const BackoffSettings & backoff_settings, size_t preferred_block_size_bytes,
const bool do_not_steal_tasks)

View File

@ -67,7 +67,7 @@ private:
public:
MergeTreeReadPool(
const size_t threads, const size_t sum_marks, const size_t min_marks_for_concurrent_read,
RangesInDataParts parts, MergeTreeData & data, const PrewhereInfoPtr & prewhere_info,
RangesInDataParts parts, const MergeTreeData & data, const PrewhereInfoPtr & prewhere_info,
const bool check_columns, const Names & column_names,
const BackoffSettings & backoff_settings, size_t preferred_block_size_bytes,
const bool do_not_steal_tasks = false);
@ -91,7 +91,7 @@ private:
RangesInDataParts & parts, const size_t min_marks_for_concurrent_read);
std::vector<std::shared_lock<std::shared_mutex>> per_part_columns_lock;
MergeTreeData & data;
const MergeTreeData & data;
Names column_names;
Names ordered_names;
bool do_not_steal_tasks;

View File

@ -36,7 +36,7 @@ MergeTreeReader::~MergeTreeReader() = default;
MergeTreeReader::MergeTreeReader(const String & path,
const MergeTreeData::DataPartPtr & data_part, const NamesAndTypesList & columns,
UncompressedCache * uncompressed_cache, MarkCache * mark_cache, bool save_marks_in_cache,
MergeTreeData & storage, const MarkRanges & all_mark_ranges,
const MergeTreeData & storage, const MarkRanges & all_mark_ranges,
size_t aio_threshold, size_t max_read_buffer_size, const ValueSizeMap & avg_value_size_hints,
const ReadBufferFromFileBase::ProfileCallback & profile_callback,
clockid_t clock_type)

View File

@ -30,7 +30,7 @@ public:
UncompressedCache * uncompressed_cache,
MarkCache * mark_cache,
bool save_marks_in_cache,
MergeTreeData & storage, const MarkRanges & all_mark_ranges,
const MergeTreeData & storage, const MarkRanges & all_mark_ranges,
size_t aio_threshold, size_t max_read_buffer_size,
const ValueSizeMap & avg_value_size_hints = ValueSizeMap{},
const ReadBufferFromFileBase::ProfileCallback & profile_callback = ReadBufferFromFileBase::ProfileCallback{},
@ -111,7 +111,7 @@ private:
/// If save_marks_in_cache is false, then, if marks are not in cache, we will load them but won't save in the cache, to avoid evicting other data.
bool save_marks_in_cache;
MergeTreeData & storage;
const MergeTreeData & storage;
MarkRanges all_mark_ranges;
size_t aio_threshold;
size_t max_read_buffer_size;

View File

@ -14,7 +14,7 @@ MergeTreeThreadBlockInputStream::MergeTreeThreadBlockInputStream(
const size_t max_block_size_rows,
size_t preferred_block_size_bytes,
size_t preferred_max_column_in_block_size_bytes,
MergeTreeData & storage,
const MergeTreeData & storage,
const bool use_uncompressed_cache,
const PrewhereInfoPtr & prewhere_info,
const Settings & settings,

View File

@ -21,7 +21,7 @@ public:
const size_t max_block_size,
size_t preferred_block_size_bytes,
size_t preferred_max_column_in_block_size_bytes,
MergeTreeData & storage,
const MergeTreeData & storage,
const bool use_uncompressed_cache,
const PrewhereInfoPtr & prewhere_info,
const Settings & settings,

View File

@ -19,6 +19,7 @@ constexpr auto MARKS_FILE_EXTENSION = ".mrk";
}
/// Implementation of IMergedBlockOutputStream.
IMergedBlockOutputStream::IMergedBlockOutputStream(
@ -70,7 +71,7 @@ void IMergedBlockOutputStream::addStreams(
IDataType::OutputStreamGetter IMergedBlockOutputStream::createStreamGetter(
const String & name, OffsetColumns & offset_columns, bool skip_offsets)
const String & name, WrittenOffsetColumns & offset_columns, bool skip_offsets)
{
return [&, skip_offsets] (const IDataType::SubstreamPath & substream_path) -> WriteBuffer *
{
@ -93,7 +94,7 @@ void IMergedBlockOutputStream::writeData(
const String & name,
const IDataType & type,
const IColumn & column,
OffsetColumns & offset_columns,
WrittenOffsetColumns & offset_columns,
bool skip_offsets,
IDataType::SerializeBinaryBulkStatePtr & serialization_state)
{
@ -304,7 +305,7 @@ void MergedBlockOutputStream::writeSuffixAndFinalizePart(
IDataType::SerializeBinaryBulkSettings serialize_settings;
serialize_settings.low_cardinality_max_dictionary_size = settings.low_cardinality_max_dictionary_size;
serialize_settings.low_cardinality_use_single_dictionary_for_part = settings.low_cardinality_use_single_dictionary_for_part != 0;
OffsetColumns offset_columns;
WrittenOffsetColumns offset_columns;
auto it = columns_list.begin();
for (size_t i = 0; i < columns_list.size(); ++i, ++it)
{
@ -395,7 +396,7 @@ void MergedBlockOutputStream::writeImpl(const Block & block, const IColumn::Perm
size_t rows = block.rows();
/// The set of written offset columns, so that shared offsets of nested structure columns are not written several times
OffsetColumns offset_columns;
WrittenOffsetColumns offset_columns;
auto sort_columns = storage.getPrimarySortColumns();
@ -427,7 +428,7 @@ void MergedBlockOutputStream::writeImpl(const Block & block, const IColumn::Perm
if (serialization_states.empty())
{
serialization_states.reserve(columns_list.size());
OffsetColumns tmp_offset_columns;
WrittenOffsetColumns tmp_offset_columns;
IDataType::SerializeBinaryBulkSettings settings;
for (const auto & col : columns_list)
@ -501,12 +502,15 @@ void MergedBlockOutputStream::writeImpl(const Block & block, const IColumn::Perm
/// Implementation of MergedColumnOnlyOutputStream.
MergedColumnOnlyOutputStream::MergedColumnOnlyOutputStream(
MergeTreeData & storage_, const Block & header_, String part_path_, bool sync_, CompressionSettings compression_settings, bool skip_offsets_)
MergeTreeData & storage_, const Block & header_, String part_path_, bool sync_,
CompressionSettings compression_settings, bool skip_offsets_,
WrittenOffsetColumns & already_written_offset_columns)
: IMergedBlockOutputStream(
storage_, storage_.context.getSettings().min_compress_block_size,
storage_.context.getSettings().max_compress_block_size, compression_settings,
storage_.context.getSettings().min_bytes_to_use_direct_io),
header(header_), part_path(part_path_), sync(sync_), skip_offsets(skip_offsets_)
header(header_), part_path(part_path_), sync(sync_), skip_offsets(skip_offsets_),
already_written_offset_columns(already_written_offset_columns)
{
}
@ -517,7 +521,7 @@ void MergedColumnOnlyOutputStream::write(const Block & block)
column_streams.clear();
serialization_states.clear();
serialization_states.reserve(block.columns());
OffsetColumns tmp_offset_columns;
WrittenOffsetColumns tmp_offset_columns;
IDataType::SerializeBinaryBulkSettings settings;
for (size_t i = 0; i < block.columns(); ++i)
@ -535,7 +539,7 @@ void MergedColumnOnlyOutputStream::write(const Block & block)
size_t rows = block.rows();
OffsetColumns offset_columns;
WrittenOffsetColumns offset_columns = already_written_offset_columns;
for (size_t i = 0; i < block.columns(); ++i)
{
const ColumnWithTypeAndName & column = block.safeGetByPosition(i);
@ -558,11 +562,11 @@ MergeTreeData::DataPart::Checksums MergedColumnOnlyOutputStream::writeSuffixAndG
IDataType::SerializeBinaryBulkSettings serialize_settings;
serialize_settings.low_cardinality_max_dictionary_size = settings.low_cardinality_max_dictionary_size;
serialize_settings.low_cardinality_use_single_dictionary_for_part = settings.low_cardinality_use_single_dictionary_for_part != 0;
OffsetColumns offset_columns;
for (size_t i = 0; i < header.columns(); ++i)
for (size_t i = 0, size = header.columns(); i < size; ++i)
{
auto & column = header.safeGetByPosition(i);
serialize_settings.getter = createStreamGetter(column.name, offset_columns, skip_offsets);
auto & column = header.getByPosition(i);
serialize_settings.getter = createStreamGetter(column.name, already_written_offset_columns, skip_offsets);
column.type->serializeBinaryBulkStateSuffix(serialize_settings, serialization_states[i]);
}

View File

@ -23,8 +23,9 @@ public:
CompressionSettings compression_settings_,
size_t aio_threshold_);
using WrittenOffsetColumns = std::set<std::string>;
protected:
using OffsetColumns = std::set<std::string>;
using SerializationState = IDataType::SerializeBinaryBulkStatePtr;
using SerializationStates = std::vector<SerializationState>;
@ -67,10 +68,10 @@ protected:
void addStreams(const String & path, const String & name, const IDataType & type, size_t estimated_size, bool skip_offsets);
IDataType::OutputStreamGetter createStreamGetter(const String & name, OffsetColumns & offset_columns, bool skip_offsets);
IDataType::OutputStreamGetter createStreamGetter(const String & name, WrittenOffsetColumns & offset_columns, bool skip_offsets);
/// Write data of one column.
void writeData(const String & name, const IDataType & type, const IColumn & column, OffsetColumns & offset_columns,
void writeData(const String & name, const IDataType & type, const IColumn & column, WrittenOffsetColumns & offset_columns,
bool skip_offsets, IDataType::SerializeBinaryBulkStatePtr & serialization_state);
MergeTreeData & storage;
@ -150,13 +151,17 @@ private:
};
/// Writes only those columns that are in `block`
/// Writes only those columns that are in `header`
class MergedColumnOnlyOutputStream final : public IMergedBlockOutputStream
{
public:
/// skip_offsets: used when ALTERing columns if we know that array offsets are not altered.
/// Pass an empty 'already_written_offset_columns' the first time, then pass the same object to subsequent instances of MergedColumnOnlyOutputStream
/// if you want to serialize elements of a Nested data structure across different instances of MergedColumnOnlyOutputStream.
MergedColumnOnlyOutputStream(
MergeTreeData & storage_, const Block & header_, String part_path_, bool sync_, CompressionSettings compression_settings, bool skip_offsets_);
MergeTreeData & storage_, const Block & header_, String part_path_, bool sync_,
CompressionSettings compression_settings, bool skip_offsets_,
WrittenOffsetColumns & already_written_offset_columns);
Block getHeader() const override { return header; }
void write(const Block & block) override;
@ -171,6 +176,9 @@ private:
bool initialized = false;
bool sync;
bool skip_offsets;
/// To correctly write Nested elements column-by-column.
WrittenOffsetColumns & already_written_offset_columns;
};
}
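
The constructor now takes a reference to a `WrittenOffsetColumns` set that outlives any single stream, which is how several `MergedColumnOnlyOutputStream` instances writing the same part avoid serializing the shared offsets of Nested subcolumns more than once. A self-contained analogue of that sharing pattern (the writer class and column names are illustrative, not the ClickHouse classes):

```cpp
#include <iostream>
#include <set>
#include <string>

/// Each writer shares one set of already-written offset-column names, so shared
/// offsets of Nested columns ("Users" below) are serialized by only one of them.
class ColumnWriter
{
public:
    explicit ColumnWriter(std::set<std::string> & written_offsets_) : written_offsets(written_offsets_) {}

    void write(const std::string & column, const std::string & offsets_name)
    {
        if (written_offsets.insert(offsets_name).second)    // true only for the first writer
            std::cout << "writing offsets " << offsets_name << '\n';
        std::cout << "writing column " << column << '\n';
    }

private:
    std::set<std::string> & written_offsets;   // shared across writer instances
};

int main()
{
    std::set<std::string> written_offset_columns;          // pass the same object to every writer
    ColumnWriter first(written_offset_columns);
    ColumnWriter second(written_offset_columns);
    first.write("Users.UserID", "Users");                  // writes the shared offsets once
    second.write("Users.Dates", "Users");                  // offsets already written, skipped
}
```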

View File

@ -14,9 +14,9 @@ Don't use Docker from your system repository.
* [pip](https://pypi.python.org/pypi/pip). To install: `sudo apt-get install python-pip`
* [py.test](https://docs.pytest.org/) testing framework. To install: `sudo -H pip install pytest`
* [docker-compose](https://docs.docker.com/compose/) and additional python libraries. To install: `sudo -H pip install docker-compose docker dicttoxml kazoo PyMySQL`
* [docker-compose](https://docs.docker.com/compose/) and additional python libraries. To install: `sudo -H pip install docker-compose docker dicttoxml kazoo PyMySQL psycopg2`
(highly discouraged) If you really want to use OS packages on modern Debian/Ubuntu instead of "pip": `sudo apt install -y docker docker-compose python-pytest python-dicttoxml python-docker python-pymysql python-kazoo`
(highly discouraged) If you really want to use OS packages on modern Debian/Ubuntu instead of "pip": `sudo apt install -y docker docker-compose python-pytest python-dicttoxml python-docker python-pymysql python-kazoo python-psycopg2`
If you want to run the tests under a non-privileged user, you must add this user to `docker` group: `sudo usermod -aG docker $USER` and re-login.
(You must close all your sessions; for example, restart your computer.)

View File

@ -13,6 +13,7 @@ import pymysql
import xml.dom.minidom
from kazoo.client import KazooClient
from kazoo.exceptions import KazooException
import psycopg2
import docker
from docker.errors import ContainerError
@ -79,6 +80,7 @@ class ClickHouseCluster:
self.instances = {}
self.with_zookeeper = False
self.with_mysql = False
self.with_postgres = False
self.with_kafka = False
self.with_odbc_drivers = False
@ -92,7 +94,7 @@ class ClickHouseCluster:
cmd += " client"
return cmd
def add_instance(self, name, config_dir=None, main_configs=[], user_configs=[], macros={}, with_zookeeper=False, with_mysql=False, with_kafka=False, clickhouse_path_dir=None, with_odbc_drivers=False, hostname=None, env_variables={}, image="ubuntu:14.04"):
def add_instance(self, name, config_dir=None, main_configs=[], user_configs=[], macros={}, with_zookeeper=False, with_mysql=False, with_kafka=False, clickhouse_path_dir=None, with_odbc_drivers=False, with_postgres=False, hostname=None, env_variables={}, image="ubuntu:14.04"):
"""Add an instance to the cluster.
name - the name of the instance directory and the value of the 'instance' macro in ClickHouse.
@ -127,6 +129,12 @@ class ClickHouseCluster:
self.base_mysql_cmd = ['docker-compose', '--project-directory', self.base_dir, '--project-name',
self.project_name, '--file', p.join(HELPERS_DIR, 'docker_compose_mysql.yml')]
if with_postgres and not self.with_postgres:
self.with_postgres = True
self.base_cmd.extend(['--file', p.join(HELPERS_DIR, 'docker_compose_postgres.yml')])
self.base_postgres_cmd = ['docker-compose', '--project-directory', self.base_dir, '--project-name',
self.project_name, '--file', p.join(HELPERS_DIR, 'docker_compose_postgres.yml')]
if with_odbc_drivers and not self.with_odbc_drivers:
self.with_odbc_drivers = True
if not self.with_mysql:
@ -134,6 +142,12 @@ class ClickHouseCluster:
self.base_cmd.extend(['--file', p.join(HELPERS_DIR, 'docker_compose_mysql.yml')])
self.base_mysql_cmd = ['docker-compose', '--project-directory', self.base_dir, '--project-name',
self.project_name, '--file', p.join(HELPERS_DIR, 'docker_compose_mysql.yml')]
if not self.with_postgres:
self.with_postgres = True
self.base_cmd.extend(['--file', p.join(HELPERS_DIR, 'docker_compose_postgres.yml')])
self.base_postgres_cmd = ['docker-compose', '--project-directory', self.base_dir, '--project-name',
self.project_name, '--file', p.join(HELPERS_DIR, 'docker_compose_postgres.yml')]
if with_kafka and not self.with_kafka:
self.with_kafka = True
@ -168,6 +182,21 @@ class ClickHouseCluster:
raise Exception("Cannot wait MySQL container")
def wait_postgres_to_start(self, timeout=60):
start = time.time()
while time.time() - start < timeout:
try:
conn_string = "host='localhost' user='postgres' password='mysecretpassword'"
conn = psycopg2.connect(conn_string)
conn.close()
print "Postgres Started"
return
except Exception as ex:
print "Can't connect to Postgres " + str(ex)
time.sleep(0.5)
raise Exception("Cannot wait Postgres container")
def wait_zookeeper_to_start(self, timeout=60):
start = time.time()
while time.time() - start < timeout:
@ -204,20 +233,24 @@ class ClickHouseCluster:
self.docker_client = docker.from_env(version=self.docker_api_version)
if self.with_zookeeper and self.base_zookeeper_cmd:
subprocess_check_call(self.base_zookeeper_cmd + ['up', '-d', '--force-recreate', '--remove-orphans'])
subprocess_check_call(self.base_zookeeper_cmd + ['up', '-d', '--force-recreate'])
for command in self.pre_zookeeper_commands:
self.run_kazoo_commands_with_retries(command, repeats=5)
self.wait_zookeeper_to_start(120)
if self.with_mysql and self.base_mysql_cmd:
subprocess_check_call(self.base_mysql_cmd + ['up', '-d', '--force-recreate', '--remove-orphans'])
subprocess_check_call(self.base_mysql_cmd + ['up', '-d', '--force-recreate'])
self.wait_mysql_to_start(120)
if self.with_postgres and self.base_postgres_cmd:
subprocess_check_call(self.base_postgres_cmd + ['up', '-d', '--force-recreate'])
self.wait_postgres_to_start(120)
if self.with_kafka and self.base_kafka_cmd:
subprocess_check_call(self.base_kafka_cmd + ['up', '-d', '--force-recreate', '--remove-orphans'])
subprocess_check_call(self.base_kafka_cmd + ['up', '-d', '--force-recreate'])
self.kafka_docker_id = self.get_instance_docker_id('kafka1')
subprocess_check_call(self.base_cmd + ['up', '-d', '--force-recreate', '--remove-orphans'])
subprocess_check_call(self.base_cmd + ['up', '-d', '--force-recreate'])
start_deadline = time.time() + 20.0 # seconds
for instance in self.instances.itervalues():
@ -444,8 +477,18 @@ class ClickHouseInstance:
},
"PostgreSQL": {
"DSN": "postgresql_odbc",
"Database": "postgres",
"UserName": "postgres",
"Password": "mysecretpassword",
"Port": "5432",
"Servername": "postgres1",
"Protocol": "9.3",
"ReadOnly": "No",
"RowVersioning": "No",
"ShowSystemTables": "No",
"Driver": "/usr/lib/x86_64-linux-gnu/odbc/psqlodbca.so",
"Setup": "/usr/lib/x86_64-linux-gnu/odbc/libodbcpsqlS.so",
"ConnSettings": "",
}
}
else:

View File

@ -0,0 +1,9 @@
version: '2'
services:
postgres1:
image: postgres
restart: always
environment:
POSTGRES_PASSWORD: mysecretpassword
ports:
- 5432:5432

View File

@ -0,0 +1,38 @@
<dictionaries>
<dictionary>
<name>postgres_odbc_hashed</name>
<source>
<odbc>
<table>clickhouse.test_table</table>
<connection_string>DSN=postgresql_odbc;</connection_string>
<db>postgres</db>
</odbc>
</source>
<lifetime>
<min>5</min>
<max>5</max>
</lifetime>
<layout>
<hashed />
</layout>
<structure>
<id>
<name>column1</name>
</id>
<attribute>
<name>column1</name>
<type>Int64</type>
<null_value>1</null_value>
</attribute>
<attribute>
<name>column2</name>
<type>String</type>
<null_value>''</null_value>
</attribute>
</structure>
</dictionary>
</dictionaries>

View File

@ -3,12 +3,14 @@ import pytest
import os
import pymysql.cursors
import psycopg2
from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT
from helpers.cluster import ClickHouseCluster
SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
cluster = ClickHouseCluster(__file__, base_configs_dir=os.path.join(SCRIPT_DIR, 'configs'))
node1 = cluster.add_instance('node1', with_odbc_drivers=True, with_mysql=True, image='alesapin/ubuntu_with_odbc:14.04', main_configs=['configs/dictionaries/sqlite3_odbc_hashed_dictionary.xml', 'configs/dictionaries/sqlite3_odbc_cached_dictionary.xml'])
node1 = cluster.add_instance('node1', with_odbc_drivers=True, with_mysql=True, image='alesapin/ubuntu_with_odbc:14.04', main_configs=['configs/dictionaries/sqlite3_odbc_hashed_dictionary.xml', 'configs/dictionaries/sqlite3_odbc_cached_dictionary.xml', 'configs/dictionaries/postgres_odbc_hashed_dictionary.xml'])
create_table_sql_template = """
CREATE TABLE `clickhouse`.`{}` (
@ -31,24 +33,49 @@ def create_mysql_table(conn, table_name):
with conn.cursor() as cursor:
cursor.execute(create_table_sql_template.format(table_name))
def get_postgres_conn():
conn_string = "host='localhost' user='postgres' password='mysecretpassword'"
conn = psycopg2.connect(conn_string)
conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
conn.autocommit = True
return conn
def create_postgres_db(conn, name):
cursor = conn.cursor()
cursor.execute("CREATE SCHEMA {}".format(name))
@pytest.fixture(scope="module")
def started_cluster():
try:
cluster.start()
sqlite_db = node1.odbc_drivers["SQLite3"]["Database"]
sqlite_db = node1.odbc_drivers["SQLite3"]["Database"]
print "sqlite data received"
node1.exec_in_container(["bash", "-c", "echo 'CREATE TABLE t1(x INTEGER PRIMARY KEY ASC, y, z);' | sqlite3 {}".format(sqlite_db)], privileged=True, user='root')
node1.exec_in_container(["bash", "-c", "echo 'CREATE TABLE t2(X INTEGER PRIMARY KEY ASC, Y, Z);' | sqlite3 {}".format(sqlite_db)], privileged=True, user='root')
node1.exec_in_container(["bash", "-c", "echo 'CREATE TABLE t3(X INTEGER PRIMARY KEY ASC, Y, Z);' | sqlite3 {}".format(sqlite_db)], privileged=True, user='root')
node1.exec_in_container(["bash", "-c", "echo 'CREATE TABLE t4(X INTEGER PRIMARY KEY ASC, Y, Z);' | sqlite3 {}".format(sqlite_db)], privileged=True, user='root')
conn = get_mysql_conn()
print "sqlite tables created"
mysql_conn = get_mysql_conn()
print "mysql connection received"
## create mysql db and table
create_mysql_db(conn, 'clickhouse')
create_mysql_db(mysql_conn, 'clickhouse')
print "mysql database created"
postgres_conn = get_postgres_conn()
print "postgres connection received"
create_postgres_db(postgres_conn, 'clickhouse')
print "postgres db created"
cursor = postgres_conn.cursor()
cursor.execute("create table if not exists clickhouse.test_table (column1 int primary key, column2 varchar(40) not null)")
yield cluster
except Exception as ex:
print(ex)
raise ex
finally:
cluster.shutdown()
@ -141,3 +168,11 @@ def test_sqlite_odbc_cached_dictionary(started_cluster):
time.sleep(5)
assert node1.query("select dictGetUInt8('sqlite3_odbc_cached', 'Z', toUInt64(1))") == "12\n"
def test_postgres_odbc_hashed_dictionary_with_schema(started_cluster):
conn = get_postgres_conn()
cursor = conn.cursor()
cursor.execute("insert into clickhouse.test_table values(1, 'hello'),(2, 'world')")
time.sleep(5)
assert node1.query("select dictGetString('postgres_odbc_hashed', 'column2', toUInt64(1))") == "hello\n"
assert node1.query("select dictGetString('postgres_odbc_hashed', 'column2', toUInt64(2))") == "world\n"

View File

@ -0,0 +1,17 @@
SET send_logs_level = 'none';
DROP TABLE IF EXISTS test.Issue_2231_Invalid_Nested_Columns_Size;
CREATE TABLE test.Issue_2231_Invalid_Nested_Columns_Size (
Date Date,
NestedColumn Nested(
ID Int32,
Count Int64
)
) Engine = MergeTree
PARTITION BY tuple()
ORDER BY Date;
INSERT INTO test.Issue_2231_Invalid_Nested_Columns_Size VALUES (today(), [2,2], [1]), (today(), [2,2], [1, 1]); -- { serverError 190 }
SELECT * FROM test.Issue_2231_Invalid_Nested_Columns_Size;
DROP TABLE test.Issue_2231_Invalid_Nested_Columns_Size;

View File

@ -0,0 +1,11 @@
1.10
2.1000
3.100000000000
1.20
2.2000
3.200000000000
1.30
2.3000
3.300000000000
1 1.000000000000000000 10.000000000000000000
1 1.000000000000000000 10.000000000000000000

View File

@ -0,0 +1,26 @@
select toDecimal32(1.1, 2) as x group by x;
select toDecimal64(2.1, 4) as x group by x;
select toDecimal128(3.1, 12) as x group by x;
select materialize(toDecimal32(1.2, 2)) as x group by x;
select materialize(toDecimal64(2.2, 4)) as x group by x;
select materialize(toDecimal128(3.2, 12)) as x group by x;
select x from (select toDecimal32(1.3, 2) x) group by x;
select x from (select toDecimal64(2.3, 4) x) group by x;
select x from (select toDecimal128(3.3, 12) x) group by x;
DROP TABLE IF EXISTS test.decimal;
CREATE TABLE IF NOT EXISTS test.decimal
(
A UInt64,
B Decimal128(18),
C Decimal128(18)
) Engine = Memory;
INSERT INTO test.decimal VALUES (1,1,1), (1,1,2), (1,1,3), (1,1,4);
SELECT A, toString(B) AS B_str, toString(SUM(C)) AS c_str FROM test.decimal GROUP BY A, B_str;
SELECT A, B_str, toString(cc) FROM (SELECT A, toString(B) AS B_str, SUM(C) AS cc FROM test.decimal GROUP BY A, B_str);
DROP TABLE test.decimal;

View File

@ -0,0 +1,14 @@
DROP TABLE IF EXISTS test.sites;
CREATE TABLE test.sites (Domain UInt8, `Users.UserID` Array(UInt64), `Users.Dates` Array(Array(Date))) ENGINE = MergeTree ORDER BY Domain SETTINGS vertical_merge_algorithm_min_rows_to_activate = 0, vertical_merge_algorithm_min_columns_to_activate = 0;
SYSTEM STOP MERGES;
INSERT INTO test.sites VALUES (1,[1],[[]]);
INSERT INTO test.sites VALUES (2,[1],[['2018-06-22']]);
SELECT count(), countArray(Users.Dates), countArrayArray(Users.Dates) FROM test.sites;
SYSTEM START MERGES;
OPTIMIZE TABLE test.sites FINAL;
SELECT count(), countArray(Users.Dates), countArrayArray(Users.Dates) FROM test.sites;
DROP TABLE test.sites;

View File

@ -0,0 +1,4 @@
1 foo
1 foo
1 \N
\N

View File

@ -0,0 +1,12 @@
create temporary table wups (a Array(Nullable(String)));
select count(), a[1] from wups group by a[1];
insert into wups (a) values(['foo']);
select count(), a[1] from wups group by a[1];
insert into wups (a) values([]);
select count(), a[1] from wups group by a[1] order by a[1];
drop temporary table wups;
create temporary table wups (a Array(Nullable(String)));
insert into wups (a) values([]);
select a[1] from wups;

View File

@ -0,0 +1,35 @@
DROP TABLE IF EXISTS test.perf;
CREATE TABLE test.perf (site String, user_id UInt64, z Float64) ENGINE = Log;
SELECT * FROM (SELECT perf_1.z AS z_1 FROM test.perf AS perf_1);
SELECT sum(mul)/sqrt(sum(sqr_dif_1) * sum(sqr_dif_2)) AS z_r
FROM(
SELECT
(SELECT avg(z_1) AS z_1_avg,
avg(z_2) AS z_2_avg
FROM (
SELECT perf_1.site, perf_1.z AS z_1
FROM test.perf AS perf_1
WHERE user_id = 000
) ALL INNER JOIN (
SELECT perf_2.site, perf_2.z AS z_2
FROM test.perf AS perf_2
WHERE user_id = 999
) USING site) as avg_values,
z_1 - avg_values.1 AS dif_1,
z_2 - avg_values.2 AS dif_2,
dif_1 * dif_2 AS mul,
dif_1*dif_1 AS sqr_dif_1,
dif_2*dif_2 AS sqr_dif_2
FROM (
SELECT perf_1.site, perf_1.z AS z_1
FROM test.perf AS perf_1
WHERE user_id = 000
) ALL INNER JOIN (
SELECT perf_2.site, perf_2.z AS z_2
FROM test.perf AS perf_2
WHERE user_id = 999
) USING site);
DROP TABLE test.perf;

View File

@ -0,0 +1 @@
CREATE DATABASE IF NOT EXISTS test; -- foo

View File

@ -0,0 +1,3 @@
SET send_logs_level = 'none';
SET join_default_strictness = '';
SELECT * FROM system.one INNER JOIN (SELECT number AS k FROM system.numbers) ON dummy = k; -- { serverError 417 }

View File

@ -0,0 +1,2 @@
0
http://reddit.com/r/cpp/comments/xyz

View File

@ -0,0 +1,65 @@
USE test;
DROP TABLE IF EXISTS installation_stats;
CREATE TABLE installation_stats (message String, info String, message_type String) ENGINE = Log;
SELECT count(*) AS total
FROM
(
SELECT
message,
info,
count() AS cnt
FROM installation_stats
WHERE message_type LIKE 'fail'
GROUP BY
message,
info
ORDER BY cnt DESC
LIMIT 5 BY message
);
DROP TABLE installation_stats;
CREATE TEMPORARY TABLE Accounts (AccountID UInt64, Currency String);
SELECT AccountID
FROM
(
SELECT
AccountID,
Currency
FROM Accounts
LIMIT 2 BY Currency
);
CREATE TEMPORARY TABLE commententry1 (created_date Date, link_id String, subreddit String);
INSERT INTO commententry1 VALUES ('2016-01-01', 'xyz', 'cpp');
SELECT concat('http://reddit.com/r/', subreddit, '/comments/', replaceRegexpOne(link_id, 't[0-9]_', ''))
FROM
(
SELECT
y,
subreddit,
link_id,
cnt
FROM
(
SELECT
created_date AS y,
link_id,
subreddit,
count(*) AS cnt
FROM commententry1
WHERE toYear(created_date) = 2016
GROUP BY
y,
link_id,
subreddit
ORDER BY y ASC
)
ORDER BY
y ASC,
cnt DESC
LIMIT 1 BY y
);

View File

@ -0,0 +1,2 @@
1
1 1

View File

@ -0,0 +1,36 @@
CREATE TEMPORARY TABLE test
(
x Int32
);
INSERT INTO test VALUES (1);
SELECT x
FROM
(
SELECT
x,
1
FROM test
ALL INNER JOIN
(
SELECT
count(),
1
FROM test
) USING (1)
LIMIT 10
);
SELECT
x,
1
FROM test
ALL INNER JOIN
(
SELECT
count(),
1
FROM test
) USING (1)
LIMIT 10;

View File

@ -0,0 +1,18 @@
DROP TABLE IF EXISTS test.whoami;
DROP TABLE IF EXISTS test.tellme;
DROP TABLE IF EXISTS test.tellme_nested;
use test;
create view whoami as select 1 as n;
create view tellme as select * from whoami;
create view tellme_nested as select * from (select * from whoami);
select * from tellme;
select * from tellme_nested;
use default;
select * from test.tellme;
select * from test.tellme_nested;
DROP TABLE test.whoami;
DROP TABLE test.tellme;
DROP TABLE test.tellme_nested;

debian/changelog vendored
View File

@ -1,5 +1,5 @@
clickhouse (18.14.8) unstable; urgency=low
clickhouse (18.14.9) unstable; urgency=low
* Modified source code
-- <root@yandex-team.ru> Sat, 13 Oct 2018 03:31:17 +0300
-- <root@yandex-team.ru> Tue, 16 Oct 2018 15:58:16 +0300

View File

@ -1,7 +1,7 @@
FROM ubuntu:18.04
ARG repository="deb http://repo.yandex.ru/clickhouse/deb/stable/ main/"
ARG version=18.14.8
ARG version=18.14.9
RUN apt-get update \
&& apt-get install --yes --no-install-recommends \

View File

@ -1,7 +1,7 @@
FROM ubuntu:18.04
ARG repository="deb http://repo.yandex.ru/clickhouse/deb/stable/ main/"
ARG version=18.14.8
ARG version=18.14.9
ARG gosu_ver=1.10
RUN apt-get update \

View File

@ -1,7 +1,7 @@
FROM ubuntu:18.04
ARG repository="deb http://repo.yandex.ru/clickhouse/deb/stable/ main/"
ARG version=18.14.8
ARG version=18.14.9
RUN apt-get update && \
apt-get install -y apt-transport-https dirmngr && \

View File

@ -111,7 +111,7 @@ Example of settings:
```xml
<odbc>
<db>DatabaseName</db>
<table>TableName</table>
<table>SchemaName.TableName</table>
<connection_string>DSN=some_parameters</connection_string>
<invalidate_query>SQL_QUERY</invalidate_query>
</odbc>
@ -120,10 +120,11 @@ Example of settings:
Setting fields:
- `db` Name of the database. Omit it if the database name is set in the `<connection_string>` parameters.
- `table` Name of the table.
- `table` Name of the table, with the schema name if one exists (for example, `SchemaName.TableName`).
- `connection_string` Connection string.
- `invalidate_query` Query for checking the dictionary status. Optional parameter. Read more in the section [Updating dictionaries](external_dicts_dict_lifetime.md#dicts-external_dicts_dict_lifetime).
ClickHouse receives quoting symbols from the ODBC driver and quotes all settings in queries to the driver, so it's necessary to set the table name according to the case of the table name in the database.
### Known vulnerability of the ODBC dictionary functionality

View File

@ -111,7 +111,7 @@
```xml
<odbc>
<db>DatabaseName</db>
<table>TableName</table>
<table>SchemaName.TableName</table>
<connection_string>DSN=some_parameters</connection_string>
<invalidate_query>SQL_QUERY</invalidate_query>
</odbc>
@ -119,11 +119,13 @@
Setting fields:
- `db` - Name of the database. Omit it if the database name is set in the `<connection_string>` parameters.
- `table` - Name of the table.
- `db` - Name of the database. Omit it if the database name is set in the `<connection_string>` parameters.
- `table` - Name of the table and the schema, if it exists.
- `connection_string` - Connection string.
- `invalidate_query` - Query for checking the dictionary status. Optional parameter. Read more in the section [Updating dictionaries](external_dicts_dict_lifetime.md#dicts-external_dicts_dict_lifetime).
ClickHouse receives quoting information from the ODBC driver and quotes all settings in queries to the driver, so the table name must be specified according to the case of the table name in the database.
### Known vulnerability of the ODBC dictionary functionality
!!! attention

View File

@ -7,15 +7,15 @@ nav:
- 'Performance': 'introduction/performance.md'
- 'Yandex.Metrica use case': 'introduction/ya_metrika_task.md'
- 'Getting started':
- 'Getting started guide':
- 'Deployment and running': 'getting_started/index.md'
- 'Example datasets':
- 'OnTime': 'getting_started/example_datasets/ontime.md'
- 'New York Taxi data': 'getting_started/example_datasets/nyc_taxi.md'
- 'AMPLab Big Data Benchmark': 'getting_started/example_datasets/amplab_benchmark.md'
- 'WikiStat': 'getting_started/example_datasets/wikistat.md'
- 'Terabyte click logs from Criteo': 'getting_started/example_datasets/criteo.md'
- 'Star Schema Benchmark': 'getting_started/example_datasets/star_schema.md'
- 'Flight data (OnTime)': 'getting_started/example_datasets/ontime.md'
- 'New York City taxi data': 'getting_started/example_datasets/nyc_taxi.md'
- 'AMPLab Big Data Benchmark': 'getting_started/example_datasets/amplab_benchmark.md'
- 'WikiStat access data': 'getting_started/example_datasets/wikistat.md'
- 'Terabyte click logs from Criteo': 'getting_started/example_datasets/criteo.md'
- 'Star Schema Benchmark': 'getting_started/example_datasets/star_schema.md'
- 'Clients':
- 'Introduction': 'interfaces/index.md'

View File

@ -1 +0,0 @@
../../../en/getting_started/example_datasets/amplab_benchmark.md

View File

@ -0,0 +1,123 @@
# AMPLab Big Data Benchmark
See <https://amplab.cs.berkeley.edu/benchmark/>
You need a free account at <https://aws.amazon.com>. Registration requires a credit card, email address, and phone number. After that, you can obtain a new access key at <https://console.aws.amazon.com/iam/home?nc2=h_m_sc#security_credential>
Run the following commands in the console:
```bash
sudo apt-get install s3cmd
mkdir tiny; cd tiny;
s3cmd sync s3://big-data-benchmark/pavlo/text-deflate/tiny/ .
cd ..
mkdir 1node; cd 1node;
s3cmd sync s3://big-data-benchmark/pavlo/text-deflate/1node/ .
cd ..
mkdir 5nodes; cd 5nodes;
s3cmd sync s3://big-data-benchmark/pavlo/text-deflate/5nodes/ .
cd ..
```
Run the following queries in ClickHouse:
``` sql
CREATE TABLE rankings_tiny
(
pageURL String,
pageRank UInt32,
avgDuration UInt32
) ENGINE = Log;
CREATE TABLE uservisits_tiny
(
sourceIP String,
destinationURL String,
visitDate Date,
adRevenue Float32,
UserAgent String,
cCode FixedString(3),
lCode FixedString(6),
searchWord String,
duration UInt32
) ENGINE = MergeTree(visitDate, visitDate, 8192);
CREATE TABLE rankings_1node
(
pageURL String,
pageRank UInt32,
avgDuration UInt32
) ENGINE = Log;
CREATE TABLE uservisits_1node
(
sourceIP String,
destinationURL String,
visitDate Date,
adRevenue Float32,
UserAgent String,
cCode FixedString(3),
lCode FixedString(6),
searchWord String,
duration UInt32
) ENGINE = MergeTree(visitDate, visitDate, 8192);
CREATE TABLE rankings_5nodes_on_single
(
pageURL String,
pageRank UInt32,
avgDuration UInt32
) ENGINE = Log;
CREATE TABLE uservisits_5nodes_on_single
(
sourceIP String,
destinationURL String,
visitDate Date,
adRevenue Float32,
UserAgent String,
cCode FixedString(3),
lCode FixedString(6),
searchWord String,
duration UInt32
) ENGINE = MergeTree(visitDate, visitDate, 8192);
```
Go back to the console and run the following commands:
```bash
for i in tiny/rankings/*.deflate; do echo $i; zlib-flate -uncompress < $i | clickhouse-client --host=example-perftest01j --query="INSERT INTO rankings_tiny FORMAT CSV"; done
for i in tiny/uservisits/*.deflate; do echo $i; zlib-flate -uncompress < $i | clickhouse-client --host=example-perftest01j --query="INSERT INTO uservisits_tiny FORMAT CSV"; done
for i in 1node/rankings/*.deflate; do echo $i; zlib-flate -uncompress < $i | clickhouse-client --host=example-perftest01j --query="INSERT INTO rankings_1node FORMAT CSV"; done
for i in 1node/uservisits/*.deflate; do echo $i; zlib-flate -uncompress < $i | clickhouse-client --host=example-perftest01j --query="INSERT INTO uservisits_1node FORMAT CSV"; done
for i in 5nodes/rankings/*.deflate; do echo $i; zlib-flate -uncompress < $i | clickhouse-client --host=example-perftest01j --query="INSERT INTO rankings_5nodes_on_single FORMAT CSV"; done
for i in 5nodes/uservisits/*.deflate; do echo $i; zlib-flate -uncompress < $i | clickhouse-client --host=example-perftest01j --query="INSERT INTO uservisits_5nodes_on_single FORMAT CSV"; done
```
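Once the loops finish, a quick row-count sanity check can be run from the shell; a sketch reusing the same placeholder host as above:
```bash
# Spot check: print the number of loaded rows for a few of the tables.
for t in rankings_tiny uservisits_tiny rankings_1node uservisits_1node; do
    echo -n "$t: "
    clickhouse-client --host=example-perftest01j --query="SELECT count() FROM $t"
done
```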
Some simple example queries:
``` sql
SELECT pageURL, pageRank FROM rankings_1node WHERE pageRank > 1000
SELECT substring(sourceIP, 1, 8), sum(adRevenue) FROM uservisits_1node GROUP BY substring(sourceIP, 1, 8)
SELECT
sourceIP,
sum(adRevenue) AS totalRevenue,
avg(pageRank) AS pageRank
FROM rankings_1node ALL INNER JOIN
(
SELECT
sourceIP,
destinationURL AS pageURL,
adRevenue
FROM uservisits_1node
WHERE (visitDate > '1980-01-01') AND (visitDate < '1980-04-01')
) USING pageURL
GROUP BY sourceIP
ORDER BY totalRevenue DESC
LIMIT 1
```
[Original article](https://clickhouse.yandex/docs/en/getting_started/example_datasets/amplab_benchmark/) <!--hide-->

View File

@@ -1 +0,0 @@
../../../en/getting_started/example_datasets/criteo.md

View File

@@ -0,0 +1,75 @@
# Terabyte Click Logs from Criteo
Download the data from <http://labs.criteo.com/downloads/download-terabyte-click-logs/>
Create a table for the raw log data:
``` sql
CREATE TABLE criteo_log (date Date, clicked UInt8, int1 Int32, int2 Int32, int3 Int32, int4 Int32, int5 Int32, int6 Int32, int7 Int32, int8 Int32, int9 Int32, int10 Int32, int11 Int32, int12 Int32, int13 Int32, cat1 String, cat2 String, cat3 String, cat4 String, cat5 String, cat6 String, cat7 String, cat8 String, cat9 String, cat10 String, cat11 String, cat12 String, cat13 String, cat14 String, cat15 String, cat16 String, cat17 String, cat18 String, cat19 String, cat20 String, cat21 String, cat22 String, cat23 String, cat24 String, cat25 String, cat26 String) ENGINE = Log
```
Load the data:
```bash
for i in {00..23}; do echo $i; zcat datasets/criteo/day_${i#0}.gz | sed -r 's/^/2000-01-'${i/00/24}'\t/' | clickhouse-client --host=example-perftest01j --query="INSERT INTO criteo_log FORMAT TabSeparated"; done
```
Create a table for the converted data:
``` sql
CREATE TABLE criteo
(
date Date,
clicked UInt8,
int1 Int32,
int2 Int32,
int3 Int32,
int4 Int32,
int5 Int32,
int6 Int32,
int7 Int32,
int8 Int32,
int9 Int32,
int10 Int32,
int11 Int32,
int12 Int32,
int13 Int32,
icat1 UInt32,
icat2 UInt32,
icat3 UInt32,
icat4 UInt32,
icat5 UInt32,
icat6 UInt32,
icat7 UInt32,
icat8 UInt32,
icat9 UInt32,
icat10 UInt32,
icat11 UInt32,
icat12 UInt32,
icat13 UInt32,
icat14 UInt32,
icat15 UInt32,
icat16 UInt32,
icat17 UInt32,
icat18 UInt32,
icat19 UInt32,
icat20 UInt32,
icat21 UInt32,
icat22 UInt32,
icat23 UInt32,
icat24 UInt32,
icat25 UInt32,
icat26 UInt32
) ENGINE = MergeTree(date, intHash32(icat1), (date, intHash32(icat1)), 8192)
```
Transform the raw data from the first table and write it into the second one:
``` sql
INSERT INTO criteo SELECT date, clicked, int1, int2, int3, int4, int5, int6, int7, int8, int9, int10, int11, int12, int13, reinterpretAsUInt32(unhex(cat1)) AS icat1, reinterpretAsUInt32(unhex(cat2)) AS icat2, reinterpretAsUInt32(unhex(cat3)) AS icat3, reinterpretAsUInt32(unhex(cat4)) AS icat4, reinterpretAsUInt32(unhex(cat5)) AS icat5, reinterpretAsUInt32(unhex(cat6)) AS icat6, reinterpretAsUInt32(unhex(cat7)) AS icat7, reinterpretAsUInt32(unhex(cat8)) AS icat8, reinterpretAsUInt32(unhex(cat9)) AS icat9, reinterpretAsUInt32(unhex(cat10)) AS icat10, reinterpretAsUInt32(unhex(cat11)) AS icat11, reinterpretAsUInt32(unhex(cat12)) AS icat12, reinterpretAsUInt32(unhex(cat13)) AS icat13, reinterpretAsUInt32(unhex(cat14)) AS icat14, reinterpretAsUInt32(unhex(cat15)) AS icat15, reinterpretAsUInt32(unhex(cat16)) AS icat16, reinterpretAsUInt32(unhex(cat17)) AS icat17, reinterpretAsUInt32(unhex(cat18)) AS icat18, reinterpretAsUInt32(unhex(cat19)) AS icat19, reinterpretAsUInt32(unhex(cat20)) AS icat20, reinterpretAsUInt32(unhex(cat21)) AS icat21, reinterpretAsUInt32(unhex(cat22)) AS icat22, reinterpretAsUInt32(unhex(cat23)) AS icat23, reinterpretAsUInt32(unhex(cat24)) AS icat24, reinterpretAsUInt32(unhex(cat25)) AS icat25, reinterpretAsUInt32(unhex(cat26)) AS icat26 FROM criteo_log;
DROP TABLE criteo_log;
```
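A quick way to confirm the conversion produced data; a sketch reusing the placeholder host from above:
```bash
# Sanity check: the converted table should not be empty.
clickhouse-client --host=example-perftest01j --query="SELECT count() FROM criteo"
```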
[Original article](https://clickhouse.yandex/docs/en/getting_started/example_datasets/criteo/) <!--hide-->

View File

@@ -1 +0,0 @@
../../../en/getting_started/example_datasets/nyc_taxi.md

File diff suppressed because one or more lines are too long

View File

@@ -1 +0,0 @@
../../../en/getting_started/example_datasets/ontime.md

View File

@@ -0,0 +1,318 @@
<a name="example_datasets-ontime"></a>
# OnTime (airline flight data)
Downloading the data:
```bash
for s in `seq 1987 2017`
do
for m in `seq 1 12`
do
wget http://transtats.bts.gov/PREZIP/On_Time_On_Time_Performance_${s}_${m}.zip
done
done
```
(Source: <https://github.com/Percona-Lab/ontime-airline-performance/blob/master/download.sh>)
Creating a table:
```sql
CREATE TABLE `ontime` (
`Year` UInt16,
`Quarter` UInt8,
`Month` UInt8,
`DayofMonth` UInt8,
`DayOfWeek` UInt8,
`FlightDate` Date,
`UniqueCarrier` FixedString(7),
`AirlineID` Int32,
`Carrier` FixedString(2),
`TailNum` String,
`FlightNum` String,
`OriginAirportID` Int32,
`OriginAirportSeqID` Int32,
`OriginCityMarketID` Int32,
`Origin` FixedString(5),
`OriginCityName` String,
`OriginState` FixedString(2),
`OriginStateFips` String,
`OriginStateName` String,
`OriginWac` Int32,
`DestAirportID` Int32,
`DestAirportSeqID` Int32,
`DestCityMarketID` Int32,
`Dest` FixedString(5),
`DestCityName` String,
`DestState` FixedString(2),
`DestStateFips` String,
`DestStateName` String,
`DestWac` Int32,
`CRSDepTime` Int32,
`DepTime` Int32,
`DepDelay` Int32,
`DepDelayMinutes` Int32,
`DepDel15` Int32,
`DepartureDelayGroups` String,
`DepTimeBlk` String,
`TaxiOut` Int32,
`WheelsOff` Int32,
`WheelsOn` Int32,
`TaxiIn` Int32,
`CRSArrTime` Int32,
`ArrTime` Int32,
`ArrDelay` Int32,
`ArrDelayMinutes` Int32,
`ArrDel15` Int32,
`ArrivalDelayGroups` Int32,
`ArrTimeBlk` String,
`Cancelled` UInt8,
`CancellationCode` FixedString(1),
`Diverted` UInt8,
`CRSElapsedTime` Int32,
`ActualElapsedTime` Int32,
`AirTime` Int32,
`Flights` Int32,
`Distance` Int32,
`DistanceGroup` UInt8,
`CarrierDelay` Int32,
`WeatherDelay` Int32,
`NASDelay` Int32,
`SecurityDelay` Int32,
`LateAircraftDelay` Int32,
`FirstDepTime` String,
`TotalAddGTime` String,
`LongestAddGTime` String,
`DivAirportLandings` String,
`DivReachedDest` String,
`DivActualElapsedTime` String,
`DivArrDelay` String,
`DivDistance` String,
`Div1Airport` String,
`Div1AirportID` Int32,
`Div1AirportSeqID` Int32,
`Div1WheelsOn` String,
`Div1TotalGTime` String,
`Div1LongestGTime` String,
`Div1WheelsOff` String,
`Div1TailNum` String,
`Div2Airport` String,
`Div2AirportID` Int32,
`Div2AirportSeqID` Int32,
`Div2WheelsOn` String,
`Div2TotalGTime` String,
`Div2LongestGTime` String,
`Div2WheelsOff` String,
`Div2TailNum` String,
`Div3Airport` String,
`Div3AirportID` Int32,
`Div3AirportSeqID` Int32,
`Div3WheelsOn` String,
`Div3TotalGTime` String,
`Div3LongestGTime` String,
`Div3WheelsOff` String,
`Div3TailNum` String,
`Div4Airport` String,
`Div4AirportID` Int32,
`Div4AirportSeqID` Int32,
`Div4WheelsOn` String,
`Div4TotalGTime` String,
`Div4LongestGTime` String,
`Div4WheelsOff` String,
`Div4TailNum` String,
`Div5Airport` String,
`Div5AirportID` Int32,
`Div5AirportSeqID` Int32,
`Div5WheelsOn` String,
`Div5TotalGTime` String,
`Div5LongestGTime` String,
`Div5WheelsOff` String,
`Div5TailNum` String
) ENGINE = MergeTree(FlightDate, (Year, FlightDate), 8192)
```
Loading the data:
```bash
for i in *.zip; do echo $i; unzip -cq $i '*.csv' | sed 's/\.00//g' | clickhouse-client --host=example-perftest01j --query="INSERT INTO ontime FORMAT CSVWithNames"; done
```
Queries:
Q0.
```sql
select avg(c1) from (select Year, Month, count(*) as c1 from ontime group by Year, Month);
```
Q1. The number of flights per day of the week from 2000 to 2008
```sql
SELECT DayOfWeek, count(*) AS c FROM ontime WHERE Year >= 2000 AND Year <= 2008 GROUP BY DayOfWeek ORDER BY c DESC;
```
Q2. The number of flights delayed by more than 10 minutes, grouped by day of the week, for 2000-2008.
```sql
SELECT DayOfWeek, count(*) AS c FROM ontime WHERE DepDelay>10 AND Year >= 2000 AND Year <= 2008 GROUP BY DayOfWeek ORDER BY c DESC
```
Q3. The number of delays of more than 10 minutes, by airport, for 2000-2008
```sql
SELECT Origin, count(*) AS c FROM ontime WHERE DepDelay>10 AND Year >= 2000 AND Year <= 2008 GROUP BY Origin ORDER BY c DESC LIMIT 10
```
Q4. The number of delays of more than 10 minutes, by carrier, for 2007
```sql
SELECT Carrier, count(*) FROM ontime WHERE DepDelay>10 AND Year = 2007 GROUP BY Carrier ORDER BY count(*) DESC
```
Q5. The percentage of flights delayed by more than 10 minutes, by carrier, for 2007
```sql
SELECT Carrier, c, c2, c*1000/c2 as c3
FROM
(
SELECT
Carrier,
count(*) AS c
FROM ontime
WHERE DepDelay>10
AND Year=2007
GROUP BY Carrier
)
ANY INNER JOIN
(
SELECT
Carrier,
count(*) AS c2
FROM ontime
WHERE Year=2007
GROUP BY Carrier
) USING Carrier
ORDER BY c3 DESC;
```
Better version of the same query:
```sql
SELECT Carrier, avg(DepDelay > 10) * 1000 AS c3 FROM ontime WHERE Year = 2007 GROUP BY Carrier ORDER BY Carrier
```
Q6. The same as the previous query, but for a broader range of years: 2000-2008
```sql
SELECT Carrier, c, c2, c*1000/c2 as c3
FROM
(
SELECT
Carrier,
count(*) AS c
FROM ontime
WHERE DepDelay>10
AND Year >= 2000 AND Year <= 2008
GROUP BY Carrier
)
ANY INNER JOIN
(
SELECT
Carrier,
count(*) AS c2
FROM ontime
WHERE Year >= 2000 AND Year <= 2008
GROUP BY Carrier
) USING Carrier
ORDER BY c3 DESC;
```
Better version of the same query:
```sql
SELECT Carrier, avg(DepDelay > 10) * 1000 AS c3 FROM ontime WHERE Year >= 2000 AND Year <= 2008 GROUP BY Carrier ORDER BY Carrier
```
Q7. The percentage of flights delayed by more than 10 minutes, by year
```sql
SELECT Year, c1/c2
FROM
(
select
Year,
count(*)*1000 as c1
from ontime
WHERE DepDelay>10
GROUP BY Year
)
ANY INNER JOIN
(
select
Year,
count(*) as c2
from ontime
GROUP BY Year
) USING (Year)
ORDER BY Year
```
Better version of the same query:
```sql
SELECT Year, avg(DepDelay > 10) FROM ontime GROUP BY Year ORDER BY Year
```
Q8. The most popular destinations by the number of directly connected origin cities, 2000-2010
```sql
SELECT DestCityName, uniqExact(OriginCityName) AS u FROM ontime WHERE Year >= 2000 and Year <= 2010 GROUP BY DestCityName ORDER BY u DESC LIMIT 10;
```
Q9.
```sql
select Year, count(*) as c1 from ontime group by Year;
```
Q10.
```sql
select
min(Year), max(Year), Carrier, count(*) as cnt,
sum(ArrDelayMinutes>30) as flights_delayed,
round(sum(ArrDelayMinutes>30)/count(*),2) as rate
FROM ontime
WHERE
DayOfWeek not in (6,7) and OriginState not in ('AK', 'HI', 'PR', 'VI')
and DestState not in ('AK', 'HI', 'PR', 'VI')
and FlightDate < '2010-01-01'
GROUP by Carrier
HAVING cnt > 100000 and max(Year) > 1990
ORDER by rate DESC
LIMIT 1000;
```
Bonus:
```sql
SELECT avg(cnt) FROM (SELECT Year,Month,count(*) AS cnt FROM ontime WHERE DepDel15=1 GROUP BY Year,Month)
select avg(c1) from (select Year,Month,count(*) as c1 from ontime group by Year,Month)
SELECT DestCityName, uniqExact(OriginCityName) AS u FROM ontime GROUP BY DestCityName ORDER BY u DESC LIMIT 10;
SELECT OriginCityName, DestCityName, count() AS c FROM ontime GROUP BY OriginCityName, DestCityName ORDER BY c DESC LIMIT 10;
SELECT OriginCityName, count() AS c FROM ontime GROUP BY OriginCityName ORDER BY c DESC LIMIT 10;
```
This performance test was created by Vadim Tkachenko. See:
- <https://www.percona.com/blog/2009/10/02/analyzing-air-traffic-performance-with-infobright-and-monetdb/>
- <https://www.percona.com/blog/2009/10/26/air-traffic-queries-in-luciddb/>
- <https://www.percona.com/blog/2009/11/02/air-traffic-queries-in-infinidb-early-alpha/>
- <https://www.percona.com/blog/2014/04/21/using-apache-hadoop-and-impala-together-with-mysql-for-data-analysis/>
- <https://www.percona.com/blog/2016/01/07/apache-spark-with-air-ontime-performance-data/>
- <http://nickmakos.blogspot.ru/2012/08/analyzing-air-traffic-performance-with.html>

View File

@@ -1 +0,0 @@
../../../en/getting_started/example_datasets/star_schema.md

View File

@@ -0,0 +1,87 @@
# Star Schema Benchmark
Compiling dbgen: <https://github.com/vadimtk/ssb-dbgen>
```bash
git clone git@github.com:vadimtk/ssb-dbgen.git
cd ssb-dbgen
make
```
Some warnings during compilation are normal.
Place `dbgen` and `dists.dss` on a disk with more than 800 GB of free space.
Generating the data:
```bash
./dbgen -s 1000 -T c
./dbgen -s 1000 -T l
```
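The two commands above only generate the customer and lineorder data; if you also want data for the `part` table created below, `ssb-dbgen` can presumably produce it with the `-T p` flag. This is an assumption based on the tool's usual table codes and is not part of the original instructions:
```bash
# Assumed flag: -T p generates part.tbl, analogous to -T c and -T l above.
./dbgen -s 1000 -T p
```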
Creating the tables in ClickHouse:
``` sql
CREATE TABLE lineorder (
LO_ORDERKEY UInt32,
LO_LINENUMBER UInt8,
LO_CUSTKEY UInt32,
LO_PARTKEY UInt32,
LO_SUPPKEY UInt32,
LO_ORDERDATE Date,
LO_ORDERPRIORITY String,
LO_SHIPPRIORITY UInt8,
LO_QUANTITY UInt8,
LO_EXTENDEDPRICE UInt32,
LO_ORDTOTALPRICE UInt32,
LO_DISCOUNT UInt8,
LO_REVENUE UInt32,
LO_SUPPLYCOST UInt32,
LO_TAX UInt8,
LO_COMMITDATE Date,
LO_SHIPMODE String
)Engine=MergeTree(LO_ORDERDATE,(LO_ORDERKEY,LO_LINENUMBER,LO_ORDERDATE),8192);
CREATE TABLE customer (
C_CUSTKEY UInt32,
C_NAME String,
C_ADDRESS String,
C_CITY String,
C_NATION String,
C_REGION String,
C_PHONE String,
C_MKTSEGMENT String,
C_FAKEDATE Date
)Engine=MergeTree(C_FAKEDATE,(C_CUSTKEY,C_FAKEDATE),8192);
CREATE TABLE part (
P_PARTKEY UInt32,
P_NAME String,
P_MFGR String,
P_CATEGORY String,
P_BRAND String,
P_COLOR String,
P_TYPE String,
P_SIZE UInt8,
P_CONTAINER String,
P_FAKEDATE Date
)Engine=MergeTree(P_FAKEDATE,(P_PARTKEY,P_FAKEDATE),8192);
CREATE TABLE lineorderd AS lineorder ENGINE = Distributed(perftest_3shards_1replicas, default, lineorder, rand());
CREATE TABLE customerd AS customer ENGINE = Distributed(perftest_3shards_1replicas, default, customer, rand());
CREATE TABLE partd AS part ENGINE = Distributed(perftest_3shards_1replicas, default, part, rand());
```
For testing on a single node, just create the corresponding MergeTree tables.
For distributed testing, you need to configure the `perftest_3shards_1replicas` cluster in the configuration file.
Then create the MergeTree tables on every node, together with the Distributed tables.
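A minimal sketch of such a cluster declaration, assuming three single-replica shards on hypothetical hosts `perftest01`..`perftest03` and the default TCP port; the file name and host names are placeholders, not taken from the original text:
```bash
# Write a config.d override with the assumed cluster layout (repeat on every node).
sudo tee /etc/clickhouse-server/config.d/perftest_3shards_1replicas.xml >/dev/null <<'EOF'
<yandex>
    <remote_servers>
        <perftest_3shards_1replicas>
            <shard>
                <replica><host>perftest01</host><port>9000</port></replica>
            </shard>
            <shard>
                <replica><host>perftest02</host><port>9000</port></replica>
            </shard>
            <shard>
                <replica><host>perftest03</host><port>9000</port></replica>
            </shard>
        </perftest_3shards_1replicas>
    </remote_servers>
</yandex>
EOF
```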
Loading the data (change 'customer' to 'customerd' for the distributed variant):
```bash
cat customer.tbl | sed 's/$/2000-01-01/' | clickhouse-client --query "INSERT INTO customer FORMAT CSV"
cat lineorder.tbl | clickhouse-client --query "INSERT INTO lineorder FORMAT CSV"
```
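The `part` table created above is not loaded by the commands in the original text; assuming `part.tbl` was generated (see the earlier sketch), it can presumably be loaded the same way as `customer`, with the appended date filling the artificial `P_FAKEDATE` column:
```bash
# Assumed to mirror the customer load above; not part of the original instructions.
cat part.tbl | sed 's/$/2000-01-01/' | clickhouse-client --query "INSERT INTO part FORMAT CSV"
```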
[Original article](https://clickhouse.yandex/docs/en/getting_started/example_datasets/star_schema/) <!--hide-->

View File

@@ -1 +0,0 @@
../../../en/getting_started/example_datasets/wikistat.md

View File

@@ -0,0 +1,29 @@
# WikiStat (Wikipedia pageview data)
See: <http://dumps.wikimedia.org/other/pagecounts-raw/>
Creating a table:
``` sql
CREATE TABLE wikistat
(
date Date,
time DateTime,
project String,
subproject String,
path String,
hits UInt64,
size UInt64
) ENGINE = MergeTree(date, (path, time), 8192);
```
Loading the data:
```bash
for i in {2007..2016}; do for j in {01..12}; do echo $i-$j >&2; curl -sSL "http://dumps.wikimedia.org/other/pagecounts-raw/$i/$i-$j/" | grep -oE 'pagecounts-[0-9]+-[0-9]+\.gz'; done; done | sort | uniq | tee links.txt
cat links.txt | while read link; do wget http://dumps.wikimedia.org/other/pagecounts-raw/$(echo $link | sed -r 's/pagecounts-([0-9]{4})([0-9]{2})[0-9]{2}-[0-9]+\.gz/\1/')/$(echo $link | sed -r 's/pagecounts-([0-9]{4})([0-9]{2})[0-9]{2}-[0-9]+\.gz/\1-\2/')/$link; done
ls -1 /opt/wikistat/ | grep gz | while read i; do echo $i; gzip -cd /opt/wikistat/$i | ./wikistat-loader --time="$(echo -n $i | sed -r 's/pagecounts-([0-9]{4})([0-9]{2})([0-9]{2})-([0-9]{2})([0-9]{2})([0-9]{2})\.gz/\1-\2-\3 \4-00-00/')" | clickhouse-client --query="INSERT INTO wikistat FORMAT TabSeparated"; done
```
[Original article](https://clickhouse.yandex/docs/en/getting_started/example_datasets/wikistat/) <!--hide-->

View File

@@ -1 +0,0 @@
../../en/getting_started/index.md

View File

@@ -0,0 +1,141 @@
# Getting Started
## System Requirements
To install from the official repositories, you need Linux on the x86_64 architecture with support for the SSE 4.2 instruction set.
To check for SSE 4.2 support:
```bash
grep -q sse4_2 /proc/cpuinfo && echo "SSE 4.2 supported" || echo "SSE 4.2 not supported"
```
We recommend using Ubuntu or Debian. The terminal must use UTF-8 encoding.
For rpm-based systems, you can use the third-party packages from https://packagecloud.io/altinity/clickhouse, or install the debian packages directly.
ClickHouse also works on FreeBSD and Mac OS X. It can also be compiled for x86_64 CPUs without SSE 4.2 support and for AArch64 CPUs.
## Installation
For testing and development, the system can be installed on a single server or on an ordinary PC.
### Installing for Debian/Ubuntu
Add the repository to `/etc/apt/sources.list` (or create a separate `/etc/apt/sources.list.d/clickhouse.list` file):
```text
deb http://repo.yandex.ru/clickhouse/deb/stable/ main/
```
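If you prefer to do this from the shell, the same repository line can be written into a separate list file; a sketch assuming sudo is available:
```bash
# Writes the repository definition to its own sources list file.
echo "deb http://repo.yandex.ru/clickhouse/deb/stable/ main/" | sudo tee /etc/apt/sources.list.d/clickhouse.list
```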
If you want to use the most recent test version, replace 'stable' with 'testing'.
Then run:
```bash
sudo apt-key adv --keyserver keyserver.ubuntu.com --recv E0C56BD4 # optional
sudo apt-get update
sudo apt-get install clickhouse-client clickhouse-server
```
You can also download and install the packages manually from here: <https://repo.yandex.ru/clickhouse/deb/stable/main/>
ClickHouse contains access restriction settings. They are located in the `users.xml` file (next to 'config.xml').
By default, access is allowed from anywhere for the 'default' user, without a password. See 'user/default/networks'.
For more information, see the section "Configuration files".
### Installing from Sources
For how to build, see build.md.
You can compile and install the packages.
You can also use the programs without installing them.
```text
Client: dbms/programs/clickhouse-client
Server: dbms/programs/clickhouse-server
```
Create the following directories for data on the server:
```text
/opt/clickhouse/data/default/
/opt/clickhouse/metadata/default/
```
(They can be configured in the server config.)
Run 'chown' for the desired user.
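For example, assuming the server will run as a dedicated `clickhouse` user and the default paths listed above; adjust the user and paths to your setup:
```bash
# Create the data directories and hand them over to the server user.
sudo mkdir -p /opt/clickhouse/data/default/ /opt/clickhouse/metadata/default/
sudo chown -R clickhouse:clickhouse /opt/clickhouse
```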
The path to logs can be configured in the server config (src/dbms/programs/server/config.xml).
### Other Installation Methods
Docker image: <https://hub.docker.com/r/yandex/clickhouse-server/>
RPM packages for CentOS or RHEL: <https://github.com/Altinity/clickhouse-rpm-install>
Gentoo: `emerge clickhouse`
## Launch
To start the server as a daemon (in the background), run:
```bash
sudo service clickhouse-server start
```
Logs can be found in the `/var/log/clickhouse-server/` directory.
If the server does not start, check the configuration file `/etc/clickhouse-server/config.xml`.
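A quick way to see why it failed, assuming the default log paths from the packaged config:
```bash
# The error log usually contains the reason for a failed start.
sudo tail -n 50 /var/log/clickhouse-server/clickhouse-server.err.log
```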
You can also launch the server manually from the console:
```bash
clickhouse-server --config-file=/etc/clickhouse-server/config.xml
```
In this case, the log is printed to the console, which is convenient during development.
If the configuration file is in the current directory, you do not need to specify the '--config-file' parameter; by default it uses './config.xml'.
You can use the command-line client to connect to the server:
```bash
clickhouse-client
```
By default, it connects to localhost:9000 as the 'default' user, without a password.
The client can also be used to connect to a remote server, for example:
```bash
clickhouse-client --host=example.com
```
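Other connection parameters can be passed the same way; a sketch with placeholder values (the port, user, and password shown are just examples, not taken from the original text):
```bash
# Explicit port, user, and password; all values here are placeholders.
clickhouse-client --host=example.com --port=9000 --user=default --password=""
```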
For more information, see the section "Command-line client".
Checking that the system works:
```bash
milovidov@hostname:~/work/metrica/src/dbms/src/Client$ ./clickhouse-client
ClickHouse client version 0.0.18749.
Connecting to localhost:9000.
Connected to ClickHouse server version 0.0.18749.
:) SELECT 1
SELECT 1
┌─1─┐
│ 1 │
└───┘
1 rows in set. Elapsed: 0.003 sec.
:)
```
**Congratulations, the system works!**
To continue experimenting, you can try downloading one of the test datasets.
[Original article](https://clickhouse.yandex/docs/en/getting_started/) <!--hide-->