一步步玩转测试平台开发(三):如何同步YApi数据

在前文《如何制定合适的平台方案》中,提到了由于Swagger过于重,不适合快速迭代开发,因此我们就把接口信息维护的工作转移到了YApi系统上。

肯定有同学要问YApi是什么?YApi自己的宣传语: YApi 是高效、易用、功能强大的 api 管理平台,旨在为开发、产品、测试人员提供更优雅的接口管理服务。可以帮助开发者轻松创建、发布、维护 API,YApi 还为用户提供了优秀的交互体验,开发人员只需利用平台提供的接口数据写入工具以及简单的点击操作就可以实现接口的管理。

具体的你可以去https://gitee.com/mirrors/YApi一探究竟,这块不是本文重点讨论的问题

接下来就进入主题吧!

我们知道了所有的接口数据都在YApi上,而我们怎么能拿到这些数据给平台用呢?于是我进入YApi系统查看了一下它的帮助文档:

  • 如果通过菜单操作的话,方法如下(现在又多了一个支持swaggerjson导出格式)YApi数据导出官方帮助.jpg

  • YApi也提供了开放的API用来获取不同层级的接口数据YApi开放api帮助文档.jpg

为了能更好的展开说明后续内容,此时需要给大家简单的说明一下YApi管理接口的层级。其实很简单你要想找到一个接口通过如下层级就可以了: 项目 -> 功能(官方叫菜单,为了好理解我们习惯叫它功能,后文统一叫功能) -> 接口

拿我们实际项目举个例子,目前在YApi上我们总共维护了13个项目,如下图: YApi项目展示.jpg

进入某一个项目后,你就会看到界面左边的就是这个项目所包含的功能清单,点击其中一个功能后,右边就会展示接口清单,如下图: YApi项目接口展示.jpg

然后再点击某一个接口就进入接口详细信息展示页,如下图: YApi接口展示.jpg

说到这里,大家应该都能明白YApi是如何管理接口了吧。由于开发每天都在YApi上新建维护接口,所以我就要想办法每天能及时获取这些更新的接口数据。官方提供的通过按钮操作的方式导数据显然不适合,并且我们的项目很多,就直接pass了这个方法。那么就剩下最后通过开放api的方式获取数据,理论上这个方式可行。但一结合我们实际项目却发现这里面有一个很大的问题,我们公司的YApi是通过Keycloak来实现SSO,因而你要想访问开放的api就必须要用自己的公司账号,我们又没有公共的测试账号,因而出于安全和后期维护的考虑,这条路也不可行,那么怎么办呢?

YApi的后台数据是通过MangoDB来存储的,那么如果我可以直接访问这个数据库,问题不就迎刃可解了嘛?于是通过走流程我们可以访问数据库了。 虽然YApi是一个开源系统,但是它的MangoDB数据结构我却网上找了半天也没找到,后来只能硬着头皮花了两天看源码才最终搞明白了它的数据结构。

我主要是用了如下五个表:

  • group:用户群表 (用来记录用户群组信息)
  • user: 用户表
  • interface: 接口表
  • interface_cat: 功能表(维度表)
  • project: 项目表(维度表)

注:你在系统中能看到项目全部都是通过这个group来管理,你只能看到跟你同group下的用户所创建的项目。

YApi数据结构.jpg

你可以理解为MangoDB的数据全部都是以json数据格式进行保存和展示,因而你可以通过pymango来进行读取MangoDB的操作(说白了就是读取了一大堆的json数据),然后通过pymysql将mangodb的数据更新到你平台的mysql里(将json数据转换成list形式)。在这个操作过程中,你就可以在脚本中添加识别新增接口数据的判断(新获取到接口清单,与同步前的接口清单进行比较),然后对接企业微信或者钉钉群机器人推送脚本(后续文章会介绍这块)来第一时间通知我们测试人员有新接口了。同步的脚本实现后你可以放到平台上定时运行,或者也可以直接放到Jenkins定时运行。

下面是我写的操作MangoDB的通用方法,供参考:

from pymongo import MongoClient

class ReadFromMangoDB():
    """
    读取MangoDB数据库数据
    """

    def __init__(self, db_server, username, password, db_name,mechanism='SCRAM-SHA-1'):
        self.conn = None
        self.database = None
        self.db_server = db_server
        self.username = username
        self.password = password
        self.db_name =db_name
        self.mechanism = mechanism

    def connect_to_server(self):
        """
        连接数据库服务器
        :return:
        """
        try:
            self.conn = MongoClient('mongodb://%s/' % self.db_server)
            self.conn.yapi.authenticate(self.username,self.password,mechanism=self.mechanism)
            print("MangoDB数据库连接成功!")
        except:
            print("MangoDB数据库连接失败!")

    def connect_to_database(self):
        """
        登录要访问的库
        :return:
        """
        try:
            self.database = self.conn[self.db_name]
            print("数据库: %s 连接成功!" % self.db_name)
        except:
            print("数据库: %s 连接失败!" % self.db_name)

    def read_table(self,table_name,index_list):
        """
        读取目标表中数据
        :param table_name: 表名称
        :param index_list: 要读取字段清单
        :return:
        """
        print("""
==================================================
 * 开始读取目标表: %s 的操作""" % table_name)
        if isinstance(index_list,list):
            try:
                table = self.database[table_name]
                expot_table = []
                for item in table.find():
                    tmp_table = []
                    for index_item in index_list:
                        tmp_table.append(item[index_item])
                    expot_table.append(tmp_table)
                print("MangoDB表:%s读取完毕,共读取%s行数据!" %(table_name,len(expot_table)))
                return expot_table
            except:
                print("数据表:%s 查询失败或者数据库中无此表!" % table_name)
        else:
            print("入参index_list不是list")
            return []

    def connection_close(self):
        """
        断开与数据库服务器的连接
        :return:
        """
        print("""
---断开MangoDB数据库的连接---""")
        self.conn.close()

下面是我写的操作Mysql的通用方法,供参考:

import pymysql, time

class MysqlOperation():

    """
    连接MySQL数据库,并在目标表里写数据
    """

    CURRENT_TIME = time.strftime('%Y-%m-%d %H:%M:%S',time.localtime(time.time()))

    def __init__(self, db_host, db_port, db_user, db_passwd, db_name, db_charset='utf8mb4'):
        self.db_host = db_host
        self.db_port = db_port
        self.db_user = db_user
        self.db_passwd = db_passwd
        self.db_name = db_name
        self.db_charset = db_charset
        self.db = None

    def mysql_link(self):
        """连接数据库"""
        try:
            self.db = pymysql.connect(host=self.db_host, user=self.db_user, password=self.db_passwd,
                             database=self.db_name, charset=self.db_charset)
            print("MySQL数据库已经连接成功!")
        except:
            print("could not connect to mysql server")

    def connection_close(self):
        """
        断开与数据库的连接
        :return:
        """
        print("""
---断开MySQL数据库的连接---""")

        self.db.close()

    def clear_data(self,table_name):
        """
        清除目标表的数据
        :param table_name:
        :return:
        """

        print("""
==================================================
 * 开始对目标表: %s 的数据进行清理操作""" % table_name)

        #计算一下目标表在清除前的数据量
        sql_count = "select count(1) from %s" % table_name
        cursor = self.db.cursor()
        cursor.execute(sql_count)
        data_count = cursor.fetchone()

        #进行目标表数据清除工作
        sql = "delete from %s" % table_name
        cursor = self.db.cursor()
        cursor.execute(sql)
        self.db.commit()

        print("数据表: %s 中的%s行数据已经被清除完毕!"%(table_name,data_count[0]))

    def insert_data(self,sql,table_name):
        """
        对目标表进行数据插入
        :param sql: 要插入数据的sql
        :param table_name: 目标表
        :return:
        """
        print("""
==================================================
 * 开始对目标表: %s 的数据进行刷新操作""" % table_name)
        cursor = self.db.cursor()
        cursor.execute(sql)
        self.db.commit()

        sql_count = "select count(1) from %s" % table_name
        cursor = self.db.cursor()
        cursor.execute(sql_count)
        data_count = cursor.fetchone()

        print("共%s行数据已经在目标表:%s中刷新完毕!" %(data_count[0],table_name))

    def sync_to_db(self,table_name,data_list, compare_dic):
        """将数据导入数据库"""
        cursor = self.db.cursor()  # 使用 cursor() 方法创建一个游标对象 cursor

        # 数据写入目标表
        print("""
==================================================
 * 开始对目标表: %s 的数据进行刷新操作""" % table_name)
        sql = ""

        for item in data_list:

            table_index = " ("
            data_index = "values ("
            len_dict = len(compare_dic)
            n = 1
            for key, value in compare_dic.items():
                table_index += key + ","
                data_index += '"%s", ' % item[value]
                if n == len_dict:
                    table_index += "op_time) "
                    data_index += '"%s")' % self.CURRENT_TIME
                n += 1
            try:
                sql = "insert into " + table_name + table_index + data_index
                cursor.execute(sql)
            except:
                print("""目标表: %s 的数据刷新异常""" % table_name)
                print(sql)
                break
        print("%s rows datas has been inserted into %s!" % (len(data_list),table_name))
        self.db.commit()
        cursor.close()