dbms: external dictionaries: store null_values in sample_block, use them instead of IColumn::insertDefault for MySQL and MongoDB [#METR-17854]

This commit is contained in:
Andrey Mironov 2015-10-13 18:38:08 +03:00
parent c469b726c1
commit 4355b32890
4 changed files with 43 additions and 61 deletions

View File

@ -9,6 +9,8 @@
#include <DB/DataTypes/DataTypesNumberFixed.h>
#include <common/singleton.h>
#include <memory>
#include <DB/Core/FieldVisitors.h>
namespace DB
{
@ -19,25 +21,21 @@ namespace
Block createSampleBlock(const DictionaryStructure & dict_struct)
{
Block block{
ColumnWithTypeAndName{
new ColumnUInt64,
new DataTypeUInt64,
dict_struct.id.name
}
ColumnWithTypeAndName{new ColumnUInt64{1}, new DataTypeUInt64, dict_struct.id.name}
};
if (dict_struct.range_min)
for (const auto & attribute : { dict_struct.range_min, dict_struct.range_max })
block.insert(ColumnWithTypeAndName{
new ColumnUInt16,
new DataTypeDate,
attribute->name
});
block.insert(
ColumnWithTypeAndName{new ColumnUInt16{1}, new DataTypeDate, attribute->name});
for (const auto & attribute : dict_struct.attributes)
block.insert(ColumnWithTypeAndName{
attribute.type->createColumn(), attribute.type, attribute.name
});
{
auto column = attribute.type->createColumn();
column->insert(attribute.null_value);
block.insert(ColumnWithTypeAndName{column, attribute.type, attribute.name});
}
return block;
}

View File

@ -1,6 +1,7 @@
#pragma once
#include <DB/Core/Block.h>
#include <DB/Dictionaries/DictionaryStructure.h>
#include <DB/DataStreams/IProfilingBlockInputStream.h>
#include <DB/DataTypes/DataTypesNumberFixed.h>
#include <DB/DataTypes/DataTypeString.h>
@ -11,6 +12,8 @@
#include <mongo/client/dbclient.h>
#include <vector>
#include <string>
#include <DB/Core/FieldVisitors.h>
namespace DB
{
@ -37,17 +40,20 @@ class MongoDBBlockInputStream final : public IProfilingBlockInputStream
public:
MongoDBBlockInputStream(
std::unique_ptr<mongo::DBClientCursor> cursor_, const Block & sample_block, const std::size_t max_block_size)
: cursor{std::move(cursor_)}, sample_block{sample_block}, max_block_size{max_block_size}
std::unique_ptr<mongo::DBClientCursor> cursor_, const Block & sample_block_, const std::size_t max_block_size)
: cursor{std::move(cursor_)}, sample_block{sample_block_}, max_block_size{max_block_size}
{
/// do nothing if cursor has no data
if (!cursor->more())
return;
types.reserve(sample_block.columns());
const auto num_columns = sample_block.columns();
types.reserve(num_columns);
names.reserve(num_columns);
sample_columns.reserve(num_columns);
/// save types of each column to eliminate subsequent typeid_cast<> invocations
for (const auto idx : ext::range(0, sample_block.columns()))
for (const auto idx : ext::range(0, num_columns))
{
const auto & column = sample_block.getByPosition(idx);
const auto type = column.type.get();
@ -85,6 +91,7 @@ public:
};
names.emplace_back(column.name);
sample_columns.emplace_back(column.column.get());
}
}
@ -124,7 +131,7 @@ private:
if (value.ok())
insertValue(columns[idx], types[idx], value);
else
insertDefaultValue(columns[idx], types[idx]);
insertDefaultValue(columns[idx], *sample_columns[idx]);
}
++num_rows;
@ -287,25 +294,9 @@ private:
}
}
/// @todo insert default value from the dictionary attribute definition
static void insertDefaultValue(IColumn * const column, const value_type_t type)
static void insertDefaultValue(IColumn * const column, const IColumn & sample_column)
{
switch (type)
{
case value_type_t::UInt8: static_cast<ColumnUInt8 *>(column)->insertDefault(); break;
case value_type_t::UInt16: static_cast<ColumnUInt16 *>(column)->insertDefault(); break;
case value_type_t::UInt32: static_cast<ColumnUInt32 *>(column)->insertDefault(); break;
case value_type_t::UInt64: static_cast<ColumnUInt64 *>(column)->insertDefault(); break;
case value_type_t::Int8: static_cast<ColumnInt8 *>(column)->insertDefault(); break;
case value_type_t::Int16: static_cast<ColumnInt16 *>(column)->insertDefault(); break;
case value_type_t::Int32: static_cast<ColumnInt32 *>(column)->insertDefault(); break;
case value_type_t::Int64: static_cast<ColumnInt64 *>(column)->insertDefault(); break;
case value_type_t::Float32: static_cast<ColumnFloat32 *>(column)->insertDefault(); break;
case value_type_t::Float64: static_cast<ColumnFloat64 *>(column)->insertDefault(); break;
case value_type_t::String: static_cast<ColumnString *>(column)->insertDefault(); break;
case value_type_t::Date: static_cast<ColumnUInt16 *>(column)->insertDefault(); break;
case value_type_t::DateTime: static_cast<ColumnUInt32 *>(column)->insertDefault(); break;
}
column->insertFrom(sample_column, 0);
}
std::unique_ptr<mongo::DBClientCursor> cursor;
@ -313,6 +304,7 @@ private:
const std::size_t max_block_size;
std::vector<value_type_t> types;
std::vector<mongo::StringData> names;
std::vector<const IColumn *> sample_columns;
};
}

View File

@ -107,6 +107,7 @@ public:
};
}
/// @todo: for MongoDB, modification date can somehow be determined from the `_id` object field
bool isModified() const override { return false; }
DictionarySourcePtr clone() const override { return std::make_unique<MongoDBDictionarySource>(*this); }

View File

@ -37,12 +37,11 @@ class MySQLBlockInputStream final : public IProfilingBlockInputStream
};
public:
MySQLBlockInputStream(const mysqlxx::PoolWithFailover::Entry & entry,
const std::string & query_str,
const Block & sample_block,
MySQLBlockInputStream(
const mysqlxx::PoolWithFailover::Entry & entry, const std::string & query_str, const Block & sample_block_,
const std::size_t max_block_size)
: entry{entry}, query{this->entry->query(query_str)}, result{query.use()},
sample_block{sample_block}, max_block_size{max_block_size}
sample_block{sample_block_}, max_block_size{max_block_size}
{
if (sample_block.columns() != result.getNumFields())
throw Exception{
@ -51,11 +50,15 @@ public:
ErrorCodes::NUMBER_OF_COLUMNS_DOESNT_MATCH
};
types.reserve(sample_block.columns());
const auto num_columns = sample_block.columns();
types.reserve(num_columns);
sample_columns.reserve(num_columns);
for (const auto idx : ext::range(0, sample_block.columns()))
for (const auto idx : ext::range(0, num_columns))
{
const auto type = sample_block.getByPosition(idx).type.get();
const auto & column = sample_block.getByPosition(idx);
const auto type = column.type.get();
if (typeid_cast<const DataTypeUInt8 *>(type))
types.push_back(value_type_t::UInt8);
else if (typeid_cast<const DataTypeUInt16 *>(type))
@ -87,6 +90,8 @@ public:
"Unsupported type " + type->getName(),
ErrorCodes::UNKNOWN_TYPE
};
sample_columns.emplace_back(column.column.get());
}
}
@ -120,7 +125,7 @@ private:
if (!value.isNull())
insertValue(columns[idx], types[idx], value);
else
insertDefaultValue(columns[idx], types[idx]);
insertDefaultValue(columns[idx], *sample_columns[idx]);
}
++num_rows;
@ -158,24 +163,9 @@ private:
}
}
static void insertDefaultValue(IColumn * const column, const value_type_t type)
static void insertDefaultValue(IColumn * const column, const IColumn & sample_column)
{
switch (type)
{
case value_type_t::UInt8: static_cast<ColumnUInt8 *>(column)->insertDefault(); break;
case value_type_t::UInt16: static_cast<ColumnUInt16 *>(column)->insertDefault(); break;
case value_type_t::UInt32: static_cast<ColumnUInt32 *>(column)->insertDefault(); break;
case value_type_t::UInt64: static_cast<ColumnUInt64 *>(column)->insertDefault(); break;
case value_type_t::Int8: static_cast<ColumnInt8 *>(column)->insertDefault(); break;
case value_type_t::Int16: static_cast<ColumnInt16 *>(column)->insertDefault(); break;
case value_type_t::Int32: static_cast<ColumnInt32 *>(column)->insertDefault(); break;
case value_type_t::Int64: static_cast<ColumnInt64 *>(column)->insertDefault(); break;
case value_type_t::Float32: static_cast<ColumnFloat32 *>(column)->insertDefault(); break;
case value_type_t::Float64: static_cast<ColumnFloat64 *>(column)->insertDefault(); break;
case value_type_t::String: static_cast<ColumnString *>(column)->insertDefault(); break;
case value_type_t::Date: static_cast<ColumnUInt16 *>(column)->insertDefault(); break;
case value_type_t::DateTime: static_cast<ColumnUInt32 *>(column)->insertDefault(); break;
}
column->insertFrom(sample_column, 0);
}
mysqlxx::PoolWithFailover::Entry entry;
@ -184,6 +174,7 @@ private:
Block sample_block;
const std::size_t max_block_size;
std::vector<value_type_t> types;
std::vector<const IColumn *> sample_columns;
};
}