zoukankan      html  css  js  c++  java
  • libreoffice python 操作word及excel文档

    1、开始、关闭libreoffice服务;

    开始之前同步字体文件时间,是因为创建soffice服务时,服务会检查所需加载的文件的时间,如果其认为时间不符,则其可能会重新加载,耗时较长,因此需事先统一时间。

    使用时如果需要多次调用,最后每次调用均开启后关闭,否则libreoffice会创建一个缓存文档并越用越大,处理时间会增加。

    class OfficeProcess(object):
        def __init__(self):
            self.p = 0
            subprocess.Popen('find /usr/share/fonts | xargs touch -m -t 201801010000.00', shell=True)
    
        def start_office(self):
            self.p = subprocess.Popen('soffice --pidfile=sof.pid --invisible --accept="socket,host=localhost,port=2002;urp;"', shell=True)
            while True:
                try:
                    local_context = uno.getComponentContext()
                    resolver = local_context.getServiceManager().createInstanceWithContext('com.sun.star.bridge.UnoUrlResolver', local_context)
                    resolver.resolve('uno:socket,host=localhost,port=2002;urp;StarOffice.ComponentContext')
                    return
                except:
                    print(ts(), "wait for connecting soffice...")
                    time.sleep(1)
                    continue
    
        def stop_office(self):
            with open("sof.pid", "rb") as f:
                try:
                    os.kill(int(f.read()), signal.SIGTERM)
                    self.p.wait()
                except:
                    pass

    2、init service manager

    local_context = uno.getComponentContext()
            service_manager = local_context.getServiceManager()
            resolver = service_manager.createInstanceWithContext('com.sun.star.bridge.UnoUrlResolver', local_context)
            self.ctx = resolver.resolve('uno:socket,host=localhost,port=2002;urp;StarOffice.ComponentContext')
            self.smgr = self.ctx.ServiceManager
            self.desktop = self.smgr.createInstanceWithContext('com.sun.star.frame.Desktop', self.ctx)

    3、从二进制数据中读取doc文档

    def ImportFromMemory(self, data):
            istream = self.smgr.createInstanceWithContext('com.sun.star.io.SequenceInputStream', self.ctx)
            istream.initialize((uno.ByteSequence(data), ))
            pv = PropertyValue()
            pv.Name = 'InputStream'
            pv.Value = istream
            self.doc = {'doc': []}
            try:
                self.document = self.desktop.loadComponentFromURL('private:stream/swriter', '_blank', 0, (pv, ))
                self.text = self.document.getText()
            except:
                self.text = None

    4、读取doc文档中的数据

    def ExportToJson(self):
            try:
                l = self.__ParseText(self.text, self.__Callback(self.doc['doc']))
                self.doc['length'] = l
            except:
                self.doc = {'doc': [], 'length': 0}
            return json.dumps(self.doc)
    
    @staticmethod
        def __Callback(alist):
            def Append(sth):
                alist.append(sth)
            return Append
    def __ParseText(self, text, func):
            l = 0
            text_it = text.createEnumeration()
            while text_it.hasMoreElements():
                element = text_it.nextElement()
                if element.supportsService('com.sun.star.text.Paragraph'):
                    l += self.__ParseParagraph(element, func)
                elif element.supportsService('com.sun.star.text.TextTable'):
                    l += self.__ParseTable(element, func)
                else:
                    pass
            return l
    def __ParseParagraph(self, paragraph, func):
            p = {'paragraph': []}
            l = 0
            paragraph_it = paragraph.createEnumeration()
            while paragraph_it.hasMoreElements():
                portion = paragraph_it.nextElement()
                if portion.TextPortionType == 'Text':
                    l += self.__ParsePortionText(portion, self.__Callback(p['paragraph']))
                elif portion.TextPortionType == 'SoftPageBreak':
                    pass
                elif portion.TextPortionType == 'TextField':
                    l += self.__ParsePortionText(portion, self.__Callback(p['paragraph']))
                else:
                    l += self.__ParseTextContent(portion, self.__Callback(p['paragraph']))
            if hasattr(paragraph, 'createContentEnumeration'):
                l += self.__ParseTextContent(paragraph, self.__Callback(p['paragraph']))
            p['length'] = l
            func(p)
            return l
    
        def __ParseTextContent(self, textcontent, func):
            l = 0
            content_it = textcontent.createContentEnumeration('com.sun.star.text.TextContent')
            while content_it.hasMoreElements():
                element = content_it.nextElement()
                if element.supportsService('com.sun.star.text.TextGraphicObject'):
                    l += self.__ParsePortionGraphic(element, func)
                elif element.supportsService('com.sun.star.text.TextEmbeddedObject'):
                    pass
                elif element.supportsService('com.sun.star.text.TextFrame'):
                    l += self.__ParseFrame(element, func)
                elif element.supportsService('com.sun.star.drawing.GroupShape'):
                    l += self.__ParseGroup(element, func)
                else:
                    pass
            return l
    
        def __ParseFrame(self, frame, func):
            f = {'frame': []}
            l = self.__ParseText(frame.getText(), self.__Callback(f['frame']))
            f['length'] = l
            func(f)
            return l
    
        def __ParseGroup(self, group, func):
            l = 0
            for i in range(group.getCount()):
                it = group.getByIndex(i)
                if it.supportsService('com.sun.star.drawing.Text'):
                    l += self.__ParseFrame(it, func)
                else:
                    pass
            return l
    
        def __ParsePortionText(self, portion_text, func):
            func({'portion': portion_text.String, 'length': len(portion_text.String)})
            return len(portion_text.String)
    
        def __ParsePortionGraphic(self, portion_graphic, func):
            gp = self.smgr.createInstanceWithContext('com.sun.star.graphic.GraphicProvider', self.ctx)
            stream = self.smgr.createInstanceWithContext('com.sun.star.io.TempFile', self.ctx)
            pv1 = PropertyValue()
            pv1.Name = 'OutputStream'
            pv1.Value = stream
            pv2 = PropertyValue()
            pv2.Name = 'MimeType'
            pv2.Value = 'image/png'
            gp.storeGraphic(portion_graphic.Graphic, (pv1, pv2))
            stream.getOutputStream().flush()
            stream.seek(0)
            l = stream.getInputStream().available()
            b = uno.ByteSequence(b'')
            stream.seek(0)
            l, b = stream.getInputStream().readBytes(b, l)
            img = {'image': base64.b64encode(b.value).decode('ascii')}
            img['height'] = portion_graphic.Height
            img['width'] = portion_graphic.Width
            img['actualheight'] = portion_graphic.ActualSize.Height
            img['actualwidth'] = portion_graphic.ActualSize.Width
            img['croptop'] = portion_graphic.GraphicCrop.Top
            img['cropbottom'] = portion_graphic.GraphicCrop.Bottom
            img['cropleft'] = portion_graphic.GraphicCrop.Left
            img['cropright'] = portion_graphic.GraphicCrop.Right
            img['length'] = 0
            func(img)
            return 0
    
        def __ParseTable(self, table, func):
            l = 0
            try:
                matrix = self.__GetTableMatrix(table)
                seps = self.__GetTableSeparators(table)
                t = {}
                count = 0
                for ri in matrix.keys():
                    t[ri] = {}
                    for ci in matrix[ri].keys():
                        t[ri][ci] = dict(matrix[ri][ci])
                        del t[ri][ci]['cell']
                        t[ri][ci]['content'] = []
                        l += self.__ParseText(matrix[ri][ci]['cell'], self.__Callback(t[ri][ci]['content']))
                        count += t[ri][ci]['rowspan'] * t[ri][ci]['colspan']
                if count != len(t) * len(seps):
                    raise ValueError('count of cells error')
                func({'table': t, 'row': len(t), 'column': len(seps), 'length': l, 'tableid': self.table_id})
                self.table_id += 1
            except:
                l = 0
                print('discard wrong table')
            return l
    
        @staticmethod
        def __GetTableSeparators(table):
            result = [table.TableColumnRelativeSum]
            for ri in range(table.getRows().getCount()):
                result += [s.Position for s in table.getRows().getByIndex(ri).TableColumnSeparators]
            result = sorted(set(result))
            for i in range(len(result) - 1):
                result[i] += 1 if result[i] + 1 == result[i + 1] else 0
            return sorted(set(result))
    
        @staticmethod
        def __NameToRC(name):
            r = int(re.sub('[A-Za-z]', '', name)) - 1
            cstr = re.sub('[0-9]', '', name)
            c = 0
            for i in range(len(cstr)):
                if cstr[i] >= 'A' and cstr[i] <= 'Z':
                    c = c * 52 + ord(cstr[i]) - ord('A')
                else:
                    c = c * 52 + 26 + ord(cstr[i]) - ord('a')
            return r, c
    
        @staticmethod
        def __GetTableMatrix(table):
            result = {}
            for name in table.getCellNames():
                ri, ci = WordToJson.__NameToRC(name)
                cell = table.getCellByName(name)
                if ri not in result:
                    result[ri] = {}
                result[ri][ci] = {'cell': cell, 'rowspan': cell.RowSpan, 'name': name}
    
            seps = WordToJson.__GetTableSeparators(table)
            for ri in result.keys():
                sep = [s.Position for s in table.getRows().getByIndex(ri).TableColumnSeparators] + [table.TableColumnRelativeSum]
                sep = sorted(set(sep))
                for ci in result[ri].keys():
                    right = seps.index(sep[ci]) if sep[ci] in seps else seps.index(sep[ci] + 1)
                    left = -1 if ci == 0 else seps.index(sep[ci - 1]) if sep[ci - 1] in seps else seps.index(sep[ci - 1] + 1)
                    result[ri][ci]['colspan'] = right - left
            return result

    5、写doc文档

    self.doco = self.desktop.loadComponentFromURL('private:factory/swriter', '_blank', 0, ())
            self.texto = self.doco.getText()
            self.cursoro = self.texto.createTextCursor()
            self.cursoro.ParaBottomMargin = 500
    def __WriteText(self, text, texto, cursoro):
            for it in text:
                if 'paragraph' in it:
                    self.__WriteParagraph(it, texto, cursoro)
                elif 'image' in it:
                    self.__WritePortionGraphic(it, texto, cursoro)
                elif 'table' in it:
                    self.__WriteTable(it, texto, cursoro)
    
        def __WriteParagraph(self, paragraph, texto, cursoro):
            if paragraph['length'] > 0:
                if 'result' in paragraph:
                    for it in paragraph['result']:
                        texto.insertString(cursoro, it['trans_sen'], False)
                else:
                    texto.insertString(cursoro, paragraph['paragraph'], False)
                texto.insertControlCharacter(cursoro, ControlCharacter.PARAGRAPH_BREAK, False)
    
        def __WritePortionGraphic(self, portion_graphic, texto, cursoro):
            png_base64 = portion_graphic['image']
            png = base64.b64decode(png_base64)
            gp = self.smgr.createInstanceWithContext('com.sun.star.graphic.GraphicProvider', self.ctx)
            istream = self.smgr.createInstanceWithContext('com.sun.star.io.SequenceInputStream', self.ctx)
            istream.initialize((uno.ByteSequence(png), ))
            pv = PropertyValue()
            pv.Name = 'InputStream'
            pv.Value = istream
    
            actualsize = uno.createUnoStruct('com.sun.star.awt.Size')
            actualsize.Height = portion_graphic['actualheight'] if 'actualheight' in portion_graphic else portion_graphic['height']
            actualsize.Width = portion_graphic['actualwidth'] if 'actualwidth' in portion_graphic else portion_graphic['width']
            graphiccrop = uno.createUnoStruct('com.sun.star.text.GraphicCrop')
            graphiccrop.Top = portion_graphic['croptop'] if 'croptop' in portion_graphic else 0
            graphiccrop.Bottom = portion_graphic['cropbottom'] if 'cropbottom' in portion_graphic else 0
            graphiccrop.Left = portion_graphic['cropleft'] if 'cropleft' in portion_graphic else 0
            graphiccrop.Right = portion_graphic['cropright'] if 'cropright' in portion_graphic else 0
    
            image = self.doco.createInstance('com.sun.star.text.TextGraphicObject')
            image.Surround = NONE
            image.Graphic = gp.queryGraphic((pv, ))
            image.Height = portion_graphic['height']
            image.Width = portion_graphic['width']
            image.setPropertyValue('ActualSize', actualsize)
            image.setPropertyValue('GraphicCrop', graphiccrop)
            texto.insertTextContent(cursoro, image, False)
            texto.insertControlCharacter(cursoro, ControlCharacter.PARAGRAPH_BREAK, False)
    
        def __WriteTable(self, table, texto, cursoro):
            tableo = self.doco.createInstance('com.sun.star.text.TextTable')
            tableo.initialize(table['row'], table['column'])
            texto.insertTextContent(cursoro, tableo, False)
    #        texto.insertControlCharacter(cursoro, ControlCharacter.PARAGRAPH_BREAK, False)
            tcursoro = tableo.createCursorByCellName("A1")
    
            hitbug = False
            if table['row'] > 1:
                tcursoro.goDown(1, True)
                hitbug = tcursoro.getRangeName() == 'A1'
    
            for ri in sorted([int(r) for r in table['table'].keys()]):
                rs = table['table'][str(ri)]
                for ci in sorted([int(c) for c in rs.keys()]):
                    cell = rs[str(ci)]
                    if hitbug == False and (cell['rowspan'] > 1 or cell['colspan'] > 1):
                        tcursoro.gotoCellByName(cell['name'], False)
                        if cell['rowspan'] > 1:
                            tcursoro.goDown(cell['rowspan'] - 1, True)
                        if cell['colspan'] > 1:
                            tcursoro.goRight(cell['colspan'] - 1, True)
                        tcursoro.mergeRange()
                    ctexto = tableo.getCellByName(cell['name'])
                    if ctexto == None:
                        continue
                    ccursoro = ctexto.createTextCursor()
                    ccursoro.CharWeight = FontWeight.NORMAL
                    ccursoro.CharWeightAsian = FontWeight.NORMAL
                    ccursoro.ParaAdjust = LEFT
                    self.__WriteText(cell['content'], ctexto, ccursoro)

    6、生成二进制的doc文档数据

            streamo = self.smgr.createInstanceWithContext('com.sun.star.io.Pipe', self.ctx)
            self.doco.storeToURL('private:stream', (PropertyValue('FilterName', 0, 'MS Word 2007 XML', 0), PropertyValue('OutputStream', 0, streamo, 0)))
            streamo.flush()
            _, datao = streamo.readBytes(None, streamo.available())

    7、从doc文档数据生成pdf的二进制数据

            streamo = self.smgr.createInstanceWithContext('com.sun.star.io.Pipe', self.ctx)
            self.doco.storeToURL('private:stream', (PropertyValue('FilterName', 0, 'writer_pdf_Export', 0), PropertyValue('OutputStream', 0, streamo, 0)))
            streamo.flush()
            _, datap = streamo.readBytes(None, streamo.available())

    8、读取excel二进制数据

      def ImportFromMemory(self, data):
            istream = self.smgr.createInstanceWithContext('com.sun.star.io.SequenceInputStream', self.ctx)
            istream.initialize((uno.ByteSequence(data), ))
            pv = PropertyValue()
            pv.Name = 'InputStream'
            pv.Value = istream
            self.doc = {'doc': []}
            try:
                print("before loadComponentFromURL")
                self.document = self.desktop.loadComponentFromURL('private:stream/scalc', '_blank', 0, (pv, ))
                self.sheets = self.document.getSheets()
                print("ImportFromMemory done")
            except:
                print("ImportFromMemory failed")
                self.sheets = None

    9、读取excel的文本数据

        def ExportToJson(self):
            try:
                l = self.__ParseText(self.sheets, self.__Callback(self.doc['doc']))
                self.doc['length'] = l
            except:
                self.doc = {'doc': [], 'length': 0}
            return json.dumps(self.doc)
        def __ParseText(self, sheets, func):
            l = 0
            sheets_it = sheets.createEnumeration()
            while sheets_it.hasMoreElements():
                element = sheets_it.nextElement()
                if element.supportsService('com.sun.star.sheet.Spreadsheet'):
                    l += self.__ParseSpreadsheet(element, func)
            return l
    
        def __ParseSpreadsheet(self, spreadsheet, func):
            l = 0
            p = {'spreadsheet': []}
            visible_cells_it = spreadsheet.queryVisibleCells().getCells().createEnumeration()
            while visible_cells_it.hasMoreElements():
                cell = visible_cells_it.nextElement()
                type = cell.getType()
                if type == self.EMPTY:
                    print("cell.type==empty")
                elif type == self.VALUE:
                    print("cell.type==VALUE", "value=", cell.getValue(), cell.getCellAddress ())
                elif type == self.TEXT:
                    print("cell.type==TEXT","content=", cell.getString().encode("UTF-8"), cell.getCellAddress ())
                    l += self.__ParseCellText(spreadsheet, cell, self.__Callback(p['spreadsheet']))
                    print("__ParseCellText=", p)
                elif type == self.FORMULA:
                    print("cell.type==FORMULA", "formula=", cell.getValue())
            p['length'] = l
            func(p)
            return l
    
        def __ParseCellText(self, sheet, cell, func):
            try:
                x = cell.getCellAddress().Column
                y = cell.getCellAddress().Row
                sheetname = sheet.getName()
            except:
                x = -1
                y = -1
                sheetname = None
            func({'celltext': cell.getString(), 'x': x, 'y': y, 'sheetname': sheetname, 'length': len(cell.getString())})
            return len(cell.getString())
         self.EMPTY = uno.Enum("com.sun.star.table.CellContentType", "EMPTY")
            self.TEXT = uno.Enum("com.sun.star.table.CellContentType", "TEXT")
            self.FORMULA = uno.Enum("com.sun.star.table.CellContentType", "FORMULA")
            self.VALUE = uno.Enum("com.sun.star.table.CellContentType", "VALUE")

    10、替换excel的文本信息

        def ImportFromJson(self, data):
            doc = json.loads(data)
            try:
                self.__WriteText(doc['doc'])
            except:
                pass
        def __WriteText(self, text):
            print("__WriteText begin:", text)
            sheet = None
            for it in text:
                if 'paragraph' in it and 'sheetname' in it:
                    if sheet == None or sheet.getName() != it['sheetname']:
                        try:
                            sheet = self.sheets.getByName(it['sheetname'])
                            print("getsheet:", it['sheetname'], "=", sheet.getName())
                        except:
                            sheet = None
                            continue
                    self.__WriteParagraph(it, sheet)
    
        def __WriteParagraph(self, paragraph, sheet):
            print("__WriteParagraph")
            if paragraph['length'] > 0:
                try:
                    x = paragraph['x']
                    y = paragraph['y']
                    print("getcell:", x, y)
                    cell = sheet.getCellByPosition(x, y)
                    print("getcell done")
                except:
                    return
                if 'result' in paragraph:
                    for it in paragraph['result']:
                        print("cell=", cell.getString())
                        cell.setString(it['trans_sen'])
                        print("cell,", cell.getString(), ",done")

    11、生成excel文档二进制数据

          streamo = self.smgr.createInstanceWithContext('com.sun.star.io.Pipe', self.ctx)
            self.document.storeToURL('private:stream', (PropertyValue('FilterName', 0, 'Calc MS Excel 2007 XML', 0), PropertyValue('OutputStream', 0, streamo, 0)))
            streamo.flush()
            _, datao = streamo.readBytes(None, streamo.available())

    12、生成excel的pdf文档

            streamo = self.smgr.createInstanceWithContext('com.sun.star.io.Pipe', self.ctx)
            self.document.storeToURL('private:stream', (PropertyValue('FilterName', 0, 'calc_pdf_Export', 0), PropertyValue('OutputStream', 0, streamo, 0)))
            streamo.flush()
            _, datap = streamo.readBytes(None, streamo.available())
  • 相关阅读:
    [.net 面向对象程序设计进阶] (16) 多线程(Multithreading)(一) 利用多线程提高程序性能(上)
    [.net 面向对象程序设计进阶] (15) 缓存(Cache)(二) 利用缓存提升程序性能
    [.net 面向对象程序设计进阶] (14) 缓存(Cache) (一) 认识缓存技术
    [.net 面向对象程序设计进阶] (13) 序列化(Serialization)(五) Json 序列化利器 Newtonsoft.Json 及 通用Json类
    [.net 面向对象程序设计进阶] (12) 序列化(Serialization)(四) 快速掌握JSON的序列化和反序列化
    [.net 面向对象程序设计进阶] (11) 序列化(Serialization)(三) 通过接口 IXmlSerializable 实现XML序列化 及 通用XML类
    [.net 面向对象程序设计进阶] (10) 序列化(Serialization)(二) 通过序列化博客园文章学习XML的序列化
    [.net 面向对象程序设计进阶] (9) 序列化(Serialization) (一) 二进制流序列化
    [.net 面向对象程序设计进阶] (8) 托管与非托管
    [.net 面向对象程序设计进阶] (7) Lamda表达式(三) 表达式树高级应用
  • 原文地址:https://www.cnblogs.com/zl1991/p/10615881.html
Copyright © 2011-2022 走看看