最近有一门课需要使用到python的flask框架,并对mysql关系型数据库、neo4j图数据库和hadoop分布式数据库进行查询,在这里作一些记录和总结。

mysql

  1. 安装mysql

    自从大二用了docker在服务器上安装软件后,变得一发不可收拾(docker实在是太舒服了)。我们的mysql部署也采用docker,mysql的镜像十分之多,找个star量高的pull一下,然后安装到服务器上即可,十分简单。

  2. 导入数据

    比较麻烦的是将数据导入到mysql,我们使用JetBrain的datagrip进行远程连接,然后导入数据。在上一篇blog描述了爬虫+数据处理的过程,最后得到的是csv文件格式的数据。因此这里也是将csv导入到mysql之中,具体的操作可以参考我项目的组长Baokker,导入成功之后,就可以直接导出.sql脚本进行数据迁移啦。

  3. mysql安全

    我们显然不能把数据都放在root账号,mysql支持账号和ip绑定,只允许该账号在指定的某个ip登录。虽然这样很安全,但是这样并不方便我们开发的时候进行debug。由于我们开发初期设置了123456这样的简单密码,导致被恶意入侵删光了所有数据(好在备份很足够),设置一个复杂的密码来保证安全是有必要的。

neo4j

  1. 安装neo4j

    为了简便起见,直接使用docker安装neo4j数据库,即

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    $ docker search neo4j
    NAME DESCRIPTION STARS OFFICIAL AUTOMATED
    neo4j Neo4j is a highly scalable, robust native gr… 1097 [OK]
    bitnami/neo4j Bitnami Docker Image for Neo4j 11 [OK]
    # docker pull neo4j
    $ docker run -d --name neo4j_a \ // -d表示容器后台运行 --name指定容器名字
    -p 7474:7474 -p 7687:7687 \ // 映射容器的端口号到宿主机的端口号
    -v /home/neo4j_a/data:/data \ // 把容器内的数据目录挂载到宿主机的对应目录下
    -v /home/neo4j_a/logs:/logs \ // 挂载日志目录
    -v /home/neo4j_a/conf:/var/lib/neo4j/conf \ // 挂载配置目录
    -v /home/neo4j_a/import:/var/lib/neo4j/import \ // 挂载数据导入目录
    --env NEO4J_AUTH=neo4j/123456 \ // 设定数据库的名字的访问密码
    neo4j // 指定使用的镜像
  2. 如若需要导入csv文件,直接在宿主机的/home/neo4j_a/import中放入文件,并重启容器即可

    1
    $ docker restart your-container-id
  3. 在项目进行期间,多次遇到想整个清空图数据库的操作,若使用cypher语句,当结点过多时,内存会难以承受,因此可以直接重开一个容器,并且旧的容器还可以作为版本回滚

    1
    2
    $ docker stop your-container-id
    $ docker run -d --name neo4j_a -p 7474:7474 -p 7687:7687 -v /home/neo4j_b/data:/data -v /home/neo4j_b/logs:/logs -v /home/neo4j_b/conf:/var/lib/neo4j/conf -v /home/neo4j_b/import:/var/lib/neo4j/import --env NEO4J_AUTH=neo4j/123456 neo4j
  4. 部署完成后,前往控制台开启端口7474,便可以在服务器7474端口访问neo4j的网页端控制台,在其中提供了许多示例与教程,可以进行测试与学习

    image-20221220135104948

spark

仍然参考本项目组长,由于这套hadoop+hive+spark实在很庞大,原本打算只部署在一台服务器上,但是后来宕机了好几次之后还是决定分开部署,所以就有了mysq+neo4j一起、spark单独一台的操作。

后端

后端使用的是python的flask框架,比起.netspringboot起来轻量级了不少。这里是后端项目地址

配置环境

主要是安装几个包,核心是flask这个包

1
2
3
4
$ pip install flask
$ pip install flask_sqlalchemy // 用于连接mysql数据库
$ pip install py2neo // 用于连接neo4j数据库
$ pip install pyspark // 用于spark

解决跨域

有了flask包,我们就可以简单搭个后端demo:这里使用request来接受传入的参数,并且使用jsonify将数据转换为json格式并返回。这地方返回的结果建议使用flask中的jsonify函数,因为Python内置包的json函数只是将数据转换为字符串,但jsonify更能适配web环境,具体原理可以参考这篇文章

关于跨域问题,只要使用flask_cors包中的CORS函数,设置app的resources即可,十分简单,实际上不开启跨域也可以被apifox等测试工具访问,但是对于前端的axios等组件就会报错,这是我们项目的前端仓库,可以参考。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
# main.py
from flask import Flask, request, jsonify
from flask_cors import CORS

app = Flask(__name__)
CORS(app, resources=r'/*') # 注册CORS, "/*" 允许访问所有api

@app.route('/test/post', methods=['POST'])
def testpost():
data = request.get_json()
print(data)
return jsonify(data)

@app.route('/test/get', methods=['GET'])
def testget():
data = request.args
print(data)
return jsonify(data)

if __name__ == '__main__':
with app.app_context():
app.run(host='0.0.0.0', debug=True)

进入文件目录,输入python main.py,即可在默认端口5000上运行后端,记得打开服务器的5000端口。如果想要在后台运行,可以使用下述命令:

1
2
$ nohup python main.py & // 后台运行,并将日志放在nohup.out文件中
$ ps -aux|grep -v grep |grep main.py |awk '{print $2}'| xargs kill -9 // 停止运行

分文件

对于mysql、neo4j和spark,我们拆分成三个目录进行api的编写,将main.py作为运行入口,因此需要在main.py中进行注册。这里我们在每个目录后都加了个_,因为neo4j会和标准库冲突,因此改为neo4j_。

1
2
3
4
5
6
7
# mysql_/api/comprehensive.py
from flask import Blueprint
# 注册一个蓝图
comprehensive = Blueprint("mysql_comprehensive", __name__)
@comprehensive.route('/movie', methods=['POST'])
def comprehensiveMovieQuery():
pass
1
2
3
4
5
6
7
8
9
10
# main.py
# 导入蓝图
from mysql_.api.comprehensive import comprehensive as mysql_comprehensive
# 使用蓝图注册api,并添加路由前缀
app = Flask(__name__)
app.register_blueprint(mysql_comprehensive, url_prefix='/mysql/comprehensive')

if __name__ == '__main__':
with app.app_context():
app.run(host='0.0.0.0', debug=True)

连接mysql

首先需要将mysql表格进行orm映射,这里使用flask-sqlacodegen命令:

1
$ flask-sqlacodegen --flask 'mysql+pymysql://your_mysql_username:your_mysql_password@your_ip_address/your_database_name' --outfile "model.py"

执行完毕后,就会看到生成的文件中每个表都被映射为一个class,视图被映射成了SQLAlchemy.Table类型,但是这并不影响我们进行查询,以下是生成的示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
# 以下是生成的model.py代码
from flask_sqlalchemy import SQLAlchemy

db = SQLAlchemy()

class Act(db.Model):
__tablename__ = 'Act'

movie_id = db.Column(db.ForeignKey('Movie.movie_id'), primary_key=True, nullable=False)
actor_id = db.Column(db.ForeignKey('Actor.actor_id'), primary_key=True, nullable=False, index=True)
movie_title = db.Column(db.String(256), nullable=False, index=True)

actor = db.relationship('Actor', primaryjoin='Act.actor_id == Actor.actor_id', backref='acts')
movie = db.relationship('Movie', primaryjoin='Act.movie_id == Movie.movie_id', backref='acts')

t_Cooperation = db.Table(
'Cooperation',
db.Column('left_person_id', db.Integer, nullable=False, index=True),
db.Column('right_person_id', db.Integer, nullable=False, index=True),
db.Column('movie_id', db.Integer, nullable=False),
db.Column('type', db.Integer)
)

而后在api文件中将其导入,即可进行查询。


关于mysql的查询方法,主要使用model.py中的db,如:

1
2
3
4
5
6
7
# 获取查询对象,此时还没真正去查询
query_object = db.session \
.query(Table1.Column1, Table1.Column2, Table2.Column3) \
.fileter(Table1.Column1 == Table2.Column2)
print(query_object) # 打印查询对象,得到对应的sql语句
# 真正去查询数据库
result = query_object.all() # 类似.all()|.count()等函数是真正去查的

还可以使用.group_by().having()函数,也比较直白和简单,这里就不作举例。

我认为比较有用的一些技巧,列举在下面:

  1. 定义子查询subquery()和别名label("name"),相当于是自己临时定义一张表,由于在最后一步才进行真正查询,之前都只是生成sql语句,而在最后一步查询时mysql会自动帮我们优化sql语句,如:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    director_cooperate_actor = db.session.query(
    Director.name.label("director_name"),
    t_Cooperation.c.left_person_id.label("director_id"),
    # Director.director_id.label("director_id"),
    Actor.name.label("actor_name"),
    t_Cooperation.c.right_person_id.label("actor_id"),
    # Actor.actor_id.label("actor_id"),
    t_Cooperation.c.movie_id.label("movie_id")
    ) \
    .filter(
    t_Cooperation.c.type == 1,
    Director.director_id == t_Cooperation.c.left_person_id,
    Actor.actor_id == t_Cooperation.c.right_person_id
    ) \
    .subquery()
    # 定义子查询后,可以在另一个查询中使用该子查询,即director_cooperate_actor,注意这里的.c指column,即自定义的label
    actors = db.session.query(director_cooperate_actor.c.actor_name) \
    .filter(director_cooperate_actor.c.director_name.like("%{}%".format(director))) \
    .group_by(
    director_cooperate_actor.c.director_id,
    director_cooperate_actor.c.actor_id
    ) \
    .having(func.count(director_cooperate_actor.c.movie_id) >= time) \
    .all()
  2. 使用sqlalchemy.func.group_concat(Table.Column)函数,表示在group_by()聚合的时候,将某个表(当然也可以是子表、视图等等)的字段进行字符串拼接。

    比如查询一个和某个导演合作的所有演员,并将他们合作的电影名找出来,此时就需要根据导演id和演员id进行group_by()聚合,但是他们合作的电影会有很多,正好使用group_concat进行电影名的拼接。

    注意这里的字符串拼接默认使用逗号,,按理说是可以自由指定的,但是我试了很久没有成功,可能是一个bug,正好电影名中也不包含逗号,所以后续直接.split(',')即可,下面是上述例子的代码:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    directors = db.session.query(
    director_cooperate_actor.c.director_name,
    sqlalchemy.func.group_concat(director_cooperate_actor.c.title),
    # sqlalchemy.func.group_concat(sqlalchemy.func.concat_ws('-', director_cooperate_actor.c.title), SEPARATOR="-"),
    sqlalchemy.func.count(director_cooperate_actor.c.movie_id),
    ) \
    .filter(director_cooperate_actor.c.actor_name.like("%{}%".format(actor))) \
    .group_by(
    director_cooperate_actor.c.director_id,
    director_cooperate_actor.c.actor_id,
    # director_cooperate_actor.c.movie_id
    ) \
    .having(sqlalchemy.func.count(director_cooperate_actor.c.movie_id) >= times)
    .all()

    return list(map(lambda x: {"name": x[0], "title": x[1].split(','), "times": x[2]}, directors))
  3. 分页paginate

    分页只需要将.all()修改为.paginate(page=1, per_page=10).items即可,注意这里的itemsPaginate对象的成员变量,而非成员函数,下面的示例顺带加上了时间统计。

    1
    2
    3
    start_time = time.time()
    directors = directors_query.paginate(page=page, per_page=per_page).items
    consuming_time = time.time() - start_time

连接neo4j

neo4j的操作基本基于Graph这个类的对象,因此只需要在生成这个Graph时输入用户名和密码即可。

1
2
from py2neo import *
graph = Graph('http://81.68.102.171:7474', name='username', password='password')

关于neo4j的查询操作,可以直接查看本项目后端相应文件,在此不作描述。

连接spark

spark的连接需要定义session,可以参考本项目的这个文件,具体的操作由于这块儿不是我编写,就不叙说了。