zoukankan      html  css  js  c++  java
  • python3+ros api

    官方文档:https://wiki.mikrotik.com/wiki/Manual:API_Python3



    # !/usr/bin/env python
    # -*- coding:utf-8 -*-
    # Author:lzd

    import sys, time, binascii, socket, select
    import hashlib


    class ApiRos:
    "Routeros api"

    def __init__(self, sk):
    self.sk = sk
    self.currenttag = 0

    def login(self, username, pwd):
    for repl, attrs in self.talk(["/login"]):
    chal = binascii.unhexlify((attrs['=ret']).encode('UTF-8'))
    md = hashlib.md5()
    md.update(b'x00')
    md.update(pwd.encode('UTF-8'))
    md.update(chal)
    self.talk(["/login", "=name=" + username,
    "=response=00" + binascii.hexlify(md.digest()).decode('UTF-8')])

    def talk(self, words):
    if self.writeSentence(words) == 0: return
    r = []
    while 1:
    i = self.readSentence();
    if len(i) == 0: continue
    reply = i[0]
    attrs = {}
    for w in i[1:]:
    j = w.find('=', 1)
    if (j == -1):
    attrs[w] = ''
    else:
    attrs[w[:j]] = w[j + 1:]
    r.append((reply, attrs))
    if reply == '!done': return r

    def writeSentence(self, words):
    ret = 0
    for w in words:
    self.writeWord(w)
    ret += 1
    self.writeWord('')
    return ret

    def readSentence(self):
    r = []
    while 1:
    w = self.readWord()
    if w == '': return r
    r.append(w)

    def writeWord(self, w):
    print(("<<< " + w))
    self.writeLen(len(w))
    self.writeStr(w)

    def readWord(self):
    ret = self.readStr(self.readLen())
    print((">>> " + ret))
    return ret

    def writeLen(self, l):
    if l < 0x80:
    self.writeStr(chr(l))
    elif l < 0x4000:
    l |= 0x8000
    self.writeStr(chr((l >> 8) & 0xFF))
    self.writeStr(chr(l & 0xFF))
    elif l < 0x200000:
    l |= 0xC00000
    self.writeStr(chr((l >> 16) & 0xFF))
    self.writeStr(chr((l >> 8) & 0xFF))
    self.writeStr(chr(l & 0xFF))
    elif l < 0x10000000:
    l |= 0xE0000000
    self.writeStr(chr((l >> 24) & 0xFF))
    self.writeStr(chr((l >> 16) & 0xFF))
    self.writeStr(chr((l >> 8) & 0xFF))
    self.writeStr(chr(l & 0xFF))
    else:
    self.writeStr(chr(0xF0))
    self.writeStr(chr((l >> 24) & 0xFF))
    self.writeStr(chr((l >> 16) & 0xFF))
    self.writeStr(chr((l >> 8) & 0xFF))
    self.writeStr(chr(l & 0xFF))

    def readLen(self):
    c = ord(self.readStr(1))
    if (c & 0x80) == 0x00:
    pass
    elif (c & 0xC0) == 0x80:
    c &= ~0xC0
    c <<= 8
    c += ord(self.readStr(1))
    elif (c & 0xE0) == 0xC0:
    c &= ~0xE0
    c <<= 8
    c += ord(self.readStr(1))
    c <<= 8
    c += ord(self.readStr(1))
    elif (c & 0xF0) == 0xE0:
    c &= ~0xF0
    c <<= 8
    c += ord(self.readStr(1))
    c <<= 8
    c += ord(self.readStr(1))
    c <<= 8
    c += ord(self.readStr(1))
    elif (c & 0xF8) == 0xF0:
    c = ord(self.readStr(1))
    c <<= 8
    c += ord(self.readStr(1))
    c <<= 8
    c += ord(self.readStr(1))
    c <<= 8
    c += ord(self.readStr(1))
    return c

    def writeStr(self, str):
    n = 0;
    while n < len(str):
    r = self.sk.send(bytes(str[n:], 'UTF-8'))
    if r == 0: raise RuntimeError("connection closed by remote end")
    n += r

    def readStr(self, length):
    ret = ''
    while len(ret) < length:
    s = self.sk.recv(length - len(ret))
    if s == '': raise RuntimeError("connection closed by remote end")
    ret += s.decode('UTF-8', 'replace')
    return ret


    def main():
    s = None
    for res in socket.getaddrinfo(sys.argv[1], "8728", socket.AF_UNSPEC, socket.SOCK_STREAM):
    af, socktype, proto, canonname, sa = res
    try:
    s = socket.socket(af, socktype, proto)
    except (socket.error, msg):
    s = None
    continue
    try:
    s.connect(sa)
    except (socket.error, msg):
    s.close()
    s = None
    continue
    break
    if s is None:
    print('could not open socket')
    sys.exit(1)

    apiros = ApiRos(s);
    apiros.login(sys.argv[2], sys.argv[3]);

    #模拟用户自己输入代码进行api控制
    tmpcommand=input('请输入你想查询的命令')
    inputsentence = []
    inputsentence.append(tmpcommand)
    apiros.writeSentence(inputsentence)
    while 1:
    x = apiros.readSentence()
    #print(x)
    if x == ['!done'] or x==['!re', '=status=finished']:
    break
    #导出ros配置到ROS本地ghg1.rsc文件
    # inputsentence = []
    #
    # inputsentence.append('/export')
    # inputsentence.append('=file=ghg1.rsc')
    # apiros.writeSentence(inputsentence)
    # while 1:
    # x = apiros.readSentence()
    # #print(x)
    # if x == ['!done'] or x==['!re', '=status=finished']:
    # break
    #从ros上传文件到ftp的代码
    # inputsentence = []
    # inputsentence.append('/tool/fetch')
    # inputsentence.append('=address=192.168.0.108')
    # inputsentence.append('=src-path=ghg1.rsc')
    # inputsentence.append('=user=xxxxx')
    # inputsentence.append('=mode=ftp')
    # inputsentence.append('=password=xxxxx')
    # inputsentence.append('=dst-path=123.rsc')
    # inputsentence.append('=upload=yes')
    # apiros.writeSentence(inputsentence)
    # inputsentence = []
    # while 1:
    # x = apiros.readSentence()
    # #print(x)
    # if x == ['!done'] or x==['!re', '=status=finished']:
    # break
    #删除文件代码
    # inputsentence = []
    # inputsentence.append('/file/remove')
    # inputsentence.append('=numbers=ghg1.rsc')
    # apiros.writeSentence(inputsentence)
    # while 1:
    # x = apiros.readSentence()
    # print(x)
    # if x == ['!done'] or x==['!re', '=status=finished']:
    # break
    #官方循环代码,等待你输入命令行,可以用来测试代码命令行
    # while 1:
    # r = select.select([s, sys.stdin], [], [], None)
    # if s in r[0]:
    # # something to read in socket, read sentence
    # x = apiros.readSentence()
    #
    # if sys.stdin in r[0]:
    # # read line from input and strip off newline
    # l = sys.stdin.readline()
    # print(l)
    # l = l[:-1]
    # print(l)
    #
    #
    # # if empty line, send sentence and start with new
    # # otherwise append to input sentence
    # if l == '':
    # apiros.writeSentence(inputsentence)
    # inputsentence = []
    # else:
    # inputsentence.append(l)


    if __name__ == '__main__':
    main()


    # !/usr/bin/env python
    # -*- coding:utf-8 -*-
    # Author:lzd
    
    import sys, time, binascii, socket, select
    import hashlib
    
    
    class ApiRos:
        "Routeros api"
    
        def __init__(self, sk):
            self.sk = sk
            self.currenttag = 0
    
        def login(self, username, pwd):
            for repl, attrs in self.talk(["/login"]):
                chal = binascii.unhexlify((attrs['=ret']).encode('UTF-8'))
            md = hashlib.md5()
            md.update(b'x00')
            md.update(pwd.encode('UTF-8'))
            md.update(chal)
            self.talk(["/login", "=name=" + username,
                       "=response=00" + binascii.hexlify(md.digest()).decode('UTF-8')])
    
        def talk(self, words):
            if self.writeSentence(words) == 0: return
            r = []
            while 1:
                i = self.readSentence();
                if len(i) == 0: continue
                reply = i[0]
                attrs = {}
                for w in i[1:]:
                    j = w.find('=', 1)
                    if (j == -1):
                        attrs[w] = ''
                    else:
                        attrs[w[:j]] = w[j + 1:]
                r.append((reply, attrs))
                if reply == '!done': return r
    
        def writeSentence(self, words):
            ret = 0
            for w in words:
                self.writeWord(w)
                ret += 1
            self.writeWord('')
            return ret
    
        def readSentence(self):
            r = []
            while 1:
                w = self.readWord()
                if w == '': return r
                r.append(w)
    
        def writeWord(self, w):
            print(("<<< " + w))
            self.writeLen(len(w))
            self.writeStr(w)
    
        def readWord(self):
            ret = self.readStr(self.readLen())
            print((">>> " + ret))
            return ret
    
        def writeLen(self, l):
            if l < 0x80:
                self.writeStr(chr(l))
            elif l < 0x4000:
                l |= 0x8000
                self.writeStr(chr((l >> 8) & 0xFF))
                self.writeStr(chr(l & 0xFF))
            elif l < 0x200000:
                l |= 0xC00000
                self.writeStr(chr((l >> 16) & 0xFF))
                self.writeStr(chr((l >> 8) & 0xFF))
                self.writeStr(chr(l & 0xFF))
            elif l < 0x10000000:
                l |= 0xE0000000
                self.writeStr(chr((l >> 24) & 0xFF))
                self.writeStr(chr((l >> 16) & 0xFF))
                self.writeStr(chr((l >> 8) & 0xFF))
                self.writeStr(chr(l & 0xFF))
            else:
                self.writeStr(chr(0xF0))
                self.writeStr(chr((l >> 24) & 0xFF))
                self.writeStr(chr((l >> 16) & 0xFF))
                self.writeStr(chr((l >> 8) & 0xFF))
                self.writeStr(chr(l & 0xFF))
    
        def readLen(self):
            c = ord(self.readStr(1))
            if (c & 0x80) == 0x00:
                pass
            elif (c & 0xC0) == 0x80:
                c &= ~0xC0
                c <<= 8
                c += ord(self.readStr(1))
            elif (c & 0xE0) == 0xC0:
                c &= ~0xE0
                c <<= 8
                c += ord(self.readStr(1))
                c <<= 8
                c += ord(self.readStr(1))
            elif (c & 0xF0) == 0xE0:
                c &= ~0xF0
                c <<= 8
                c += ord(self.readStr(1))
                c <<= 8
                c += ord(self.readStr(1))
                c <<= 8
                c += ord(self.readStr(1))
            elif (c & 0xF8) == 0xF0:
                c = ord(self.readStr(1))
                c <<= 8
                c += ord(self.readStr(1))
                c <<= 8
                c += ord(self.readStr(1))
                c <<= 8
                c += ord(self.readStr(1))
            return c
    
        def writeStr(self, str):
            n = 0;
            while n < len(str):
                r = self.sk.send(bytes(str[n:], 'UTF-8'))
                if r == 0: raise RuntimeError("connection closed by remote end")
                n += r
    
        def readStr(self, length):
            ret = ''
            while len(ret) < length:
                s = self.sk.recv(length - len(ret))
                if s == '': raise RuntimeError("connection closed by remote end")
                ret += s.decode('UTF-8', 'replace')
            return ret
    
    
    def main():
        s = None
        for res in socket.getaddrinfo(sys.argv[1], "8728", socket.AF_UNSPEC, socket.SOCK_STREAM):
            af, socktype, proto, canonname, sa = res
            try:
                s = socket.socket(af, socktype, proto)
            except (socket.error, msg):
                s = None
                continue
            try:
                s.connect(sa)
            except (socket.error, msg):
                s.close()
                s = None
                continue
            break
        if s is None:
            print('could not open socket')
            sys.exit(1)
    
        apiros = ApiRos(s);
        apiros.login(sys.argv[2], sys.argv[3]);
    
        #模拟用户自己输入代码进行api控制
        tmpcommand=input('请输入你想查询的命令')
        inputsentence = []
        inputsentence.append(tmpcommand)
        apiros.writeSentence(inputsentence)
        while 1:
            x = apiros.readSentence()
            #print(x)
            if x == ['!done'] or x==['!re', '=status=finished']:
                break
        #导出ros配置到ROS本地ghg1.rsc文件
        # inputsentence = []
        #
        # inputsentence.append('/export')
        # inputsentence.append('=file=ghg1.rsc')
        # apiros.writeSentence(inputsentence)
        # while 1:
        #     x = apiros.readSentence()
        #     #print(x)
        #     if x == ['!done'] or x==['!re', '=status=finished']:
        #         break
        #从ros上传文件到ftp的代码
        # inputsentence = []
        # inputsentence.append('/tool/fetch')
        # inputsentence.append('=address=192.168.0.108')
        # inputsentence.append('=src-path=ghg1.rsc')
        # inputsentence.append('=user=xxxxx')
        # inputsentence.append('=mode=ftp')
        # inputsentence.append('=password=xxxxx')
        # inputsentence.append('=dst-path=123.rsc')
        # inputsentence.append('=upload=yes')
        # apiros.writeSentence(inputsentence)
        # inputsentence = []
        # while 1:
        #     x = apiros.readSentence()
        #     #print(x)
        #     if x == ['!done'] or x==['!re', '=status=finished']:
        #         break
        #删除文件代码
        # inputsentence = []
        # inputsentence.append('/file/remove')
        # inputsentence.append('=numbers=ghg1.rsc')
        # apiros.writeSentence(inputsentence)
        # while 1:
        #     x = apiros.readSentence()
        #     print(x)
        #     if x == ['!done'] or x==['!re', '=status=finished']:
        #         break
                #官方循环代码,等待你输入命令行,可以用来测试代码命令行
                # while 1:
                #     r = select.select([s, sys.stdin], [], [], None)
                #     if s in r[0]:
                #         # something to read in socket, read sentence
                #         x = apiros.readSentence()
                #
                #     if sys.stdin in r[0]:
                #         # read line from input and strip off newline
                #         l = sys.stdin.readline()
                #         print(l)
                #         l = l[:-1]
                #         print(l)
                #
                #
                #         # if empty line, send sentence and start with new
                #         # otherwise append to input sentence
                #         if l == '':
                #             apiros.writeSentence(inputsentence)
                #             inputsentence = []
                #         else:
                #             inputsentence.append(l)
    
    
    if __name__ == '__main__':
        main()
  • 相关阅读:
    leetcode Super Ugly Number
    leetcode Find Median from Data Stream
    leetcode Remove Invalid Parentheses
    leetcode Range Sum Query
    leetcode Range Sum Query
    leetcode Minimum Height Trees
    hdu 3836 Equivalent Sets
    hdu 1269 迷宫城堡
    hud 2586 How far away ?
    poj 1330 Nearest Common Ancestors
  • 原文地址:https://www.cnblogs.com/itfat/p/7433794.html
Copyright © 2011-2022 走看看