pymongo使用笔记
数据库连接
使用pymongo连接
import pymongo
mongourl = f'mongodb://username:password@localhost:27017'
myclient = pymongo.MongoClient(mongo_url)
db = myclient[database] # 获取数据库
collection = db.get_collection(collection_name) # 获取集合
在shell中连接mongoDB
// 连接本地mongodb
> mongo
// 连接远程mongodb
> mongo 192.168.188.14:27018/admin -u root -p epochn
// 如果无法连接可以尝试加入参数
> mongo --authenticationDatabase 192.168.188.14:27018/admin -u root -p epochn
查看mongoDB状态
在shell中
> db.collection.stats()
在pymongo中
db.command('collstats', collection) # 集合状态
db.command('dbstats') # 数据库状态
常用方法
db.list_collection_names(session=None)
collection.list_indexes() # 查看集合所有索引
collection.index_infomation() # 查看索引信息
CRUD
查询
查询多条数据
res = collection.find(options, extra)
查询单条数据
res = collection.find_one(options, extra)
空值
collection.find({'field': None})
collection.find({'field': ""})
collection.find({'field': {'$exists': False}})
注意,三种查询空值的方式所得到的结果是不同的,所以要么统一空值的表示,要么使用高级查询的unwind
嵌套查询
假设一个拥有嵌套文档结构的集合:
[
{
"name": "col1",
"data": [
{"score": 12, "pos": 1},
{"score": 23, "pos": 32}
]
}
]
查询score大于15的数据
res = collection.find({"data.score": {"$gt": 15}})
数组过滤(elemMatch)
假设一个集合拥有字段
[
{
"name": "col",
"tags": [
{"value": 't1', "id": 13},
{"value": "t2", "id": 11}
]
},
{
"name": "col2",
"tags": [
{"value": 't11', "id": 15},
{"value": "t21", "id": 21}
]
},
]
现在我想查询tags字段下的value为t1,并且id为15的数据。可以很快的想到使用嵌套查询。
res = collection.find({'tags.value': 't1', 'tags.id':15})
理论上应该查询不到任何数据,因为我们期待value和id存在于同一个字典中,或者说,我们期待查询的元素是数组中的一个元素。
但事实上,value和id并不是保存在同一个地址下的。所以它会匹配全集合中满足其中一个条件的数据并返回。可以说,对于数组的查询,只需满足其中一个元素便可以看作查询匹配。
我们可以使用elemMatch限制。elemMatch会匹配包含一个数组字段的文档,该数组字段中至少要有一个元素与查询条件匹配,所以理论上如果你的查询条件只有一个,那就不需要使用elemMatch
res = collection.find(
{'tags.value': 't1', 'tags.id': 15},
{'tags': {
{'$elemMatch': {'value': 't1', 'id': 15}}
}}
)
当查询不到任何与之匹配的tags时,以上的查询语句只会返回_id
修改
修改单条
res = collection.update_one(options, data)
res.modified_count
修改多条
res = collection.update_many(options, data)
res.modified_count
查询并修改
res = collection.find_one_and_update(options, data, return_document=pymongo.ReturnDocument.BEFORE)
# res表示修改前的数据
查找并替换
res = collection.find_one_and_replace(options, data, return_docuemnt=pymongo.ReturnDocument.AFTER)
与设置某一字段不同,替换的data将会把旧数据完全覆盖。
删除字段
res = collection.update_many(options, {'$unset': {field: None}})
嵌套文档的修改
假设一个拥有嵌套文档结构的集合
[
{
"name": 'col',
'data': [
{'value': 't1', 'pos': 12},
{'value': 't2', 'pos': 13},
{'value': 't21', 'pos': 13},
]
}
]
我们想修改pos为13的value。可以这样写
res = collection.update_many(
{'data.pos': 13},
{'$set': {
'data.$.value': 'modt121'
}}
)
高级查询
pipeline
pipeline是mongoDB实现聚合操作的一种方式,pipeline的类型为数组,数组的元素就是当前管道对集合的一次操作,每一次操作都会以上一阶段的输出作为输入。
match
类似于SQL中的where。作为筛选条件,位置随意。如果出现在group之后,则类似于having
collection.aggregate(
[
{'$match': {'age': {'$gte': 30}}}
]
)
group
类似于SQL中的group by,将文档以某些字段进行分组。
假设一个集合
[
{
'company': 'cc1',
'publish': 234,
'records': [
{'content': 'c'},
{'content': 't4'}
]
},
{
'company': 'kk',
'publish': 19,
'records': [
{'content': 'cs'},
]
},
{
'company': 'cc1',
'publish': 12,
'records': [
{'content': 'cbb'},
]
},
]
我希望能以company作为分组依据,得到所有的publish字段的总和、records字段的总和以及当前组的条数。
collection.aggregate(
[
{'addFields': {
'records_sum': {'$size': '$records'}
}},
{'$group': {
'_id': field,
'publish_sum': {'$sum': '$publish'},
'company_sum': {'$sum': 1},
'records_sum': {'$sum': '$records_sum'}
}}
]
)
首先对于publish来说,直接使用$sum就可以得到总和。$sum操作的字段如果无法转换成数字,那么将被忽略;$sum后直接跟1,表示统计当前分组的条数。
records_sum类似于publish_sum,但如果直接使用{'$sum': '$records'}不会得到任何结果,因为$sum后跟的字段无法被转换成数字。所以在group之前,可以先使用addFields添加一个records_sum。
project
类似于SQL中的select,可以修改现有字段,新增输出字段等。
collection.aggregate(
[
{'$project': {'_id':0, 'name':1}}
]
)
lookup
将两个集合根据某个字段进行关联查询。最终输出一个集合。
假设有两个集合
# col1
[
{
'_id': ObjectId('hj321k4h1g41jkh31d2'),
'name': 'col'
}
]
# col2
[
{
'_id': ObjectId('dsfjsk1k3i0t6dsfdsh'),
'pro_id': ObjectId('hj321k4h1g41jkh31d2'),
'title': 'before'
}
]
对其进行关联查询
col1.aggregate(
[
{'$match': {'_id': ObjectId('hj321k4h1g41jkh31d2')}},
{'$lookup': {
'from': 'col2',
'localField': '_id',
'foreignField': 'pro_id',
'as': 'col2'
}}
]
)
这样,col2将作为col1的一个字段输出。
[
{
'_id': ObjectId('hj321k4h1g41jkh31d2'),
'name': 'col',
'col2': [
{
'_id': ObjectId('dsfjsk1k3i0t6dsfdsh'),
'pro_id': ObjectId('hj321k4h1g41jkh31d2'),
'title': 'before'
}
]
}
]
addFields
新增字段。可以新增一级字段,嵌套文档的字段,覆盖原有字段,数组新增元素等。
假设这样一个集合
[
{
'id':1,
'title': 'nature',
'tags': ['golang', 'cpp'],
'dict': {'name': 'n1', 'gender': 'male'},
}
]
collection.aggregate(
[
{'$addFields': {
'newsum': {'$size': '$tags'}, # 新增字段
'dict.age': 13, # 嵌套文档字段
'title': 'PLANET', # 覆盖原有字段
'tags': {
'$concatArrays': ['$tags', ['scala']]
} # 为数组添加元素
}}
]
)
假设一个通过lookup关联得到的嵌套文档。
[
{
'_id': ObjectId('hj321k4h1g41jkh31d2'),
'name': 'col',
'col2': [
{
'_id': ObjectId('dsfjsk1k3i0t6dsfdsh'),
'pro_id': ObjectId('hj321k4h1g41jkh31d2'),
'title': 'before'
}
]
}
]
由于你的业务需求,需要将col2._id转换为字符串。可以很容易想到使用$toString。但是由于col2是一个数组嵌套字典的结构,所以需要使用$map将操作作用于每一个元素。在$map输出时,将_id转换为字符串并通过$mergeObjects合并原有的文档并覆盖_id字段。
collection.aggregate(
[
{'$addFields': {
'col2': {
'$map': {
'input': '$col2',
'as': 'c',
'in': {
'$mergeObjects': [
'$$c',
{'_id': {'$toString': '$$c._id'}}
]
}
}
}
}}
]
)
set
$set实际上是$addFields的别名。两者功能相同。
我们可以在进行聚合查询时修改某些字段。例如,当需要进行$lookup关联操作时,如果关联的双方的字段类型并不统一,那么可以在$lookup之前将字段类型修改。
collection.aggregate(
[
{'$match': {'_id': ObjectId('hj321k4h1g41jkh31d2')}},
{'$set': {'_id': {'$toString': '_id'}}},
{'$lookup': {...}}
]
)
unwind
$unwind可以将字段解析为数组,并为数组的每一个元素返回一个文档。如果字段无法被解析为数组,那么将被视为一个单元素数组。
假设一个集合
[
{
'name': 'col',
'used': null,
'tags': ['a', 'b', 'c']
},
{
'name': 'col2',
'used': 1,
'tags': ['ds']
}
]
对tags字段进行拆分
collection.aggregate(
[
{'$unwind': 'tags'}
]
)
# return
[
{'name':'col', 'used': null, 'tags': 'a'},
{'name':'col', 'used': null, 'tags': 'b'},
{'name':'col', 'used': null, 'tags': 'c'},
{'name':'col2', 'used': 1, 'tags': 'ds'},
]
如果对used字段进行拆分,由于used不是数组,所以会看作是单元素元组。
collection.aggregate(
[
{'$unwind': 'used'}
]
)
# return
[
{'name': 'col2', 'used': 1, 'tags': ['ds']}
]
如果当前字段的值为空,缺失,为null的情况下,将不会输出该文档,所以查询时如果需要指定某一字段必须存在并且值不为空,可以使用unwind。
可以通过指定{preserveNullAndEmptyArrays: True}保持原文档输出。
collection.aggregate(
[
{'$unwind': {'path': 'used', 'preserveNullAndEmptyArrays': True}}
]
)
# return
[
{'name': 'col', 'used': null, 'tags': ['a', 'b', 'c']},
{'name': 'col2', 'used': 1, 'tags': ['ds']},
]
常用的管道运算
上面提到的$size, $sum, $concatArrays, $mergeObjects等都属于管道运算符(pipeline operator)。管道运算符可以搭配不同的管道进行计算,但也有用于专属管道的管道运算符,比如$push只能用于$group管道中。
更多运算符参考 Pipeline Operators
add
$add将两个值相加,值可以是字段值,数字,或者日期。
collection.aggregate(
[
{'$project': {'item': {'$add': [field, 12]}}}
]
)
push
只能在group中使用。往数组中添加字典对象。
def ifnull():
...
return False
collection.aggregate(
[
{'$group': {
'pushvalue': {
'p1': field,
'p2': 1 if ifnull() else 0
}
}}
]
)
addToSet
只能在group中使用,往数组中添加数据,不能有重复值
collection.aggregate(
[
{'$group': {
'item': {'$addToSet': field}
}}
]
)
ceil
返回大于等于指定数字的最小整数。如果指定数字为null或者nan,按原样返回。
collection.aggregate(
[
{'$project': {'ceiling': {'$ceil': field}}}
]
)
concat
将指定的字符串连接并返回
collection.aggregate(
[
{'$project': {'concatvalue': {'$concat': [field, '-', 'intro']}}}
]
)
cond
构建一个if-then-else表达式。
collection.aggregate(
[
{'$project': {
'condvalue': {
'cond': {
'if': {'$gte': [field, 12]},
'then': 30,
'else': 10
}
}
}}
]
)
filter
构建一个筛选表达式,返回满足的数据
collection.aggregate(
[
{'$project': {
'item': {
'$filter': {
'input': field,
'as': 'f',
'cond': {'$gte': [field, 10]}
}
}
}}
]
)
map
将表达式作用于数组的每一个元素
collection.aggregate(
[
{'$project': {
'item': {
'$map': {
'input': arrayfield,
'as': 'arr',
'in': {'$concat': ['$$arr', 'ing']}
}
}
}}
]
)