zoukankan      html  css  js  c++  java
  • LongRunningTasks

    Introduction

    The following was posted to the comp.lang.python newsgroup on 24-May-2001. Since it explains very well the options for keeping the GUI responsive when the application has to do long running tasks I decided to copy it here verbatim. Many thanks to David Bolen for taking the time and effort to compose and post this message.

    --Robin

     

    The Question

    Daniel Frame writes:

    • Basically, I've constructed a GUI with wxPython which runs a very long function when a 'START' button is pressed. Of course, the GUI is 'locked up' until this function completes itself. I was wanting to add a 'STOP' button to my application which would halt this other function if the user got tired of waiting for it to complete. Is it possible to accomplish this without resorting to using threads? If so, How can I have my running function check occasionally to see if the stop button was pressed? I've heard of wxYield, but I can't seem to implement it properly.

     

    David's Response

    There's really three possibilities I would think of - threading, wxYield, or chunking up your processing in a wxEVT_IDLE handler. I've given a small sample of each below.

    I personally like threads for this sort of thing (I think they leave the UI the most responsive), but there's no hard and fast rule. So whichever you're more comfortable with.

    Threading really isn't that hard - you can just start up a thread to do your processing, and have it send an event to your main GUI thread when it has completed. While it is working, it can check an event object or some other flag variable to indicate that it should give up and stop.

    For a simple sort of example, here's a small bit of code that presents a simple (really dumb visually) frame, and uses a worker thread to simulate some processing (that takes 10s, resulting in a value of 10), while permitting it to be aborted. This could be extrapolated to doing any sort of lengthy processing and returning any sort of result. Intermediate events could be generated during the processing to give some indication to the main GUI thread of how things were proceeding (perhaps updating something like a gauge or some other visual indicator).

     

    切换行号显示
    import time
    from threading import *
    import wx
    
    # Button definitions
    ID_START = wx.NewId()
    ID_STOP = wx.NewId()
    
    # Define notification event for thread completion
    EVT_RESULT_ID = wx.NewId()
    
    def EVT_RESULT(win, func):
        """Define Result Event."""
        win.Connect(-1, -1, EVT_RESULT_ID, func)
    
    class ResultEvent(wx.PyEvent):
        """Simple event to carry arbitrary result data."""
        def __init__(self, data):
            """Init Result Event."""
            wx.PyEvent.__init__(self)
            self.SetEventType(EVT_RESULT_ID)
            self.data = data
    
    # Thread class that executes processing
    class WorkerThread(Thread):
        """Worker Thread Class."""
        def __init__(self, notify_window):
            """Init Worker Thread Class."""
            Thread.__init__(self)
            self._notify_window = notify_window
            self._want_abort = 0
            # This starts the thread running on creation, but you could
            # also make the GUI thread responsible for calling this
            self.start()
    
        def run(self):
            """Run Worker Thread."""
            # This is the code executing in the new thread. Simulation of
            # a long process (well, 10s here) as a simple loop - you will
            # need to structure your processing so that you periodically
            # peek at the abort variable
            for i in range(10):
                time.sleep(1)
                if self._want_abort:
                    # Use a result of None to acknowledge the abort (of
                    # course you can use whatever you'd like or even
                    # a separate event type)
                    wx.PostEvent(self._notify_window, ResultEvent(None))
                    return
            # Here's where the result would be returned (this is an
            # example fixed result of the number 10, but it could be
            # any Python object)
            wx.PostEvent(self._notify_window, ResultEvent(10))
    
        def abort(self):
            """abort worker thread."""
            # Method for use by main thread to signal an abort
            self._want_abort = 1
    
    # GUI Frame class that spins off the worker thread
    class MainFrame(wx.Frame):
        """Class MainFrame."""
        def __init__(self, parent, id):
            """Create the MainFrame."""
            wx.Frame.__init__(self, parent, id, 'Thread Test')
    
            # Dumb sample frame with two buttons
            wx.Button(self, ID_START, 'Start', pos=(0,0))
            wx.Button(self, ID_STOP, 'Stop', pos=(0,50))
            self.status = wx.StaticText(self, -1, '', pos=(0,100))
    
            self.Bind(wx.EVT_BUTTON, self.OnStart, id=ID_START)
            self.Bind(wx.EVT_BUTTON, self.OnStop, id=ID_STOP)
    
            # Set up event handler for any worker thread results
            EVT_RESULT(self,self.OnResult)
    
            # And indicate we don't have a worker thread yet
            self.worker = None
    
        def OnStart(self, event):
            """Start Computation."""
            # Trigger the worker thread unless it's already busy
            if not self.worker:
                self.status.SetLabel('Starting computation')
                self.worker = WorkerThread(self)
    
        def OnStop(self, event):
            """Stop Computation."""
            # Flag the worker thread to stop if running
            if self.worker:
                self.status.SetLabel('Trying to abort computation')
                self.worker.abort()
    
        def OnResult(self, event):
            """Show Result status."""
            if event.data is None:
                # Thread aborted (using our convention of None return)
                self.status.SetLabel('Computation aborted')
            else:
                # Process results here
                self.status.SetLabel('Computation Result: %s' % event.data)
            # In either event, the worker is done
            self.worker = None
    
    class MainApp(wx.App):
        """Class Main App."""
        def OnInit(self):
            """Init Main App."""
            self.frame = MainFrame(None, -1)
            self.frame.Show(True)
            self.SetTopWindow(self.frame)
            return True
    
    if __name__ == '__main__':
        app = MainApp(0)
        app.MainLoop()
    

    Oh, and if you're concerned with hanging on an exit if your thread doesn't terminate for some reason, just add a "self.setDaemon(1)" to the init and Python won't wait for it to terminate.

    The second approach, using wxYield, should be fine too - just add a call to wxYield() somewhere within the computation code such that it executes periodically. At that point, any pending window events will be dispatched (permitting the window to refresh, process button presses, etc...). Then, it's similar to the above in that you set a flag so that when the original code gets control after the wxYield() returns it knows to stop processing.

    As with the threading case, since all events go through during the wxYield() you need to protect against trying to run the same operation twice.

    Here's the equivalent of the above but placing the computation right inside the main window class. Note that one difference is that unlike with threading, the responsiveness of your GUI is now directly related to how frequently you call wxYield, so you may have delays refreshing your window dependent on that frequency. You should notice that this is a bit more sluggish with its frequency of a wxYield() each second.

     

    切换行号显示
    import time
    import wx
    
    # Button definitions
    ID_START = wx.NewId()
    ID_STOP = wx.NewId()
    
    # GUI Frame class that spins off the worker thread
    class MainFrame(wx.Frame):
        """Class MainFrame."""
        def __init__(self, parent, id):
            """Create the MainFrame."""
            wx.Frame.__init__(self, parent, id, 'wxYield Test')
    
            # Dumb sample frame with two buttons
            wx.Button(self, ID_START, 'Start', pos=(0,0))
            wx.Button(self, ID_STOP, 'Stop', pos=(0,50))
            self.status = wx.StaticText(self, -1, '', pos=(0,100))
    
            self.Bind (wx.EVT_BUTTON, self.OnStart, id=ID_START)
            self.Bind (wx.EVT_BUTTON, self.OnStop, id=ID_STOP)
    
            # Indicate we aren't working on it yet
            self.working = 0
    
        def OnStart(self, event):
            """Start Computation."""
            # Start the processing - this simulates a loop - you need to call
            # wx.Yield at some periodic interval.
            if not self.working:
                self.status.SetLabel('Starting Computation')
                self.working = 1
                self.need_abort = 0
    
                for i in range(10):
                    time.sleep(1)
                    wx.Yield()
                    if self.need_abort:
                        self.status.SetLabel('Computation aborted')
                        break
                else:
                    # Here's where you would process the result
                    # Note you should only do this if not aborted.
                    self.status.SetLabel('Computation Completed')
    
                # In either event, we aren't running any more
                self.working = 0
    
        def OnStop(self, event):
            """Stop Computation."""
            if self.working:
                self.status.SetLabel('Trying to abort computation')
                self.need_abort = 1
    
    class MainApp(wx.App):
        """Class Main App."""
        def OnInit(self):
            """Init Main App."""
            self.frame = MainFrame(None,-1)
            self.frame.Show(True)
            self.SetTopWindow(self.frame)
            return True
    
    if __name__ == '__main__':
        app = MainApp(0)
        app.MainLoop()
    

    And finally, you can do your work within an idle handler. In this case, you let wxPython generate an IDLE event whenever it has completed processing normal user events, and then you perform a "chunk" of your processing in each such case. This can be a little tricker depending on your algorithm since you have to be able to perform the work in discrete pieces. Inside your IDLE handler, you request that it be called again if you aren't done, but you want to make sure that each pass through the handler doesn't take too long.Effectively, each event is similar to the gap between wxYield() calls in the previous example, and your GUI responsiveness will be subject to that latency just as with the wxYield() case.

    I'm also not sure you can remove an idle handler once established (or at least I think I had problems with that in the past), so the code below just establishes it once and the handler only does work if it's in the midst of a computation. [Actually, you can use the Disconnect method to remove an event handler binding, although there is no real need to do so as there is very little overhead if you use a guard condition as in the code below. --Robin]

     

    切换行号显示
    import time
    import wx
    
    # Button definitions
    ID_START = wx.NewId()
    ID_STOP = wx.NewId()
    
    # GUI Frame class that spins off the worker thread
    class MainFrame(wx.Frame):
        """Class MainFrame."""
        def __init__(self, parent, id):
            """Create the MainFrame."""
            wx.Frame.__init__(self, parent, id, 'Idle Test')
    
            # Dumb sample frame with two buttons
            wx.Button(self, ID_START, 'Start',p os=(0,0))
            wx.Button(self, ID_STOP, 'Stop', pos=(0,50))
            self.status = wx.StaticText(self, -1, '', pos=(0,100))
    
            self.Bind (wx.EVT_BUTTON, self.OnStart, id=ID_START)
            self.Bind (wx.EVT_BUTTON, self.OnStop, id=ID_STOP)
            self.Bind (wx.EVT_IDLE, self.OnIdle)
    
            # Indicate we aren't working on it yet
            self.working = 0
    
        def OnStart(self, event):
            """Start Computation."""
            # Set up for processing and trigger idle event
            if not self.working:
                self.status.SetLabel('Starting Computation')
                self.count = 0
                self.working = 1
                self.need_abort = 0
    
        def OnIdle(self, event):
            """Idle Handler."""
            if self.working:
                # This is where the processing takes place, one bit at a time
                if self.need_abort:
                    self.status.SetLabel('Computation aborted')
                else:
                    self.count = self.count + 1
                    time.sleep(1)
                    if self.count < 10:
                        # Still more work to do so request another event
                        event.RequestMore()
                        return
                    else:
                        self.status.SetLabel('Computation completed')
    
                # Reaching here is an abort or completion - end in either case
                self.working = 0
    
        def OnStop(self, event):
            """Stop Computation."""
            if self.working:
                self.status.SetLabel('Trying to abort computation')
                self.need_abort = 1
    
    class MainApp(wx.App):
        """Class Main App."""
        def OnInit(self):
            """Init Main App."""
            self.frame = MainFrame(None, -1)
            self.frame.Show(True)
            self.SetTopWindow(self.frame)
            return True
    
    if __name__ == '__main__':
        app = MainApp(0)
        app.MainLoop()
    

     


     

     

    Slight modification for recursive tasks

    Jeff Grimmett adds:

    The above three examples got me going in the right direction, but the actual worker task used is a little on the simple side for what I needed to do, which was to recurse through directories and use the same class for each recursion.

    Do I need to tell you that it's a bad idea to recurse 100 threads at once? :-) Highly entertaining to watch but hardly useful of the end goal is to collect data serially!

    The trick I settled on was to seperate the thread from the task. The means of communicating the shutdown command was the first thing I tackled. I settled on the python threading module's Event class. This class is basically a flag, which i suppose would also work quite well as long as the scope is correct.

     

    切换行号显示
    import  threading
    
    # This is a global flag that we will manipulate as needed
    # to allow graceful exit from the recursive search.
    KeepRunning     =       threading.Event()
    

    The next thing I needed to do was define the scope of the actual worker thread.

     

    切换行号显示
    class   Worker(threading.Thread):
      def __init__ (self, op, dir1, dir2):
      threading.Thread.__init__(self)
      self.op   = op
      self.Dir1 = dir1
      self.Dir2 = dir2
      KeepRunning.set()
    
      self.start()
    
      def run(self):
        self.wb = WorkerBee(self.op, self.Dir1, self.Dir2)
        self.op.AppendText('Done!
    ')
        wxBell()
    
        # Assuming you are following the above example somewhat, assume that this is a
        # similar 'reporting' event class that we are calling. It carries a cargo which
        # is in fact the 'head' WorkerBee.
        wxPostEvent(self.op.GetParent(), NewReport(self.wb))
    
      def abort(self):
        KeepRunning.clear()
    
      # print to the output window.
      def Update(self, txt):
        self.op.AppendText(txt)
        self.op.ShowPosition(self.op.GetLastPosition()) # keeps the last line visible
    

    The thread initializes with a pointer to the output window - which in this case is a wxTextCtrl() object - and the starting directory. Like the above example, this thread is self-starting, but need not be.

    The first big difference is in the run() method. In the above example all the work is actually done BY the run() method. In this case, we create a WorkerBee object and retain a handle on it. Exactly why will become clear in a moment.

    The second difference is that instead of keeping our 'keep running' variable local, we now have it global in the form of the KeepGoing Event object. Again, the reason why becomes clear in the next part.

    While not directly on topic, the Update() method is simply a convenience to deal with the output display. If you were going to stdout this would be entirely uncessesary.

    ====

    The WorkerBee class is the main point of difference between this example and the previous. Instead of the thread object doing all the work, it starts up the WorkerBee object.

    The WorkerBee class recursively scans two directories and compares the results using the filecmp Python module. The exact task is not important, but I had to use something :-)

     

    切换行号显示
    class WorkerBee:
      def __init__ (self, op, dir1, dir2):
        self.DiffList = []  # We will retain a list of changed files
        self.DirList  = []  # We will retain a list of directories.
        self.A        = dir1
        self.B        = dir2
        self.op       = op
    
        self.Update('scanning %s
    ' % self.A)
    
        self.cmp = filecmp.dircmp(self.A, self.B)
    
        self.Deleted = self.cmp.left_only
        self.New     = self.cmp.right_only
        self.Files   = self.cmp.common_files
        self.Dirs    = self.cmp.common_dirs
    
        for i in self.Files :
         if not KeepRunning.isSet(): break
    
           self.Update('	%s\%s' %(self.A,i))
    
           if filecmp.cmp('%s\%s' % (self.A, i), '%s\%s' % (self.B, i), shallow=0) == 0 :
             self.Update('	<---- DIFF ***
    ')      # A diff!
             self.DiffList.append(i)
           else:
             self.Update('
    ')
    
        for i in self.Dirs  :
          if not KeepRunning.isSet(): break
    
          self.DirList.append ( WorkerBee ( op,
                                            '%s\%s' % (self.A, i),
                                            '%s\%s' % (self.B, i)
                                          )
                              )
    
      def   Update(self, txt):
        self.op.AppendText(txt)
        self.op.ShowPosition(self.op.GetLastPosition())
    

    THIS is why we need to keep the thread object and the workerbee seperate. If the workerbee was built around the Thread class, we would have a SWARM of workerbees! However, in this case what we end up with is a controlled recursion into a directory tree, with frequent checks on the KeepRunning Event flag. The whole thing shuts down quite nicely and leaves the GUI VERY responsive.

     

    More Tips

    ChuckEsterbrook asked:

    The LongRunningTasks wiki page doesn't mention using Queues. When people use this technique what is the set up? For example, is your "worker thread" pushing data into the queue and then a wx.Timer is polling it out with a non-blocking q.get_nowait()?

    RobinDunn replied:

    That's one way. Another is to call wxWakeUpIdle when the item is put in the Queue, and then have a EVT_IDLE handler that fetches items from the queue. That way the item is usually processed very soon after it is put there without having to have a higher frequency timer.

    Another option is to not use the queue or wxPostEvent and just use wxCallAfter passing to it the callable and parameters to be called in the GUI thread. (It uses wxPostEvent internally.) The implementation and docs of wxCallAfter is in core.py.

    MarienZwart would like to add:

    This is actually (sort of) documented in the wxwidgets docs:

    • For communication between secondary threads and the main thread, you may use wxEvtHandler::AddPendingEvent or its short version wxPostEvent. These functions have a thread-safe implementation so that they can be used as they are for sending events from one thread to another. However there is no built in method to send messages to the worker threads and you will need to use the available synchronization classes to implement the solution which suits your needs yourself.

    So you do not need a Queue to send messages from the worker thread to the main/gui thread: just use AddPendingEvent in the worker and a normal event handler in the gui. However if you need to send messages from the main/gui thread to the worker thread the right way to do it is probably a Queue, with the gui thread calling put() and the worker regularly checking for messages with get_nowait(). You can *probably* get away with using a simple boolean flag like some examples on this page use without any extra locking thanks to the GIL (Global Interpreter Lock), but for anything more complicated than that you need locking (like the Event used in the previous example), and for more than one kind of "message" a queue is usually the simplest way to go. Even something as simple as a worker thread that can be paused, resumed and cancelled is IMHO simpler using a queue than using a bunch of Event and/or Condition objects (the worker can just .get() on the queue to pause).

     


     

    See also Brian Kelley's demo (Process.py) which shows how to do a long task in a separate process (using wxProcess and wxExecute) rather than a thread. A separate process is like running a separate application - it can run full speed, whereas a thread might be slowed about 30% by Python's global interpreter lock (I don't know which version you were using, but at least for the first few threads, Python does pretty well, resulting in fairly minimal overhead - Josiah).

    MVoncken suggests: I prefer using a generator,(variant on wxYield,Example2):

    切换行号显示
            def DoSomeLongTask(self):
                """
                usualy defined somewhere else
                """
                for i in range(10):
                     time.sleep(1)
                     yield i
    
            def OnStart(self, event):
                # Over-Simplified version.
                # iterate over DoSomeLongTask,call wxYield and display status
                for status in DoSomeLongTask():
                    #display status here.. (wx.ProgressBar?)  
                    wxYield()
                    if self.need_abort:
                            break
    

     


     

    While using a generator and wx.Yield() works, you are limited to having a single long-running-task at any time. This may be a serious limitation to your application. Instead, you can use the built-in wx.FutureCall() function to schedule as many calls as you want/need. I use this in to handle 'replace all' in one of my applications.

    Josiah suggests:

     

    切换行号显示
        def DoSomeLongTask(self, state):
            #pull the state out
            x,y,z = state
            if not self.need_abort:
                time.sleep(1)
                x -= 1
                if x > 0 and not self.need_abort:
                    wx.FutureCall(1, self.DoSomeLongTask, (x,y,z))
    
        def OnStart(self, event):
            # Over-Simplified version.
            self._DoSomeLongTask((10, 1, 2))
    

    Alternatively, since wx.FutureCall creates and manipulates wx.Timer instances, we could just create a registry of stuff to call when a timer expires, and allow insertion of items to call into this registry. Why do we use a Timer/FutureCall instead of wx.CallAfter? It's mostly a matter of taste, but we use self.IsRunning() rather than keeping explicit state as a convenience.

    Josiah also suggests:

     

    切换行号显示
    import traceback
    
    class CallRegistry(wx.Timer):
        def __init__(self, delay=1):
            wx.Timer.__init__(self)
            self.tasks = []
            if delay < 1:
                delay = 1
            self.delay = delay
    
        def Notify(self):
            tl = []
            otl = self.tasks
            self.tasks = []
            for x in otl:
                try:
                    i,j = x
                    x = i(j)
                    if x:
                        try:
                            i,j = x
                        except:
                            if callable(x):
                                tl.append((x, None))
                        else:
                            tl.append(x)
                except (KeyboardInterrupt, SystemExit):
                    raise
                except:
                    traceback.print_exc()
            self.tasks.extend(tl)
            if not self.tasks:
                self.Stop()
    
        def CallLater(self, fcn, state=None):
            self.tasks.append((fcn, state))
            if not self.IsRunning():
                self.Start(self.delay, wx.TIMER_CONTINUOUS)
    

    Sample use of the above:

     

    切换行号显示
    class MyFrame(wx.Frame):
        def __init__(self, ...):
            ...
            self.cr = CallRegistry()
            self.cr.CallLater(self.Task1)
            self.cr.CallLater(self.Task2)
            ...
    
        def Task1(self, state):
            ...
            #automatically reschedules itself
            return self.Task1, newstate
    
        def Task2(self, state):
            ...
            if c1:
                #schedules some other task without arguments
                return self.Task3
            else:
                #reschedules itself again
                return self.Task2, newstate
    

     

    Redirecting text from stdout to a wx.TextCtrl

    I thought I might add this to the discussion. There are probably more ways than one to do this but... This is some sample code that I put together that updates a text control in wxpython from output being sent to stdout through backend "print" statments being executed in a separate thread.

    The idea here is to that you're creating a class with a "write" method to rebind to stdout (the real value of stdout is always kept in sys.stdout for rebinding). Everytime a message is spit out to stdout an event is thrown out to the text control for processing. The problem is that the text control doesn't update itself. Therefore we need to get the control to process its pending events. Therefore we need to set up a timer that gets the text control to attempt to process its pending events every so often.

    The threading code is pulled from Python in a Nutshell pretty much (which is what I'm using). It uses 2 Queues to process events. requestID, callable, args, kwds = self.requestQ.get() will suspend the thread untill the callable returns, and when it completes self.resultQ.put((requestID, callable(*args, **kwds))) will tell us that the process has finished its task. When this is done we submit an event to the GUI telling us that we are done.

    There's better ways to poll the resultQ but for my purposes here this works pretty nicely. Anyway hope this is useful to someone...

     

    切换行号显示
    #!/usr/bin/env python
    
    import wx, threading, Queue, sys, time
    from wx.lib.newevent import NewEvent
    
    ID_BEGIN=100
    wxStdOut, EVT_STDDOUT= NewEvent()
    wxWorkerDone, EVT_WORKER_DONE= NewEvent()
    
    def LongRunningProcess(lines_of_output):
        for x in range(lines_of_output):
            print "I am a line of output (hi!)...."
            time.sleep(1)
    
    class MainFrame(wx.Frame):
        def __init__(self, parent, id, title):
            wx.Frame.__init__(self, parent, id, title, size=(300, 300))
            self.requestQ = Queue.Queue() #create queues
            self.resultQ = Queue.Queue()
    
            #widgets
            p = wx.Panel(self)
            self.output_window = wx.TextCtrl(p, -1,
                                 style=wx.TE_AUTO_SCROLL|wx.TE_MULTILINE|wx.TE_READONLY)
            self.go = wx.Button(p, ID_BEGIN, 'Begin')
            self.output_window_timer = wx.Timer(self.output_window, -1)
    
            #frame sizers
            sizer = wx.BoxSizer(wx.VERTICAL)
            sizer.Add(self.output_window, 10, wx.EXPAND)
            sizer.Add(self.go, 1, wx.EXPAND)
            p.SetSizer(sizer)
    
            #events
            wx.EVT_BUTTON(self, ID_BEGIN, self.OnBeginTest)
            self.output_window.Bind(EVT_STDDOUT, self.OnUpdateOutputWindow)
            self.output_window.Bind(wx.EVT_TIMER, self.OnProcessPendingOutputWindowEvents)
            self.Bind(EVT_WORKER_DONE, self.OnWorkerDone)
    
            #thread
            self.worker = Worker(self, self.requestQ, self.resultQ)
    
        def OnUpdateOutputWindow(self, event):
            value = event.text
            self.output_window.AppendText(value)
    
        def OnBeginTest(self, event):
            lines_of_output=7
            self.go.Disable()
            self.worker.beginTest(LongRunningProcess, lines_of_output)
            self.output_window_timer.Start(50)
    
        def OnWorkerDone(self, event):
            self.output_window_timer.Stop()
            self.go.Enable()
    
        def OnProcessPendingOutputWindowEvents(self, event):
            self.output_window.ProcessPendingEvents()
    
    class Worker(threading.Thread):
        requestID = 0
        def __init__(self, parent, requestQ, resultQ, **kwds):
            threading.Thread.__init__(self, **kwds)
            self.setDaemon(True)
            self.requestQ = requestQ
            self.resultQ = resultQ
            self.start()
    
        def beginTest(self, callable, *args, **kwds):
            Worker.requestID +=1
            self.requestQ.put((Worker.requestID, callable, args, kwds))
            return Worker.requestID
    
        def run(self):
            while True:
                requestID, callable, args, kwds = self.requestQ.get()
                self.resultQ.put((requestID, callable(*args, **kwds)))
                evt = wxWorkerDone()
                wx.PostEvent(wx.GetApp().frame, evt)
    
    class SysOutListener:
        def write(self, string):
            sys.__stdout__.write(string)
            evt = wxStdOut(text=string)
            wx.PostEvent(wx.GetApp().frame.output_window, evt)
    
    class MyApp(wx.App):
        def OnInit(self):
            self.frame = MainFrame(None, -1, 'rebinding stdout')
            self.frame.Show(True)
            self.frame.Center()
            return True
    
    #entry point
    if __name__ == '__main__':
        app = MyApp(0)
        sys.stdout = SysOutListener()
        app.MainLoop()
    

     

    Easiest Implementation *Ever* :)

    So you say you want to perform a long running task from a wxPython GUI? Well, it don't get no simpler than this! Use threads but don't mess with a separate thread class. Make sure if you're running in the other thread that you interact with the GUI via wx.CallAfter.

     

    切换行号显示
    import wx
    import thread
    from time import sleep
    
    class MainFrame(wx.Frame):
    
        def __init__(self, parent):
            wx.Frame.__init__(self, parent)
    
            self.label = wx.StaticText(self, label="Ready")
            self.btn = wx.Button(self, label="Start")
            self.gauge = wx.Gauge(self)
    
            sizer = wx.BoxSizer(wx.VERTICAL)
            sizer.Add(self.label, proportion=1, flag=wx.EXPAND)
            sizer.Add(self.btn, proportion=0, flag=wx.EXPAND)
            sizer.Add(self.gauge, proportion=0, flag=wx.EXPAND)
    
            self.SetSizerAndFit(sizer)
    
            self.Bind(wx.EVT_BUTTON, self.onButton)
    
        def onButton(self, evt):
            self.btn.Enable(False)
            self.gauge.SetValue(0)
            self.label.SetLabel("Running")
            thread.start_new_thread(self.longRunning, ())
    
        def onLongRunDone(self):
            self.gauge.SetValue(100)
            self.label.SetLabel("Done")
            self.btn.Enable(True)
    
        def longRunning(self):
            """This runs in a different thread.  Sleep is used to simulate a long running task."""
            sleep(3)
            wx.CallAfter(self.gauge.SetValue, 20)
            sleep(5)
            wx.CallAfter(self.gauge.SetValue, 50)
            sleep(1)
            wx.CallAfter(self.gauge.SetValue, 70)
            sleep(10)
            wx.CallAfter(self.onLongRunDone)
    
    if __name__ == "__main__":
        app = wx.PySimpleApp()
        app.TopWindow = MainFrame(None)
        app.TopWindow.Show()
        app.MainLoop()
    

     

    THE Easiest Implementation *Ever* :-)))

    ...no, really. All (most) of the above cases are one-way communication examples. Sometimes, you just want the main thread to be non-gui and communicate with the gui in a blocking manner. See http://radekpodgorny.blogspot.cz/2012/12/working-with-wxpython-in-separate-thread.html on how to achieve that.

    /! update: robin was so kind to comment on my solution. i'll investigate all his ideas and probably refactor/improve the code. stay tuned and THANK YOU, robin! ;-)

  • 相关阅读:
    haproxy 2.5 发布
    cube.js sql 支持简单说明
    基于graalvm 开发一个cube.js jdbc driver 的思路
    apache kyuubi Frontend 支持mysql 协议
    oceanbase 资源池删除说明
    基于obd 的oceanbase 扩容说明
    jfilter一个方便的spring rest 响应过滤扩展
    cube.js schema 定义多datasource 说明
    typescript 编写自定义定义文件
    meow 辅助开发cli 应用的工具
  • 原文地址:https://www.cnblogs.com/dengyigod/p/3304586.html
Copyright © 2011-2022 走看看