zoukankan      html  css  js  c++  java
  • python从zk获取连接并测试dubbo接口

    #coding=utf8
    import sys
    from kazoo.client import KazooClient
    import urllib
    import json
    import telnetlib
    import socket
    import dubbo_telnet
    import unittest
    from HTMLTestRunner import HTMLTestRunner


    Host = '' # Doubble服务器IP
    Port = # Doubble服务端口
    tenant_id=""

    def coondoubble_data(interface,method,param):
    try:
    global Host,Port
    # 初始化dubbo对象
    conn = dubbo_telnet.connect(Host, Port)
    # 设置telnet连接超时时间
    conn.set_connect_timeout(10)
    # 设置dubbo服务返回响应的编码

    conn.set_encoding('utf8')
    conn.invoke(interface, method, param)
    # print conn.do("ls %s"%(interface) )
    command = 'invoke %s.%s("%s")'%(interface,method,param)
    # print command
    return conn.do(command)
    except Exception as e:
    return e


    def get_dubbo():
    global Host
    zk = KazooClient(hosts="{}:2181".format(Host))
    zk.start()
    urls = []
    list = zk.get_children("dubbo")
    for i in list:
    # print i
    if 'pacific' in i:
    # print i
    gg = zk.get_children("/dubbo/{}/consumers".format(i))
    if gg:
    for j in gg:
    url = urllib.unquote(j)
    if url.startswith('dubbo:'):
    urls.append(url.split('?')[0].split('dubbo://')[1])
    gg = zk.get_children("/dubbo/{}/providers".format(i))
    if gg:
    for j in gg:
    url = urllib.unquote(j)
    if url.startswith('dubbo:'):
    urls.append(url.split('?')[0].split('dubbo://')[1])
    services = {}
    for i in urls:
    path, service = i.split('/')
    if not services.get(path):
    services.update({path: []})
    services[path].append(service)
    print json.dumps(services, indent=4)


    class ModuleServiceTest(unittest.TestCase):
    @classmethod
    def setUpClass(self):
    global tenant_id
    self.interface = 'uyun.pacific.model.api.service.ModelService'
    self.tenant_id = tenant_id

    def test_getModelVersion(self):
    method = 'getModelVersion'
    data = coondoubble_data(self.interface, method, self.tenant_id)
    self.assertNotEqual(data['versionNum'], '')

    def test_checkAndUpgrade(self):
    method = 'checkAndUpgrade'
    data = coondoubble_data(self.interface, method, self.tenant_id)
    self.assertEqual(isinstance(data,bool),True)

    def test_(self):
    u''''''

    if __name__ == '__main__':
    get_dubbo()

    testsuite = unittest.TestSuite()
    testsuite.addTest(unittest.TestLoader().loadTestsFromTestCase(ModuleServiceTest))
    with open('test_store_dubbo.html', 'wb') as f:
    runner = HTMLTestRunner(stream=f,
    verbosity=2,
    title='store-res-dubbo接口测试报告'.decode('utf8'),
    description='store-res-dubbo接口测试报告'.decode('utf8'))
    runner.run(testsuite) # 运行所有的测试用例

  • 相关阅读:
    VSCODE打开一个文件,另一个文件就关闭的问题的解决方法
    elementui的el-tree第一次加载无法展开和选中的问题
    Java线程知识:二、锁的简单使用
    “商家参数格式有误”应用切微信H5支付完美解决方案
    git 基础操作,公私钥认证/ssh公私钥登录
    Python数据分析之亚马逊股价
    Python分析6000家破产IT公司
    Python数据分析之股票数据
    Python数据分析之全球人口数据
    Vue 面试重点真题演练
  • 原文地址:https://www.cnblogs.com/slqt/p/11347232.html
Copyright © 2011-2022 走看看