3 minute read

数据库连接

使用pymongo连接

import pymongo

mongourl = f'mongodb://username:password@localhost:27017'
myclient = pymongo.MongoClient(mongo_url)
db = myclient[database]   # 获取数据库
collection = db.get_collection(collection_name) # 获取集合

在shell中连接mongoDB

// 连接本地mongodb
> mongo

// 连接远程mongodb
> mongo 192.168.188.14:27018/admin -u root -p epochn

// 如果无法连接可以尝试加入参数
> mongo --authenticationDatabase 192.168.188.14:27018/admin -u root -p epochn

查看mongoDB状态

在shell中

> db.collection.stats()

在pymongo中

db.command('collstats', collection)   # 集合状态
db.command('dbstats')   # 数据库状态

常用方法

db.list_collection_names(session=None)
collection.list_indexes()       # 查看集合所有索引
collection.index_infomation()   # 查看索引信息

CRUD

查询

查询多条数据

res = collection.find(options, extra)

查询单条数据

res = collection.find_one(options, extra)

空值

collection.find({'field': None})
collection.find({'field': ""})
collection.find({'field': {'$exists': False}})

注意,三种查询空值的方式所得到的结果是不同的,所以要么统一空值的表示,要么使用高级查询的unwind

嵌套查询

假设一个拥有嵌套文档结构的集合:

[
  {
    "name": "col1",
    "data": [
      {"score": 12, "pos": 1},
      {"score": 23, "pos": 32}
    ]
  }
]

查询score大于15的数据

res = collection.find({"data.score": {"$gt": 15}})

数组过滤(elemMatch)

假设一个集合拥有字段

[
  {
    "name": "col",
    "tags": [
      {"value": 't1', "id": 13},
      {"value": "t2", "id": 11}
    ]
  },
  {
    "name": "col2",
    "tags": [
      {"value": 't11', "id": 15},
      {"value": "t21", "id": 21}
    ]
  },
]

现在我想查询tags字段下的valuet1,并且id15的数据。可以很快的想到使用嵌套查询。

res = collection.find({'tags.value': 't1', 'tags.id':15})

理论上应该查询不到任何数据,因为我们期待valueid存在于同一个字典中,或者说,我们期待查询的元素是数组中的一个元素。

但事实上,valueid并不是保存在同一个地址下的。所以它会匹配全集合中满足其中一个条件的数据并返回。可以说,对于数组的查询,只需满足其中一个元素便可以看作查询匹配。

我们可以使用elemMatch限制。elemMatch会匹配包含一个数组字段的文档,该数组字段中至少要有一个元素与查询条件匹配,所以理论上如果你的查询条件只有一个,那就不需要使用elemMatch

res = collection.find(
  {'tags.value': 't1', 'tags.id': 15},
  {'tags': {
    {'$elemMatch': {'value': 't1', 'id': 15}}
  }}
)

当查询不到任何与之匹配的tags时,以上的查询语句只会返回_id

修改

修改单条

res = collection.update_one(options, data)
res.modified_count

修改多条

res = collection.update_many(options, data)
res.modified_count

查询并修改

res = collection.find_one_and_update(options, data, return_document=pymongo.ReturnDocument.BEFORE)

# res表示修改前的数据

查找并替换

res = collection.find_one_and_replace(options, data, return_docuemnt=pymongo.ReturnDocument.AFTER)

与设置某一字段不同,替换的data将会把旧数据完全覆盖。

删除字段

res = collection.update_many(options, {'$unset': {field: None}})

嵌套文档的修改

假设一个拥有嵌套文档结构的集合

[
  {
    "name": 'col',
    'data': [
      {'value': 't1', 'pos': 12},
      {'value': 't2', 'pos': 13},
      {'value': 't21', 'pos': 13},
    ]
  }
]

我们想修改pos为13的value。可以这样写

res = collection.update_many(
  {'data.pos': 13},
  {'$set': {
    'data.$.value': 'modt121'
  }}
)

高级查询

pipeline

pipeline是mongoDB实现聚合操作的一种方式,pipeline的类型为数组,数组的元素就是当前管道对集合的一次操作,每一次操作都会以上一阶段的输出作为输入。

match

类似于SQL中的where。作为筛选条件,位置随意。如果出现在group之后,则类似于having

collection.aggregate(
  [
    {'$match': {'age': {'$gte': 30}}}
  ]
)

group

类似于SQL中的group by,将文档以某些字段进行分组。

假设一个集合

[
  {
    'company': 'cc1',
    'publish': 234,
    'records': [
      {'content': 'c'},
      {'content': 't4'}
    ]
  },
  {
    'company': 'kk',
    'publish': 19,
    'records': [
      {'content': 'cs'},
    ]
  },
  {
    'company': 'cc1',
    'publish': 12,
    'records': [
      {'content': 'cbb'},
    ]
  },
]

我希望能以company作为分组依据,得到所有的publish字段的总和、records字段的总和以及当前组的条数。

collection.aggregate(
  [
    {'addFields': {
      'records_sum': {'$size': '$records'}
    }},
    {'$group': {
      '_id': field,
      'publish_sum': {'$sum': '$publish'},
      'company_sum': {'$sum': 1},
      'records_sum': {'$sum': '$records_sum'}
    }}
  ]
)

首先对于publish来说,直接使用$sum就可以得到总和。$sum操作的字段如果无法转换成数字,那么将被忽略;$sum后直接跟1,表示统计当前分组的条数。

records_sum类似于publish_sum,但如果直接使用{'$sum': '$records'}不会得到任何结果,因为$sum后跟的字段无法被转换成数字。所以在group之前,可以先使用addFields添加一个records_sum

project

类似于SQL中的select,可以修改现有字段,新增输出字段等。

collection.aggregate(
  [
    {'$project': {'_id':0, 'name':1}}
  ]
)

lookup

将两个集合根据某个字段进行关联查询。最终输出一个集合。

假设有两个集合

# col1
[
  {
    '_id': ObjectId('hj321k4h1g41jkh31d2'),
    'name': 'col'
  }
]

# col2
[
  {
    '_id': ObjectId('dsfjsk1k3i0t6dsfdsh'),
    'pro_id': ObjectId('hj321k4h1g41jkh31d2'),
    'title': 'before'
  }
]

对其进行关联查询

col1.aggregate(
  [
    {'$match': {'_id': ObjectId('hj321k4h1g41jkh31d2')}},
    {'$lookup': {
      'from': 'col2',
      'localField': '_id',
      'foreignField': 'pro_id',
      'as': 'col2'
    }}
  ]
)

这样,col2将作为col1的一个字段输出。

[
  {
    '_id': ObjectId('hj321k4h1g41jkh31d2'),
    'name': 'col',
    'col2': [
      {
        '_id': ObjectId('dsfjsk1k3i0t6dsfdsh'),
        'pro_id': ObjectId('hj321k4h1g41jkh31d2'),
        'title': 'before'
      }
    ]
  }
]

addFields

新增字段。可以新增一级字段,嵌套文档的字段,覆盖原有字段,数组新增元素等。

假设这样一个集合

[
  {
    'id':1,
    'title': 'nature',
    'tags': ['golang', 'cpp'],
    'dict': {'name': 'n1', 'gender': 'male'},
  }
]
collection.aggregate(
  [
    {'$addFields': {
      'newsum': {'$size': '$tags'},   # 新增字段
      'dict.age': 13,                 # 嵌套文档字段
      'title': 'PLANET',              # 覆盖原有字段
      'tags': {
        '$concatArrays': ['$tags', ['scala']]
      }                               # 为数组添加元素
    }}
  ]
)

假设一个通过lookup关联得到的嵌套文档。

[
  {
    '_id': ObjectId('hj321k4h1g41jkh31d2'),
    'name': 'col',
    'col2': [
      {
        '_id': ObjectId('dsfjsk1k3i0t6dsfdsh'),
        'pro_id': ObjectId('hj321k4h1g41jkh31d2'),
        'title': 'before'
      }
    ]
  }
]

由于你的业务需求,需要将col2._id转换为字符串。可以很容易想到使用$toString。但是由于col2是一个数组嵌套字典的结构,所以需要使用$map将操作作用于每一个元素。在$map输出时,将_id转换为字符串并通过$mergeObjects合并原有的文档并覆盖_id字段。

collection.aggregate(
  [
    {'$addFields': {
      'col2': {
        '$map': {
          'input': '$col2',
          'as': 'c',
          'in': {
            '$mergeObjects': [
              '$$c',
              {'_id': {'$toString': '$$c._id'}}
            ]
          }
        }
      }
    }}
  ]
)

set

$set实际上是$addFields的别名。两者功能相同。

我们可以在进行聚合查询时修改某些字段。例如,当需要进行$lookup关联操作时,如果关联的双方的字段类型并不统一,那么可以在$lookup之前将字段类型修改。

collection.aggregate(
  [
    {'$match': {'_id': ObjectId('hj321k4h1g41jkh31d2')}},
    {'$set': {'_id': {'$toString': '_id'}}},
    {'$lookup': {...}}
  ]
)

unwind

$unwind可以将字段解析为数组,并为数组的每一个元素返回一个文档。如果字段无法被解析为数组,那么将被视为一个单元素数组。

假设一个集合

[
  {
    'name': 'col',
    'used': null,
    'tags': ['a', 'b', 'c']
  },
  {
    'name': 'col2',
    'used': 1,
    'tags': ['ds']
  }
]

tags字段进行拆分

collection.aggregate(
  [
    {'$unwind': 'tags'}
  ]
)

# return
[
  {'name':'col', 'used': null, 'tags': 'a'},
  {'name':'col', 'used': null, 'tags': 'b'},
  {'name':'col', 'used': null, 'tags': 'c'},
  {'name':'col2', 'used': 1, 'tags': 'ds'},
]

如果对used字段进行拆分,由于used不是数组,所以会看作是单元素元组。

collection.aggregate(
  [
    {'$unwind': 'used'}
  ]
)

# return
[
  {'name': 'col2', 'used': 1, 'tags': ['ds']}
]

如果当前字段的值为空,缺失,为null的情况下,将不会输出该文档,所以查询时如果需要指定某一字段必须存在并且值不为空,可以使用unwind

可以通过指定{preserveNullAndEmptyArrays: True}保持原文档输出。

collection.aggregate(
  [
    {'$unwind': {'path': 'used', 'preserveNullAndEmptyArrays': True}}
  ]
)

# return
[
  {'name': 'col', 'used': null, 'tags': ['a', 'b', 'c']},
  {'name': 'col2', 'used': 1, 'tags': ['ds']},
]

常用的管道运算

上面提到的$size, $sum, $concatArrays, $mergeObjects等都属于管道运算符(pipeline operator)。管道运算符可以搭配不同的管道进行计算,但也有用于专属管道的管道运算符,比如$push只能用于$group管道中。

更多运算符参考 Pipeline Operators

add

$add将两个值相加,值可以是字段值,数字,或者日期。

collection.aggregate(
  [
    {'$project': {'item': {'$add': [field, 12]}}}
  ]
)

push

只能在group中使用。往数组中添加字典对象。

def ifnull():
  ...
  return False

collection.aggregate(
  [
    {'$group': {
      'pushvalue': {
        'p1': field,
        'p2': 1 if ifnull() else 0
      }
    }}
  ]
)

addToSet

只能在group中使用,往数组中添加数据,不能有重复值

collection.aggregate(
  [
    {'$group': {
      'item': {'$addToSet': field}
    }}
  ]
)

ceil

返回大于等于指定数字的最小整数。如果指定数字为null或者nan,按原样返回。

collection.aggregate(
  [
    {'$project': {'ceiling': {'$ceil': field}}}
  ]
)

concat

将指定的字符串连接并返回

collection.aggregate(
  [
    {'$project': {'concatvalue': {'$concat': [field, '-', 'intro']}}}
  ]
)

cond

构建一个if-then-else表达式。

collection.aggregate(
  [
    {'$project': {
      'condvalue': {
        'cond': {
          'if': {'$gte': [field, 12]},
          'then': 30,
          'else': 10
        }
      }
    }}
  ]
)

filter

构建一个筛选表达式,返回满足的数据

collection.aggregate(
  [
    {'$project': {
      'item': {
        '$filter': {
          'input': field,
          'as': 'f',
          'cond': {'$gte': [field, 10]}
        }
      }
    }}
  ]
)

map

将表达式作用于数组的每一个元素

collection.aggregate(
  [
    {'$project': {
      'item': {
        '$map': {
          'input': arrayfield,
          'as': 'arr',
          'in': {'$concat': ['$$arr', 'ing']}
        }
      }
    }}
  ]
)