zoukankan      html  css  js  c++  java
  • python3+ros api

    官方文档:https://wiki.mikrotik.com/wiki/Manual:API_Python3



    # !/usr/bin/env python
    # -*- coding:utf-8 -*-
    # Author:lzd

    import sys, time, binascii, socket, select
    import hashlib


    class ApiRos:
    "Routeros api"

    def __init__(self, sk):
    self.sk = sk
    self.currenttag = 0

    def login(self, username, pwd):
    for repl, attrs in self.talk(["/login"]):
    chal = binascii.unhexlify((attrs['=ret']).encode('UTF-8'))
    md = hashlib.md5()
    md.update(b'x00')
    md.update(pwd.encode('UTF-8'))
    md.update(chal)
    self.talk(["/login", "=name=" + username,
    "=response=00" + binascii.hexlify(md.digest()).decode('UTF-8')])

    def talk(self, words):
    if self.writeSentence(words) == 0: return
    r = []
    while 1:
    i = self.readSentence();
    if len(i) == 0: continue
    reply = i[0]
    attrs = {}
    for w in i[1:]:
    j = w.find('=', 1)
    if (j == -1):
    attrs[w] = ''
    else:
    attrs[w[:j]] = w[j + 1:]
    r.append((reply, attrs))
    if reply == '!done': return r

    def writeSentence(self, words):
    ret = 0
    for w in words:
    self.writeWord(w)
    ret += 1
    self.writeWord('')
    return ret

    def readSentence(self):
    r = []
    while 1:
    w = self.readWord()
    if w == '': return r
    r.append(w)

    def writeWord(self, w):
    print(("<<< " + w))
    self.writeLen(len(w))
    self.writeStr(w)

    def readWord(self):
    ret = self.readStr(self.readLen())
    print((">>> " + ret))
    return ret

    def writeLen(self, l):
    if l < 0x80:
    self.writeStr(chr(l))
    elif l < 0x4000:
    l |= 0x8000
    self.writeStr(chr((l >> 8) & 0xFF))
    self.writeStr(chr(l & 0xFF))
    elif l < 0x200000:
    l |= 0xC00000
    self.writeStr(chr((l >> 16) & 0xFF))
    self.writeStr(chr((l >> 8) & 0xFF))
    self.writeStr(chr(l & 0xFF))
    elif l < 0x10000000:
    l |= 0xE0000000
    self.writeStr(chr((l >> 24) & 0xFF))
    self.writeStr(chr((l >> 16) & 0xFF))
    self.writeStr(chr((l >> 8) & 0xFF))
    self.writeStr(chr(l & 0xFF))
    else:
    self.writeStr(chr(0xF0))
    self.writeStr(chr((l >> 24) & 0xFF))
    self.writeStr(chr((l >> 16) & 0xFF))
    self.writeStr(chr((l >> 8) & 0xFF))
    self.writeStr(chr(l & 0xFF))

    def readLen(self):
    c = ord(self.readStr(1))
    if (c & 0x80) == 0x00:
    pass
    elif (c & 0xC0) == 0x80:
    c &= ~0xC0
    c <<= 8
    c += ord(self.readStr(1))
    elif (c & 0xE0) == 0xC0:
    c &= ~0xE0
    c <<= 8
    c += ord(self.readStr(1))
    c <<= 8
    c += ord(self.readStr(1))
    elif (c & 0xF0) == 0xE0:
    c &= ~0xF0
    c <<= 8
    c += ord(self.readStr(1))
    c <<= 8
    c += ord(self.readStr(1))
    c <<= 8
    c += ord(self.readStr(1))
    elif (c & 0xF8) == 0xF0:
    c = ord(self.readStr(1))
    c <<= 8
    c += ord(self.readStr(1))
    c <<= 8
    c += ord(self.readStr(1))
    c <<= 8
    c += ord(self.readStr(1))
    return c

    def writeStr(self, str):
    n = 0;
    while n < len(str):
    r = self.sk.send(bytes(str[n:], 'UTF-8'))
    if r == 0: raise RuntimeError("connection closed by remote end")
    n += r

    def readStr(self, length):
    ret = ''
    while len(ret) < length:
    s = self.sk.recv(length - len(ret))
    if s == '': raise RuntimeError("connection closed by remote end")
    ret += s.decode('UTF-8', 'replace')
    return ret


    def main():
    s = None
    for res in socket.getaddrinfo(sys.argv[1], "8728", socket.AF_UNSPEC, socket.SOCK_STREAM):
    af, socktype, proto, canonname, sa = res
    try:
    s = socket.socket(af, socktype, proto)
    except (socket.error, msg):
    s = None
    continue
    try:
    s.connect(sa)
    except (socket.error, msg):
    s.close()
    s = None
    continue
    break
    if s is None:
    print('could not open socket')
    sys.exit(1)

    apiros = ApiRos(s);
    apiros.login(sys.argv[2], sys.argv[3]);

    #模拟用户自己输入代码进行api控制
    tmpcommand=input('请输入你想查询的命令')
    inputsentence = []
    inputsentence.append(tmpcommand)
    apiros.writeSentence(inputsentence)
    while 1:
    x = apiros.readSentence()
    #print(x)
    if x == ['!done'] or x==['!re', '=status=finished']:
    break
    #导出ros配置到ROS本地ghg1.rsc文件
    # inputsentence = []
    #
    # inputsentence.append('/export')
    # inputsentence.append('=file=ghg1.rsc')
    # apiros.writeSentence(inputsentence)
    # while 1:
    # x = apiros.readSentence()
    # #print(x)
    # if x == ['!done'] or x==['!re', '=status=finished']:
    # break
    #从ros上传文件到ftp的代码
    # inputsentence = []
    # inputsentence.append('/tool/fetch')
    # inputsentence.append('=address=192.168.0.108')
    # inputsentence.append('=src-path=ghg1.rsc')
    # inputsentence.append('=user=xxxxx')
    # inputsentence.append('=mode=ftp')
    # inputsentence.append('=password=xxxxx')
    # inputsentence.append('=dst-path=123.rsc')
    # inputsentence.append('=upload=yes')
    # apiros.writeSentence(inputsentence)
    # inputsentence = []
    # while 1:
    # x = apiros.readSentence()
    # #print(x)
    # if x == ['!done'] or x==['!re', '=status=finished']:
    # break
    #删除文件代码
    # inputsentence = []
    # inputsentence.append('/file/remove')
    # inputsentence.append('=numbers=ghg1.rsc')
    # apiros.writeSentence(inputsentence)
    # while 1:
    # x = apiros.readSentence()
    # print(x)
    # if x == ['!done'] or x==['!re', '=status=finished']:
    # break
    #官方循环代码,等待你输入命令行,可以用来测试代码命令行
    # while 1:
    # r = select.select([s, sys.stdin], [], [], None)
    # if s in r[0]:
    # # something to read in socket, read sentence
    # x = apiros.readSentence()
    #
    # if sys.stdin in r[0]:
    # # read line from input and strip off newline
    # l = sys.stdin.readline()
    # print(l)
    # l = l[:-1]
    # print(l)
    #
    #
    # # if empty line, send sentence and start with new
    # # otherwise append to input sentence
    # if l == '':
    # apiros.writeSentence(inputsentence)
    # inputsentence = []
    # else:
    # inputsentence.append(l)


    if __name__ == '__main__':
    main()


    # !/usr/bin/env python
    # -*- coding:utf-8 -*-
    # Author:lzd
    
    import sys, time, binascii, socket, select
    import hashlib
    
    
    class ApiRos:
        "Routeros api"
    
        def __init__(self, sk):
            self.sk = sk
            self.currenttag = 0
    
        def login(self, username, pwd):
            for repl, attrs in self.talk(["/login"]):
                chal = binascii.unhexlify((attrs['=ret']).encode('UTF-8'))
            md = hashlib.md5()
            md.update(b'x00')
            md.update(pwd.encode('UTF-8'))
            md.update(chal)
            self.talk(["/login", "=name=" + username,
                       "=response=00" + binascii.hexlify(md.digest()).decode('UTF-8')])
    
        def talk(self, words):
            if self.writeSentence(words) == 0: return
            r = []
            while 1:
                i = self.readSentence();
                if len(i) == 0: continue
                reply = i[0]
                attrs = {}
                for w in i[1:]:
                    j = w.find('=', 1)
                    if (j == -1):
                        attrs[w] = ''
                    else:
                        attrs[w[:j]] = w[j + 1:]
                r.append((reply, attrs))
                if reply == '!done': return r
    
        def writeSentence(self, words):
            ret = 0
            for w in words:
                self.writeWord(w)
                ret += 1
            self.writeWord('')
            return ret
    
        def readSentence(self):
            r = []
            while 1:
                w = self.readWord()
                if w == '': return r
                r.append(w)
    
        def writeWord(self, w):
            print(("<<< " + w))
            self.writeLen(len(w))
            self.writeStr(w)
    
        def readWord(self):
            ret = self.readStr(self.readLen())
            print((">>> " + ret))
            return ret
    
        def writeLen(self, l):
            if l < 0x80:
                self.writeStr(chr(l))
            elif l < 0x4000:
                l |= 0x8000
                self.writeStr(chr((l >> 8) & 0xFF))
                self.writeStr(chr(l & 0xFF))
            elif l < 0x200000:
                l |= 0xC00000
                self.writeStr(chr((l >> 16) & 0xFF))
                self.writeStr(chr((l >> 8) & 0xFF))
                self.writeStr(chr(l & 0xFF))
            elif l < 0x10000000:
                l |= 0xE0000000
                self.writeStr(chr((l >> 24) & 0xFF))
                self.writeStr(chr((l >> 16) & 0xFF))
                self.writeStr(chr((l >> 8) & 0xFF))
                self.writeStr(chr(l & 0xFF))
            else:
                self.writeStr(chr(0xF0))
                self.writeStr(chr((l >> 24) & 0xFF))
                self.writeStr(chr((l >> 16) & 0xFF))
                self.writeStr(chr((l >> 8) & 0xFF))
                self.writeStr(chr(l & 0xFF))
    
        def readLen(self):
            c = ord(self.readStr(1))
            if (c & 0x80) == 0x00:
                pass
            elif (c & 0xC0) == 0x80:
                c &= ~0xC0
                c <<= 8
                c += ord(self.readStr(1))
            elif (c & 0xE0) == 0xC0:
                c &= ~0xE0
                c <<= 8
                c += ord(self.readStr(1))
                c <<= 8
                c += ord(self.readStr(1))
            elif (c & 0xF0) == 0xE0:
                c &= ~0xF0
                c <<= 8
                c += ord(self.readStr(1))
                c <<= 8
                c += ord(self.readStr(1))
                c <<= 8
                c += ord(self.readStr(1))
            elif (c & 0xF8) == 0xF0:
                c = ord(self.readStr(1))
                c <<= 8
                c += ord(self.readStr(1))
                c <<= 8
                c += ord(self.readStr(1))
                c <<= 8
                c += ord(self.readStr(1))
            return c
    
        def writeStr(self, str):
            n = 0;
            while n < len(str):
                r = self.sk.send(bytes(str[n:], 'UTF-8'))
                if r == 0: raise RuntimeError("connection closed by remote end")
                n += r
    
        def readStr(self, length):
            ret = ''
            while len(ret) < length:
                s = self.sk.recv(length - len(ret))
                if s == '': raise RuntimeError("connection closed by remote end")
                ret += s.decode('UTF-8', 'replace')
            return ret
    
    
    def main():
        s = None
        for res in socket.getaddrinfo(sys.argv[1], "8728", socket.AF_UNSPEC, socket.SOCK_STREAM):
            af, socktype, proto, canonname, sa = res
            try:
                s = socket.socket(af, socktype, proto)
            except (socket.error, msg):
                s = None
                continue
            try:
                s.connect(sa)
            except (socket.error, msg):
                s.close()
                s = None
                continue
            break
        if s is None:
            print('could not open socket')
            sys.exit(1)
    
        apiros = ApiRos(s);
        apiros.login(sys.argv[2], sys.argv[3]);
    
        #模拟用户自己输入代码进行api控制
        tmpcommand=input('请输入你想查询的命令')
        inputsentence = []
        inputsentence.append(tmpcommand)
        apiros.writeSentence(inputsentence)
        while 1:
            x = apiros.readSentence()
            #print(x)
            if x == ['!done'] or x==['!re', '=status=finished']:
                break
        #导出ros配置到ROS本地ghg1.rsc文件
        # inputsentence = []
        #
        # inputsentence.append('/export')
        # inputsentence.append('=file=ghg1.rsc')
        # apiros.writeSentence(inputsentence)
        # while 1:
        #     x = apiros.readSentence()
        #     #print(x)
        #     if x == ['!done'] or x==['!re', '=status=finished']:
        #         break
        #从ros上传文件到ftp的代码
        # inputsentence = []
        # inputsentence.append('/tool/fetch')
        # inputsentence.append('=address=192.168.0.108')
        # inputsentence.append('=src-path=ghg1.rsc')
        # inputsentence.append('=user=xxxxx')
        # inputsentence.append('=mode=ftp')
        # inputsentence.append('=password=xxxxx')
        # inputsentence.append('=dst-path=123.rsc')
        # inputsentence.append('=upload=yes')
        # apiros.writeSentence(inputsentence)
        # inputsentence = []
        # while 1:
        #     x = apiros.readSentence()
        #     #print(x)
        #     if x == ['!done'] or x==['!re', '=status=finished']:
        #         break
        #删除文件代码
        # inputsentence = []
        # inputsentence.append('/file/remove')
        # inputsentence.append('=numbers=ghg1.rsc')
        # apiros.writeSentence(inputsentence)
        # while 1:
        #     x = apiros.readSentence()
        #     print(x)
        #     if x == ['!done'] or x==['!re', '=status=finished']:
        #         break
                #官方循环代码,等待你输入命令行,可以用来测试代码命令行
                # while 1:
                #     r = select.select([s, sys.stdin], [], [], None)
                #     if s in r[0]:
                #         # something to read in socket, read sentence
                #         x = apiros.readSentence()
                #
                #     if sys.stdin in r[0]:
                #         # read line from input and strip off newline
                #         l = sys.stdin.readline()
                #         print(l)
                #         l = l[:-1]
                #         print(l)
                #
                #
                #         # if empty line, send sentence and start with new
                #         # otherwise append to input sentence
                #         if l == '':
                #             apiros.writeSentence(inputsentence)
                #             inputsentence = []
                #         else:
                #             inputsentence.append(l)
    
    
    if __name__ == '__main__':
        main()
  • 相关阅读:
    Cmake Make makefile GNU autotools
    第三方库的安装:Pangolin
    ./configure, make, sudo make install 的含义
    [Eigen]C++开源线代库
    术语解释
    KDevelop使用笔记【中文教程】
    Python-Day1
    找不到或无法加载主类
    仅仅测试Word2016发布博客
    First Day!
  • 原文地址:https://www.cnblogs.com/itfat/p/7433794.html
Copyright © 2011-2022 走看看