# pymongo 连接阿里云 MongoDB 服务器
这里我用了 pymongo
3.6
版本,不知道为什么,pymongo 4.0 不能连上阿里云的 MongoDB 服务器,怎么改也没有。。。😅
def get_db(): | |
username = urllib.parse.quote_plus('数据库name') | |
password = urllib.parse.quote_plus('数据库password') | |
tunnel = SSHTunnelForwarder( | |
("跳板机ip",22), | |
ssh_username="跳板机名称", | |
ssh_password="跳板机密码", | |
remote_bind_address=(r"阿里云MongoDB服务器地址", 3717)) | |
tunnel.start() | |
print(tunnel.local_bind_port) | |
client = MongoClient('mongodb://%s:%s@127.0.0.1' % (username, password), tunnel.local_bind_port) | |
db = client['admin'] | |
npm_records = db['npm_records'] | |
npm_metas = db['npm_metas'] | |
return npm_records, npm_metas |
# MTTU 查询
# 查询依赖 metas 数据
dms
终端下:db.npm_metas.find({"name": "joi"})
pymongo
下npm_metas.find({"name":"joi})
# 查询组件创建时间
def get_com_create_time(self, com_name): | |
com_create_time = self.metas.find({"name":com_name},{"_id":0,"created":1}) | |
return list(com_create_time)[0]['created'] |
返回依赖的创建时间
# 查询组件是否存在
def is_com_exist(self, com_name): | |
tmp = self.metas.find({"name":com_name},{"_id":0,"name":1}) | |
tmp = list(tmp) | |
if len(tmp) == 0: | |
raise RuntimeError("No such component") | |
self.com_create_time = self.get_com_create_time(self.com_name) | |
if self.com_create_time > self.end: | |
raise RuntimeError("No such component") |
# 组件时间范围版本列表
def get_ver_list(self, name, end, start=None): | |
if start is None: | |
start = datetime.datetime(1999, 1, 1) | |
verlist = self.metas.aggregate( | |
[ | |
{ | |
"$match":{"name": name} | |
}, | |
{ | |
"$unwind": "$releases" | |
}, | |
{ | |
"$project": | |
{ | |
"_id":0, | |
"version":"$releases.version", | |
"time": "$releases.time" | |
} | |
}, | |
{ | |
"$match": | |
{ | |
"time": { | |
"$lte": end, | |
"$gte": start | |
} | |
} | |
} | |
] | |
) | |
verlist = list(verlist) | |
return verlist |
这里有个 MongoDB 的管道的复杂查询:
aggregate
: MongoDB 强大的聚合函数$match
:match 用于过滤文档$unwind
:将 release 列表拆分为多个文档$project
:过滤函数,可以指定新的字段名,"_id" 是显示地为 1lte、gte
:分别是代码比较函数
之后的代码都是怎么运用这些基础查询,这里略过。
# 代码实现思考
这个代码写的我很混乱,我现在都不知道我是咋实现的。。。