zoukankan      html  css  js  c++  java
  • python tornado 实现类禅道系统

       最近楼主加班 喽, 好久没有更新我的博客了,哎,一言难尽,废话我就不说了,来开始上精华。

    背景:目前市面上有很多bug管理工具,但是各有各的特点,最著名,最流行的就是禅道,一个偶然的机会接触到了python ,学到tornado后,就想着去怎么去用到实处,后来发现自己公司的除了禅道就记录bug没有什么可以用的工具了。

    语言:python3 第三方库:tornado,qiniu(用于云存储文件),数据库用sqlite

    why  use tornado?很多人其实会这么问我,我感觉tornado可以实现异步,虽然现在代码还没有用到异步,我感觉还是很不错的框架,值得学习,现在很多公司都在用,个人感觉这是一个不错的,值得我们大家去学习的框架。

    来看看我的需求文档

    大体是这样的轮廓,那么拿到这个的时候,我会先进行需求分析,

    需要什么样的数据库, 数据模型之间的关系,虽然现在还是有很多地方是写死的还没有进行搜索功能的设置,但是我相信,有了现在这个demo

    那么我开始来设计下我的主要结构

    handlers 存放类似flask的views

    models存数据库相关的,

    static存放静态

    template存放模板

    untils存放公共库

    setting 配置文件

    urls url地址映射

    run 运行文件。

     那么我来开始设计我的数据,其实我的数据模型也是一波三折的。

    选择用了sqlalchemy,之前用过flask的sqlalchemy感觉不错

    from models.dataconfig import db_session,Base,create_all
    from sqlalchemy import  Column,Integer,DateTime,Boolean,String,ForeignKey,desc,asc,Text
    from sqlalchemy.orm import  relationship,backref
    from untils.common import encrypt
    import datetime
    class User(Base):
        """Account row stored in the ``users`` table.

        Besides its own columns, the class declares one-to-many relationships
        to the other models in this module; each of them adds a ``users``
        backref on the related class.
        """
        __tablename__ = 'users'
        id = Column(Integer(), primary_key=True)
        username = Column(String(64), unique=True, index=True)
        email = Column(String(64))
        # Holds the value returned by encrypt(), never the raw password.
        password = Column(String(64))
        last_logtime = Column(DateTime())
        status = Column(Integer())
        leves = Column(Integer())
        iphone = Column(Integer())
        Projects = relationship('Project', backref='users')
        shebei = relationship('Shebei', backref='users')
        file = relationship('FilePan', backref='users')
        banben = relationship('BanbenWrite', backref='users')
        testresult = relationship('TestResult', backref='users')
        testcase = relationship('TestCase', backref='users')
        buglog = relationship('BugLog', backref='users')

        def __repr__(self):
            return self.username

        @classmethod
        def get_by_id(cls, id):
            """Return the user whose primary key equals ``id``, or None."""
            return db_session.query(cls).filter(cls.id == id).first()

        @classmethod
        def get_by_username(cls, username):
            """Return the user with the given ``username``, or None."""
            return db_session.query(cls).filter(cls.username == username).first()

        @classmethod
        def get_count(cls):
            """Return the number of rows in the ``users`` table."""
            # The previous version counted Shebei rows here by mistake.
            return db_session.query(cls).count()

        @classmethod
        def add_new(cls, username, password, iphone, email, leves):
            """Create and commit a new user with ``status`` set to 0.

            The password is passed through ``encrypt`` before being stored.

            Returns:
                True when the commit succeeded; False when it failed and the
                session was rolled back (previously the failure was silent).
            """
            new = cls(username=username, iphone=iphone, email=email, leves=leves)
            new.password = encrypt(password)
            new.status = 0
            db_session.add(new)
            try:
                db_session.commit()
            except Exception:
                # Keep the shared session usable after a failed flush/commit.
                db_session.rollback()
                return False
            return True
    class Shebei(Base):
        """Row of the ``shebeis`` table.

        ``ruku_user`` is a foreign key to ``users.id``; ``she_sta`` defaults
        to 0.
        """
        __tablename__ = 'shebeis'

        id = Column(Integer(), primary_key=True)
        shebei_id = Column(String(32), unique=True)
        shebei_name = Column(String(64))
        shebei_xitong = Column(String(64))
        shebei_xinghao = Column(String(255))
        shebei_jiage = Column(Integer())
        shebei_fapiaobianhao = Column(String(64))
        shebei_quanxian = Column(Boolean())
        shebei_jie = Column(Boolean())
        shebei_shuyu = Column(String())
        shebei_date = Column(DateTime())
        shebei_user = Column(String())
        gou_date = Column(DateTime())
        shebei_status = Column(String(16))
        she_sta = Column(Integer(), default=0)
        ruku_user = Column(Integer(), ForeignKey('users.id'))

        def __repr__(self):
            return self.shebei_name

        @classmethod
        def get_by_name(cls, name):
            """Return the first row whose ``shebei_name`` equals ``name``, or None."""
            return db_session.query(cls).filter_by(shebei_name=name).first()

        @classmethod
        def get_by_id(cls, id):
            """Return the row with primary key ``id``, or None."""
            return db_session.query(cls).filter_by(id=id).first()

        @classmethod
        def get_count(cls):
            """Return the number of rows in ``shebeis``."""
            return db_session.query(cls).count()
    class TestResult(Base):
        """Row of the ``testresults`` table.

        Linked to a project through ``porject_id`` and to a user through
        ``user_id``.
        """
        __tablename__ = 'testresults'
        id = Column(Integer(), primary_key=True)
        porject_id = Column(Integer(), ForeignKey('projects.id'))
        creat_time = Column(DateTime())
        bug_first = Column(Integer())
        ceshirenyuan = Column(String(255))
        is_send = Column(Boolean(), default=True)
        filepath = Column(String(64))
        status = Column(Integer(), default=0)
        user_id = Column(Integer(), ForeignKey('users.id'))

        def __repr__(self):
            # The model has no ``porject_name`` attribute (the old repr raised
            # AttributeError), so identify the row by its keys instead.
            return '<TestResult id=%s porject_id=%s>' % (self.id, self.porject_id)

        @classmethod
        def get_by_name(cls, name):
            """Return the first test result whose project is named ``name``.

            The project name lives on ``Project.name``, so the lookup joins
            through ``porject_id``. Returns None when nothing matches.
            """
            return (
                db_session.query(cls)
                .join(Project, cls.porject_id == Project.id)
                .filter(Project.name == name)
                .first()
            )

        @classmethod
        def get_by_id(cls, id):
            """Return the row with primary key ``id``, or None."""
            return db_session.query(cls).filter(cls.id == id).first()

        @classmethod
        def get_by_user_id(cls, user_id):
            """Return the first row owned by ``user_id``, or None."""
            return db_session.query(cls).filter(cls.user_id == user_id).first()

        @classmethod
        def get_count(cls):
            """Return the number of rows in ``testresults``."""
            return db_session.query(cls).count()
    class BanbenWrite(Base):
        """Row of the ``banbens`` table, linked to a project and a user.

        ``bugadmin`` lists the related ``BugAdmin`` rows, which receive a
        ``banbens`` backref.
        """
        __tablename__ = 'banbens'
        id = Column(Integer(), primary_key=True)
        porject_id = Column(Integer(), ForeignKey('projects.id'))
        # Pass the callable, not its result: ``now()`` would be evaluated once
        # at import time and every row would get the process start time.
        creat_time = Column(DateTime(), default=datetime.datetime.now)
        banbenhao = Column(String(32))
        is_xian = Column(Boolean(), default=False)
        is_test = Column(Boolean(), default=False)
        status = Column(Integer())
        user_id = Column(Integer(), ForeignKey('users.id'))
        bugadmin = relationship('BugAdmin', backref='banbens')

        def __repr__(self):
            return self.banbenhao

        @classmethod
        def get_by_name(cls, name):
            """Return the first row whose project is named ``name``, or None.

            The model has no ``porject_name`` column, so the name is matched
            on ``Project.name`` through a join on ``porject_id``.
            """
            return (
                db_session.query(cls)
                .join(Project, cls.porject_id == Project.id)
                .filter(Project.name == name)
                .first()
            )

        @classmethod
        def get_by_id(cls, id):
            """Return the row with primary key ``id``, or None."""
            return db_session.query(cls).filter(cls.id == id).first()

        @classmethod
        def get_by_user_id(cls, user_id):
            """Return the first row owned by ``user_id``, or None."""
            return db_session.query(cls).filter(cls.user_id == user_id).first()

        @classmethod
        def get_count(cls):
            """Return the number of rows in ``banbens``."""
            return db_session.query(cls).count()
    class FilePan(Base):
        """Row of the ``files`` table; ``user_id`` references ``users.id``."""
        __tablename__ = 'files'
        id = Column(Integer(), primary_key=True)
        file_fenlei = Column(String(64))
        file_name = Column(String(64))
        down_count = Column(Integer(), default=0)
        # Pass the callable, not its result: ``now()`` would be evaluated once
        # at import time and reused for every inserted row.
        creat_time = Column(DateTime(), default=datetime.datetime.now)
        status = Column(Integer(), default=0)
        down_url = Column(String(64))
        is_tui = Column(Boolean(), default=False)
        user_id = Column(Integer(), ForeignKey('users.id'))

        def __repr__(self):
            return self.file_name

        @classmethod
        def get_by_file_name(cls, name):
            """Return the first row whose ``file_name`` equals ``name``, or None."""
            return db_session.query(cls).filter(cls.file_name == name).first()

        @classmethod
        def get_by_id(cls, id):
            """Return the row with primary key ``id``, or None."""
            return db_session.query(cls).filter(cls.id == id).first()

        @classmethod
        def get_by_user_id(cls, user_id):
            """Return the first row owned by ``user_id``, or None."""
            return db_session.query(cls).filter(cls.user_id == user_id).first()

        @classmethod
        def get_count(cls):
            """Return the number of rows in ``files``."""
            return db_session.query(cls).count()
    class BugAdmin(Base):
        """Row of the ``bugadmins`` table.

        References a project (``porject_id``), a ``banbens`` row (``ban_id``)
        and a user (``user_id``). ``bug_log`` lists the related ``BugLog``
        rows, which receive a ``bugadmins`` backref.
        """
        __tablename__ = 'bugadmins'
        id = Column(Integer(), primary_key=True)
        porject_id = Column(Integer(), ForeignKey('projects.id'))
        bugname = Column(String(64))
        bugdengji = Column(String(64))
        # Pass the callable, not its result: ``now()`` would be evaluated once
        # at import time and reused for every inserted row.
        bugtime = Column(DateTime(), default=datetime.datetime.now)
        bug_miaoshu = Column(String(255))
        ban_id = Column(Integer(), ForeignKey('banbens.id'))
        fujian = Column(String(64))
        is_que = Column(Boolean())
        bug_status = Column(String(64))
        bug_jiejuefangan = Column(String(64))
        bug_send = Column(String(64))
        status = Column(Integer(), default=0)
        bug_log = relationship('BugLog', backref='bugadmins')
        user_id = Column(Integer(), ForeignKey('users.id'))

        def __repr__(self):
            return self.bugname

        @classmethod
        def get_by_bugname(cls, bugname):
            """Return the first row whose ``bugname`` matches, or None."""
            return db_session.query(cls).filter(cls.bugname == bugname).first()

        @classmethod
        def get_by_id(cls, id):
            """Return the row with primary key ``id``, or None."""
            return db_session.query(cls).filter(cls.id == id).first()

        @classmethod
        def get_by_porject_name(cls, porject_name):
            """Return the first bug whose project is named ``porject_name``.

            The model has no ``porject_name`` column, so the name is matched
            on ``Project.name`` through a join on ``porject_id``. Returns None
            when nothing matches.
            """
            return (
                db_session.query(cls)
                .join(Project, cls.porject_id == Project.id)
                .filter(Project.name == porject_name)
                .first()
            )

        @classmethod
        def get_count(cls):
            """Return the number of rows in ``bugadmins``."""
            return db_session.query(cls).count()
    class TestCase(Base):
        """Row of the ``testcases`` table, linked to a project and a user."""
        __tablename__ = 'testcases'
        id = Column(Integer(), primary_key=True)
        porject_id = Column(Integer(), ForeignKey('projects.id'))
        casename = Column(String(64))
        case_qianzhi = Column(String())
        case_buzhou = Column(String())
        case_yuqi = Column(String())
        status = Column(Integer(), default=0)
        # Pass the callable, not its result: ``now()`` would be evaluated once
        # at import time and reused for every inserted row.
        case_crea_time = Column(DateTime(), default=datetime.datetime.now)
        user_id = Column(Integer(), ForeignKey('users.id'))

        def __repr__(self):
            return self.casename

        @classmethod
        def get_by_project_name(cls, project_name):
            """Return the first case whose project is named ``project_name``.

            The model has no ``project_name`` column, so the name is matched
            on ``Project.name`` through a join on ``porject_id``. Returns None
            when nothing matches.
            """
            return (
                db_session.query(cls)
                .join(Project, cls.porject_id == Project.id)
                .filter(Project.name == project_name)
                .first()
            )

        @classmethod
        def get_by_casename(cls, casename):
            """Return the first row whose ``casename`` matches, or None."""
            return db_session.query(cls).filter(cls.casename == casename).first()

        @classmethod
        def get_by_id(cls, id):
            """Return the row with primary key ``id``, or None."""
            return db_session.query(cls).filter(cls.id == id).first()

        @classmethod
        def get_count(cls):
            """Return the number of rows in ``testcases``."""
            return db_session.query(cls).count()
    class BugLog(Base):
        """Row of the ``buglogs`` table.

        ``bug_id`` references ``bugadmins.id`` and ``user_id`` references
        ``users.id``.
        """
        __tablename__ = 'buglogs'

        id = Column(Integer(), primary_key=True)
        bug_id = Column(Integer(), ForeignKey('bugadmins.id'))
        caozuo = Column(String())
        caozuo_time = Column(DateTime())
        user_id = Column(Integer(), ForeignKey('users.id'))

        def __repr__(self):
            return self.caozuo

        @classmethod
        def get_by_id(cls, id):
            """Return the row with primary key ``id``, or None."""
            return db_session.query(cls).filter_by(id=id).first()

        @classmethod
        def get_by_user_id(cls, user_id):
            """Return the first row written by ``user_id``, or None."""
            return db_session.query(cls).filter_by(user_id=user_id).first()

        @classmethod
        def get_by_bug_id(cls, bug_id):
            """Return the first row attached to ``bug_id``, or None."""
            return db_session.query(cls).filter_by(bug_id=bug_id).first()
    class Project(Base):
        """Row of the ``projects`` table.

        Declares one-to-many relationships to ``BugAdmin``, ``BanbenWrite``,
        ``TestResult`` and ``TestCase``; each related class receives a
        ``projects`` backref.
        """
        __tablename__ = 'projects'

        id = Column(Integer(), primary_key=True)
        name = Column(String(64))
        user_id = Column(Integer(), ForeignKey('users.id'))
        bug_log = relationship('BugAdmin', backref='projects')
        banben = relationship('BanbenWrite', backref='projects')
        testresult = relationship('TestResult', backref='projects')
        testcase = relationship('TestCase', backref='projects')

        def __repr__(self):
            return self.name

        @classmethod
        def get_by_id(cls, id):
            """Return the project with primary key ``id``, or None."""
            return db_session.query(cls).filter_by(id=id).first()

        @classmethod
        def get_by_name(cls, name):
            """Return the first project called ``name``, or None."""
            return db_session.query(cls).filter_by(name=name).first()
    

     这是数据库相关的,

     # Database configuration: engine, declarative base and the shared session.
     from sqlalchemy import create_engine
     from sqlalchemy.orm import scoped_session, sessionmaker
     from sqlalchemy.ext.declarative import declarative_base

     # ``convert_unicode`` was dropped from this call: it is deprecated since
     # SQLAlchemy 1.3 and rejected by newer releases.
     engine = create_engine('sqlite:///shebei.db')
     Base = declarative_base()
     db_session = scoped_session(sessionmaker(bind=engine))


     def create_all():
         """Create every table registered on ``Base.metadata``."""
         Base.metadata.create_all(engine)


     def drop_all():
         """Drop every table registered on ``Base.metadata``."""
         Base.metadata.drop_all(engine)

    其实在开发的过程中,也遇到了很多阻力,比如下载一直实现不好,比如分页,也是参照别人的实现的,

    分页公共模块

    class Pagination:
        """Compute slice bounds and prev/next/last links for a paged listing.

        Args:
            current_page: Requested page number; anything that cannot be
                converted to an int, or is below 1, falls back to page 1.
            all_item: Total number of items being paged.
            per_page: Items per page (default 10). Must be at least 1.

        Attributes:
            current_page: Normalised 1-based page number.
            all_pager: Total number of pages (0 when there are no items).
            per_page: Page size used for ``start``/``end``.

        Raises:
            ValueError: If ``per_page`` is smaller than 1.
        """

        def __init__(self, current_page, all_item, per_page=10):
            try:
                page = int(current_page)
            except (TypeError, ValueError, OverflowError):
                # Bad query-string input falls back to the first page.
                page = 1
            if page < 1:
                page = 1
            per_page = int(per_page)
            if per_page < 1:
                raise ValueError('per_page must be >= 1')
            all_pager, remainder = divmod(all_item, per_page)
            if remainder:
                # A partially filled trailing page still counts as a page.
                all_pager += 1
            self.current_page = page
            self.all_pager = all_pager
            self.per_page = per_page

        @property
        def start(self):
            """Index of the first item on the current page (slice start)."""
            return (self.current_page - 1) * self.per_page

        @property
        def end(self):
            """Index one past the last item on the current page (slice stop)."""
            return self.current_page * self.per_page

        def string_pager(self, base_url="/index/"):
            """Return the ``<li>`` markup for the prev, next and last links.

            The page number is appended directly to ``base_url``. "Prev" is
            inert on page 1; "next" is inert on the last page or beyond it.
            """
            # With zero items all_pager is 0; never link to a page "0".
            last_page = max(self.all_pager, 1)
            if self.current_page == 1:
                prev = '<li><a href="javascript:void(0);">上一页</a></li>'
            else:
                prev = '<li><a href="%s%s">上一页</a></li>' % (base_url, self.current_page - 1,)
            if self.current_page >= last_page:
                nex = '<li><a href="javascript:void(0);">下一页</a></li>'
            else:
                nex = '<li><a href="%s%s">下一页</a></li>' % (base_url, self.current_page + 1,)
            last = '<li><a href="%s%s">尾页</a></li>' % (base_url, last_page,)
            return "".join((prev, nex, last))
    

    在上传文件的时候,原来存放在本地,结果呢,下载处理不好,于是乎选择了七牛,需要到七牛的官网去注册自己的账号

    from qiniu import Auth,put_file,etag,urlsafe_base64_encode
    import qiniu.config
    # NOTE(review): these look like real credentials committed to source.
    # Rotate them and load them from configuration or the environment; the
    # values are left untouched here so existing behaviour is unchanged.
    access_key='uVxowDUcYx641ivtUb111WBEI4112L3D117JHNM_AOtskRh4'
    secret_key='PdXU9XrXTLtp1N21bhU1Frm1FDZqE1qhjkEaE9d1xVLZ5C'
    def sendfile(key, file, bucket_name='leilei22'):
        """Upload a local file to Qiniu and check the stored hash.

        Args:
            key: Object name to store the file under.
            file: Path of the local file to upload.
            bucket_name: Target bucket; defaults to the previously
                hard-coded ``'leilei22'``.

        Returns:
            True when the hash reported by the upload equals ``etag(file)``,
            False otherwise, including when the upload returned no result.
        """
        q = Auth(access_key, secret_key)
        token = q.upload_token(bucket_name, key)
        ret, info = put_file(token, key, file)
        if not ret:
            # Presumably a failed upload yields no result dict (confirm
            # against the Qiniu SDK); indexing it used to raise TypeError.
            return False
        return ret.get('hash') == etag(file)
    

     解析Excel,主要用于上传测试用例

    import xlrd,xlwt
    from xlutils.copy import copy
    def datacel(filepath):
        """Read test-case columns from the first sheet of an Excel workbook.

        Row 0 is skipped. Returns a 5-tuple of lists, in this order:
        project ids (column 0), case names (column 2), case_qianzhi values
        (column 3), case_buzhou values (column 4) and case_yuqi values
        (column 1).
        """
        workbook = xlrd.open_workbook(filepath)
        sheet = workbook.sheets()[0]
        data_rows = range(1, sheet.nrows)

        def column(index):
            # Collect one spreadsheet column across all data rows.
            return [sheet.cell(row, index).value for row in data_rows]

        return column(0), column(2), column(3), column(4), column(1)

    到这里公共模块就基本完成了,现在可以着手去写我们的主要代码和静态界面了,因为前后端都是我自己做,我的前端其实还是从网上找来的模板,

    学习的道路是痛苦的,但是我相信我是可以成功的,

    from tornado.web import RequestHandler
    from models.model_py import User
    class BaseHandler(RequestHandler):
        """Shared request-handler base for the application's handlers."""

        @property
        def db(self):
            """Expose the ``db`` attribute of the owning application."""
            return self.application.db

        def get_current_user(self):
            """Resolve the logged-in user from the ``user_id`` secure cookie.

            Returns None when the cookie is missing or empty; otherwise the
            result of ``User.get_by_id`` for the integer cookie value.
            """
            cookie_value = self.get_secure_cookie('user_id')
            if cookie_value:
                return User.get_by_id(int(cookie_value))
            return None
    

     基础的类继承了RequestHandler类,进行了一些简单的自定义,然后后续可以用这个,

    经过两周的开发,已经形成了成熟的,

  • 相关阅读:
    .net framework 3.5 beta 2 / vs 2008 beta 2 有问题!
    提交了 VS 2008 sp1 对 Linq to SQL 的 xml 字段类型支持的一个 bug
    如何在 vista 的 iis 7 上面配置 asp.net 1.1 开发环境
    Linq to sql 中如何进行 left join
    Silverlight 2 beta 2 中目前不支持共享 WCF 的客户端类型
    Scott Guthrie 写的 Silverlight 教程索引
    利用 Xml Literal 功能复制一段 Xml
    Silverlight 2 beta 2 bug 解决办法 (持续更新中)
    C++使用内存映射文件入门
    如何在C++项目中引用Lib文件(VS2005)
  • 原文地址:https://www.cnblogs.com/leiziv5/p/7532344.html
Copyright © 2011-2022 走看看