数据仓库数据库部署
最近有一门课需要使用到python的flask框架,并对mysql关系型数据库、neo4j图数据库和hadoop分布式数据库进行查询,在这里作一些记录和总结。
mysql
安装mysql
自从大二用了docker在服务器上安装软件后,变得一发不可收拾
(docker实在是太舒服了)。我们的mysql部署也采用docker,mysql的镜像十分之多,找个star量高的pull一下,然后安装到服务器上即可,十分简单。导入数据
比较麻烦的是将数据导入到mysql,我们使用JetBrain的datagrip进行远程连接,然后导入数据。在上一篇blog描述了爬虫+数据处理的过程,最后得到的是csv文件格式的数据。因此这里也是将csv导入到mysql之中,具体的操作可以参考我项目的组长Baokker,导入成功之后,就可以直接导出
.sql
脚本进行数据迁移啦。mysql安全
我们显然不能把数据都放在root账号,mysql支持账号和ip绑定,只允许该账号在指定的某个ip登录。虽然这样很安全,但是这样并不方便我们开发的时候进行debug。由于我们开发初期设置了123456这样的简单密码,导致被恶意入侵删光了所有数据
(好在备份很足够),设置一个复杂的密码来保证安全是有必要的。
neo4j
安装neo4j
为了简便起见,直接使用docker安装neo4j数据库,即
1
2
3
4
5
6
7
8
9
10
11
12
13$ docker search neo4j
NAME DESCRIPTION STARS OFFICIAL AUTOMATED
neo4j Neo4j is a highly scalable, robust native gr… 1097 [OK]
bitnami/neo4j Bitnami Docker Image for Neo4j 11 [OK]
# docker pull neo4j
$ docker run -d --name neo4j_a \ // -d表示容器后台运行 --name指定容器名字
-p 7474:7474 -p 7687:7687 \ // 映射容器的端口号到宿主机的端口号
-v /home/neo4j_a/data:/data \ // 把容器内的数据目录挂载到宿主机的对应目录下
-v /home/neo4j_a/logs:/logs \ // 挂载日志目录
-v /home/neo4j_a/conf:/var/lib/neo4j/conf \ // 挂载配置目录
-v /home/neo4j_a/import:/var/lib/neo4j/import \ // 挂载数据导入目录
--env NEO4J_AUTH=neo4j/123456 \ // 设定数据库的名字的访问密码
neo4j // 指定使用的镜像如若需要导入csv文件,直接在宿主机的/home/neo4j_a/import中放入文件,并重启容器即可
1
$ docker restart your-container-id
在项目进行期间,多次遇到想整个清空图数据库的操作,若使用cypher语句,当结点过多时,内存会难以承受,因此可以直接重开一个容器,并且旧的容器还可以作为版本回滚
1
2$ docker stop your-container-id
$ docker run -d --name neo4j_a -p 7474:7474 -p 7687:7687 -v /home/neo4j_b/data:/data -v /home/neo4j_b/logs:/logs -v /home/neo4j_b/conf:/var/lib/neo4j/conf -v /home/neo4j_b/import:/var/lib/neo4j/import --env NEO4J_AUTH=neo4j/123456 neo4j部署完成后,前往控制台开启端口7474,便可以在服务器7474端口访问neo4j的网页端控制台,在其中提供了许多示例与教程,可以进行测试与学习
spark
仍然参考本项目组长,由于这套hadoop+hive+spark实在很庞大,原本打算只部署在一台服务器上,但是后来宕机了好几次之后还是决定分开部署,所以就有了mysq+neo4j一起、spark单独一台的操作。
后端
后端使用的是python的flask框架,比起.net
或springboot
起来轻量级了不少。这里是后端项目地址。
配置环境
主要是安装几个包,核心是flask这个包
1 | $ pip install flask |
解决跨域
有了flask包,我们就可以简单搭个后端demo:这里使用request
来接受传入的参数,并且使用jsonify
将数据转换为json格式并返回。这地方返回的结果建议使用flask中的jsonify函数,因为Python内置包的json函数只是将数据转换为字符串,但jsonify更能适配web环境,具体原理可以参考这篇文章。
关于跨域问题,只要使用flask_cors
包中的CORS函数,设置app的resources即可,十分简单,实际上不开启跨域也可以被apifox等测试工具访问,但是对于前端的axios等组件就会报错,这是我们项目的前端仓库,可以参考。
1 | # main.py |
进入文件目录,输入python main.py
,即可在默认端口5000上运行后端,记得打开服务器的5000端口。如果想要在后台运行,可以使用下述命令:
1 | $ nohup python main.py & // 后台运行,并将日志放在nohup.out文件中 |
分文件
对于mysql、neo4j和spark,我们拆分成三个目录进行api的编写,将main.py
作为运行入口,因此需要在main.py
中进行注册。这里我们在每个目录后都加了个_,因为neo4j会和标准库冲突,因此改为neo4j_。
1 | # mysql_/api/comprehensive.py |
1 | # main.py |
连接mysql
首先需要将mysql表格进行orm映射,这里使用flask-sqlacodegen
命令:
1 | $ flask-sqlacodegen --flask 'mysql+pymysql://your_mysql_username:your_mysql_password@your_ip_address/your_database_name' --outfile "model.py" |
执行完毕后,就会看到生成的文件中每个表都被映射为一个class
,视图被映射成了SQLAlchemy.Table
类型,但是这并不影响我们进行查询,以下是生成的示例:
1 | # 以下是生成的model.py代码 |
而后在api文件中将其导入,即可进行查询。
关于mysql的查询方法,主要使用model.py
中的db
,如:
1 | # 获取查询对象,此时还没真正去查询 |
还可以使用.group_by()
和.having()
函数,也比较直白和简单,这里就不作举例。
我认为比较有用的一些技巧,列举在下面:
定义子查询
subquery()
和别名label("name")
,相当于是自己临时定义一张表,由于在最后一步才进行真正查询,之前都只是生成sql语句,而在最后一步查询时mysql会自动帮我们优化sql语句,如:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24director_cooperate_actor = db.session.query(
Director.name.label("director_name"),
t_Cooperation.c.left_person_id.label("director_id"),
# Director.director_id.label("director_id"),
Actor.name.label("actor_name"),
t_Cooperation.c.right_person_id.label("actor_id"),
# Actor.actor_id.label("actor_id"),
t_Cooperation.c.movie_id.label("movie_id")
) \
.filter(
t_Cooperation.c.type == 1,
Director.director_id == t_Cooperation.c.left_person_id,
Actor.actor_id == t_Cooperation.c.right_person_id
) \
.subquery()
# 定义子查询后,可以在另一个查询中使用该子查询,即director_cooperate_actor,注意这里的.c指column,即自定义的label
actors = db.session.query(director_cooperate_actor.c.actor_name) \
.filter(director_cooperate_actor.c.director_name.like("%{}%".format(director))) \
.group_by(
director_cooperate_actor.c.director_id,
director_cooperate_actor.c.actor_id
) \
.having(func.count(director_cooperate_actor.c.movie_id) >= time) \
.all()使用
sqlalchemy.func.group_concat(Table.Column)
函数,表示在group_by()
聚合的时候,将某个表(当然也可以是子表、视图等等)的字段进行字符串拼接。比如查询一个和某个导演合作的所有演员,并将他们合作的电影名找出来,此时就需要根据导演id和演员id进行
group_by()
聚合,但是他们合作的电影会有很多,正好使用group_concat
进行电影名的拼接。注意这里的字符串拼接默认使用逗号
,
,按理说是可以自由指定的,但是我试了很久没有成功,可能是一个bug,正好电影名中也不包含逗号,所以后续直接.split(',')
即可,下面是上述例子的代码:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16directors = db.session.query(
director_cooperate_actor.c.director_name,
sqlalchemy.func.group_concat(director_cooperate_actor.c.title),
# sqlalchemy.func.group_concat(sqlalchemy.func.concat_ws('-', director_cooperate_actor.c.title), SEPARATOR="-"),
sqlalchemy.func.count(director_cooperate_actor.c.movie_id),
) \
.filter(director_cooperate_actor.c.actor_name.like("%{}%".format(actor))) \
.group_by(
director_cooperate_actor.c.director_id,
director_cooperate_actor.c.actor_id,
# director_cooperate_actor.c.movie_id
) \
.having(sqlalchemy.func.count(director_cooperate_actor.c.movie_id) >= times)
.all()
return list(map(lambda x: {"name": x[0], "title": x[1].split(','), "times": x[2]}, directors))分页paginate
分页只需要将
.all()
修改为.paginate(page=1, per_page=10).items
即可,注意这里的items
是Paginate
对象的成员变量,而非成员函数,下面的示例顺带加上了时间统计。1
2
3start_time = time.time()
directors = directors_query.paginate(page=page, per_page=per_page).items
consuming_time = time.time() - start_time
连接neo4j
neo4j的操作基本基于Graph这个类的对象,因此只需要在生成这个Graph时输入用户名和密码即可。
1 | from py2neo import * |
关于neo4j的查询操作,可以直接查看本项目后端相应文件,在此不作描述。
连接spark
spark的连接需要定义session,可以参考本项目的这个文件,具体的操作由于这块儿不是我编写,就不叙说了。