zoukankan      html  css  js  c++  java
  • Python 调用 Hprose接口、Dubbo接口、Java方法

      1 #!/usr/bin/env python
      2 # -*- coding:utf-8 -*-
      3 # *************************************
      4 # @Time    : 2019/7/1
      5 # @Author  : Zhang Fan
      6 # @Desc    : Library
      7 # @File    : MyTools.py
      8 # @Update  : 2019/8/23
      9 # *************************************
     10 import telnetlib
     11 import hprose
     12 import jpype
     13 import json
     14 import os
     15 
     16 
     17 class MyHprose(object):
     18     """
     19     ===================================================================
     20     =====================       MyHprose        =======================
     21     ===================================================================
     22     """
     23     def __init__(self):
     24         self.client = None
     25         self.base_url = None
     26 
     27     def create_http_client(self, address, url):
     28         """
     29         创建HTTP客户端连接
     30         """
     31         self.base_url = 'http://' + address + url
     32         print('Creating Session : %s' % self.base_url)
     33         self.client = hprose.HproseHttpClient(self.base_url)
     34 
     35     def set_header(self, key, value):
     36         """
     37         设置连接头
     38         """
     39         print('Set Header : { "%s": "%s" }' % (key, value))
     40         self.client.setHeader(key, value)
     41 
     42     def set_parameter(self, *args):
     43         """
     44         设置参数值
     45         """
     46         arg_list = list()
     47         for arg in args:
     48             if isinstance(arg, str):
     49                 try:
     50                     arg = eval(arg)
     51                 except Exception as e:
     52                     logger.error(e)
     53             arg_list.append(arg)
     54         print('Set Parameter : %s' % str(arg_list))
     55         return arg_list
     56 
     57     def invoke_method(self, method, args):
     58         """
     59         调用方法
     60         """
     61         result = {
     62             'type': None,
     63             'msg': None,
     64             'value': None
     65         }
     66         print('Invoke Method : method={0}, args={1}'.format(method, args))
     67         ret = self.client.invoke(method, args)
     68         result['type'] = ret.types
     69         result['msg'] = ret.msg
     70         result['value'] = ret.value
     71         return json.dumps(result, ensure_ascii=False)
     72 
     73 
     74 class MyJpype(object):
     75     """
     76     ===================================================================
     77     =====================       MyJpype        ========================
     78     ===================================================================
     79     """
     80     def __init__(self):
     81         pass
     82 
     83     def start_jvm(self, jvmpath, jarpath):
     84         """
     85         开启java虚拟机.
     86         """
     87         print('Executing : Start JVM | java -ea -Djava.class.path=JedisSerialize.jar')
     88         jarpath = os.path.join(os.path.abspath(jarpath), "JedisSerialize.jar")
     89         jpype.startJVM(jvmpath, "-ea", "-Djava.class.path=%s" % jarpath, convertStrings=False)
     90 
     91     def save_token_jvm(self, db, host, port, auth):
     92         """
     93         调用java保存token到内存库.
     94         """
     95         JDClass = jpype.JClass("com.util.SearchRedis")
     96         jd = JDClass()
     97         jd.saveToken(int(db), str(host), int(port), str(auth))
     98 
     99     def get_redis_jvm(self, key, db, host, port, auth):
    100         """
    101         调用java查询String类型内存库.
    102         """
    103         print('Executing : Query Key[%s] From Select[%s]' % (key, db))
    104         JDClass = jpype.JClass("com.util.SearchRedis")
    105         jd = JDClass()
    106         result = jd.getbit(str(key), int(db), str(host), int(port), str(auth))
    107         try:
    108             return eval(str(result))
    109         except:
    110             return json.loads(str(result))
    111 
    112     def get_hash_redis_jvm(self, key, db, host, port, auth):
    113         """
    114         调用java查询Hash类型内存库.
    115         """
    116         print('Executing : Query Key[%s] From Select[%s]' % (key, db))
    117         JDClass = jpype.JClass("com.util.SearchRedis")
    118         jd = JDClass()
    119         result = jd.gethashbit(str(key), int(db), str(host), int(port), str(auth))
    120         try:
    121             return eval(str(result))
    122         except:
    123             return json.loads(str(result))
    124 
    125     def shutdown_jvm(self):
    126         """
    127         关闭java虚拟机.
    128         """
    129         print('Executing : Shutdown JVM')
    130         jpype.shutdownJVM()
    131 
    132 
    133 class MyDubbo(telnetlib.Telnet):
    134     """
    135     ===================================================================
    136     =====================       MyDubbo        =========================
    137     ===================================================================
    138     """
    139 
    140     def __init__(self, host=None, port=0):
    141         # super().__init__(host, port)
    142         super(MyDubbo, self).__init__(host, port)
    143         self.write(b"
    ")
    144 
    145     def command(self, flag, str_=""):
    146         data = self.read_until(flag.encode())
    147         self.write(str_.encode() + b"
    ")
    148         return data
    149 
    150     def invoke(self, service_name, method_name, arg):
    151         command_str = "invoke {0}.{1}({2})".format(service_name, method_name, arg)
    152         print('dubbo>:%s' % command_str)
    153         self.command('dubbo>', command_str)
    154         data = self.command('dubbo>', "
    ")
    155         new_data = data.decode('utf-8', errors='ignore').split('
    ')[0].strip()
    156         return json.loads(new_data)
    157 
    158 
    159 if __name__ == '__main__':
    160     print('This is test.')
    161     mh = MyHprose()
    162     mj = MyJpype()
    163     md = MyDubbo()
    164  
  • 相关阅读:
    c语言结构体数组引用
    c语言结构体数组定义的三种方式
    如何为SAP WebIDE开发扩展(Extension),并部署到SAP云平台上
    SAP SRM ABAP Webdynpro和CFCA usb key集成的一个原型开发
    使用SAP API portal进行SAP SuccessFactors的API测试
    SAP UI5应用里的页面路由处理
    在SAP WebIDE Database Explorer里操作hdi实例
    如何使用SAP事务码SAT进行UI应用的性能分析
    使用SAP WebIDE进行SAP Cloud Platform Business Application开发
    SAP CRM WebClient UI ON_NEW_FOCUS的用途
  • 原文地址:https://www.cnblogs.com/leozhanggg/p/11401519.html
Copyright © 2011-2022 走看看