Stopping Fortran through Python

Python icon next to a stop sign next to the Fortran icon

This is how to stop some Fortran code though Python, specifically Fortran with OpenMP through Python.

This is similar to Monitoring Fortran through Python, and uses a similar set up. So I recommend understanding that one first as it may have more details; but don’t let these words stop you from continuing! Let’s begin.

So say you’ve compiled some Fortran code with f2py to be able to run it through Python, and you kind of want to interrupt/stop its progress a bit cleaner than killing the entire thing through the task manager. So first in your Fortran code, define a logical value called external stop, or ext_stop for short:

module fblock
  use iso_fortran_env
  logical, save :: ext_stop = .false.

  contains
    subroutine long_runner(...)
      !f2py threadsafe

We also add the threadsafe flag for Python to release its GIL allowing a separate thread to run, which lets us to actually send the stop signal into the compiled code.

Then, depending on what exactly you’re doing, stick an if statement checking the ext_stop flag:

module fblock
  use iso_fortran_env
  logical, save :: ext_stop = .false.

  contains
    subroutine long_runner(...)
      !f2py threadsafe
      ! code...

       do i = 1, giant_number
        ! code...
            
        if (ext_stop) exit

If you’re using OpenMP, then this gets slightly more involved, but still doable. The cancellation flag is used for this. I personally only have the first thread do the checking:

subroutine long_runner(...)
  use omp

  ! code...

  !$OMP PARALLEL
  proc_num = omp_get_thread_num()
  ! ...

  !$OMP DO
  do i = 1, giant_number
    ! ...
    if ((proc_num == 0) .and. (ext_stop)) then
      ext_stop = .true.
      !$OMP CANCEL DO
    end if
    ! ...
        
    !$OMP CANCELLATION POINT DO
  end do
  !$OMP END DO
  !$OMP END PARALLEL
  ! ...

The multiprocessing is set to stop or break out of the loop with the !$OMP CANCEL flag, and where a thread checks to see if it breaks or stops is defined by the !$OMP CANCELLATION POINT flag.  So the former flag flips a switch from false to true and the latter flag is where each thread checks this switch. This cancellation method is considered an expensive operation to do, so the environmental variable OMP_CANCELLATION has to be set to true – more on this below.


Now that’s it for the Fortran part, so compile it with f2py to make a python module. Say, for instance, your Fortran module ‘fblock’ with subroutine ‘long_runner’ has been compiled into the file ‘pyblk.pyd’. import it with a few other modules:

from multiprocessing import shared_memory
import multiprocessing as mp
import numpy as np
import threading
import sys, os
import time

import pyblk  # your f2py compiled fortran block

# needed to be able to cancel the fortran module mid run
os.environ['OMP_CANCELLATION'] = 'true'

global bg_runn
bg_runn = True

def bg_thread(stop_que):
    "background thread for compiled fortran block"
    global bg_runn

    # wait a bit for fortran code to initialize the queried variables
    time.sleep(0.25)

    stop = False  # manual stop flag
    wait = 0.25  # amount of seconds to wait before sending progress again

    while bg_runn:
        if stop_que.empty(): time.sleep(wait)
        else: stop = stop_que.get()

        if stop:  # told to stop, set fortran flag to stop
            pyblk.fblock.ext_stop = True
            break

        # if this is set to true somewhere within the fortran code
        # you can also detect it here
        if pyblk.fblock.ext_stop: break  # fortran itself has stopped

    return

# this fortran block is run on the secondary process
def run_fortran(stop_que, init_data):
    "call to run compiled fortran block"
    global bg_runn

    # setup/start background query thread
    thrd = threading.Thread(target=bg_thread, args=(stop_que, ))
    thrd.start()

    # call the compiled fortran code
    results = pyblk.fblock.long_runner(init_data)

    bg_runn = False  # inform query to stop
    thrd.join()  # wait for it to stop (wait out the sleep cycle)

    # now, do something with the results or
    # copy the results out from this process
    ##shm = shared_memory.SharedMemory('results')  # connect to shared mem
    ##b = np.ndarray(results.shape, dtype=results.dtype, buffer=shm.buf)
    ##b[:] = img_arr[:]  # copy results (memory is now allocated)
    ##shm.close()  # disconnect from shared mem
    return

def main():
    "set up and run the fortran code"
    t0 = time.time()
    print('start:', time.ctime(t0))

    stop_que = mp.Queue()  # manual stop queue
    manual_stop = False

    # if wanting the results on the primary process:
    # create shared memory to later copy result array into
    # (array size is needed; no memory is used/allocated at this time)
    ##shm = shared_memory.SharedMemory('results', create=True,
    ##                           size=np.int32(1).nbytes * amount)

    init_data = None  # your initial information, if any
    # if it's large and on disk, read it in on the secondary process

    try:

        # setup/start the secondary process with the compiled fortran code
        run = mp.Process(target=run_fortran, args=(stop_que,
                                                   init_data))
        run.start()

        # wait for the secondary process to complete
        # while being somewhat active here to capture a KeyboardInterrupt
        while run.is_alive(): time.sleep(0.25)

    except KeyboardInterrupt:
        print('manually stopped with ctrl+c')
        stop_que.put(True)
        manual_stop = True
    except BaseException as err:
        print('Error:', end=' ')
        print(type(err).__name__)
        stop_que.put(True)
    if not manual_stop: print('stopped on its own')

    # extract the results from secondary process with SharedMemory
    # (shape and dtype need to be known)
    ##results = np.ndarray(shape, dtype=np.int32, buffer=shm.buf)

    t1 = time.time()
    print('end  :', time.ctime(t1))
    print('{:.3f} seconds'.format(t1 - t0))
    return

if __name__ == '__main__':
    main()

The OMP_CANCELLATION variable is easily set with os.environ['OMP_CANCELLATION'] = 'true' (Note that it’s a lowercase string). If your fortran code can determine when it needs to cancel on its own, that’s all you need to set before calling it.

The above code here however is all on the assumption that stopping the Fortran block is a user’s manual interruption of the code by cancelling it with ctrl+C, which is why it’s set to run the Fortran block in an entirely separate process using the multiprocessing module (Fortran needs to run in the foreground and interacting with it also needs to be in the foreground; there can’t be two foregrounds for a single process).

But from there, if you really want to, you can merge the stopping with the monitoring into a single background function like so:

def bg_thread(prog_que, stop_que):
    "background thread for compiled fortran block"
    global bg_runn

    # wait a bit for fortran code to initialize the queried variables
    time.sleep(0.25)

    stop = False  # manual stop flag
    wait = 0.25  # amount of seconds to wait before sending progress again

    cur_prog = pyblk.fblock.progress
    max_prog = pyblk.fblock.max_prog

    while bg_runn:
        if cur_prog >= max_prog: break

        # for monitoring progress on primary process
        prog_que.put((cur_prog, max_prog))

        # monitoring progress on current process
        #pct = cur_prog / max_prog
        #print(cur_prog, 'of', max_prog, '({:.3%})'.format(pct))

        # listen (and wait a bit) to find out if system should stop
        if stop_que.empty(): time.sleep(wait)
        else: stop = stop_que.get()

        if stop:  # told to stop, set fortran flag to stop
            pyblk.fblock.ext_stop = True
            break

        if pyblk.fblock.ext_stop: break  # fortran itself has stopped

    prog_que.put((cur_prog, max_prog))  # send final progress
    prog_que.put('DONE')  # inform other end that the monitoring is done
    return

And with an example interface, it’ll all look like so:

from multiprocessing import shared_memory
import multiprocessing as mp
import numpy as np
import threading
import sys, os
import time

import pyblk  # your f2py compiled fortran block

# needed to be able to cancel the fortran module mid run
os.environ['OMP_CANCELLATION'] = 'true'

# to use PyQt5, replace 'PySide2' with 'PyQt5'
from PySide2.QtWidgets import (QApplication, QWidget, QVBoxLayout,
                               QProgressBar, QPushButton)


# this query is run in the background on the secondary process
def bg_thread(prog_que, stop_que):
    "background thread for compiled fortran block"
    global bg_runn

    # wait a bit for fortran code to initialize the queried variables
    time.sleep(0.25)

    stop = False  # manual stop flag
    wait = 0.25  # amount of seconds to wait before sending progress again

    cur_prog = pyblk.fblock.progress
    max_prog = pyblk.fblock.max_prog

    while bg_runn:
        if cur_prog >= max_prog: break

        # for monitoring progress on primary process
        prog_que.put((cur_prog, max_prog))

        # monitoring progress on current process
        #pct = cur_prog / max_prog
        #print(cur_prog, 'of', max_prog, '({:.3%})'.format(pct))

        # listen (and wait a bit) to find out if system should stop
        if stop_que.empty(): time.sleep(wait)
        else: stop = stop_que.get()

        if stop:  # told to stop, set fortran flag to stop
            pyblk.fblock.ext_stop = True
            break

        if pyblk.fblock.ext_stop: break  # fortran itself has stopped

    prog_que.put((cur_prog, max_prog))  # send final progress
    prog_que.put('DONE')  # inform other end that the monitoring is done
    return

# this fortran block is run on the secondary process
def run_fortran(prog_que, stop_que, init_data):
    "call to run compiled fortran block"
    global bg_runn

    # setup/start background query thread
    thrd = threading.Thread(target=bg_thread, args=(prog_que, stop_que, ))
    thrd.start()

    # call the compiled fortran code
    results = pyblk.fblock.long_runner(init_data)

    bg_runn = False  # inform query to stop
    thrd.join()  # wait for it to stop (wait out the sleep cycle)

    # now, do something with the results or
    # copy the results out from this process
    ##shm = shared_memory.SharedMemory('results')  # connect to shared mem
    ##b = np.ndarray(results.shape, dtype=results.dtype, buffer=shm.buf)
    ##b[:] = img_arr[:]  # copy results (memory is now allocated)
    ##shm.close()  # disconnect from shared mem
    return


# this GUI is run on the primary process
class StopTest(QWidget):
    "progess test of compiled fortran code through python"
    def __init__(self, parent=None):
        super().__init__()
        # setup/layout of widget
        self.pbar = QProgressBar()
        self.pbar.setTextVisible(False)

        self.start_button = QPushButton('Start')
        self.start_button.clicked.connect(self.run_the_thing)

        self.stop_button = QPushButton('Stop')
        self.stop_button.clicked.connect(self.stop_the_thing)
        self.stop_button.setEnabled(False)
        self.stop_que = None
        self.stopped_early = False

        ly = QVBoxLayout()
        ly.addWidget(self.start_button)
        ly.addWidget(self.stop_button)
        ly.addWidget(self.pbar)
        self.setLayout(ly)

    def stop_the_thing(self):
        "called on clicking the stop button"
        if self.stop_que is None: return
        app.processEvents()
        self.stopped_early = True
        self.stop_que.put(True)
        app.processEvents()

    def run_the_thing(self):
        "called on clicking the start button"
        self.start_button.setEnabled(False)
        self.stop_button.setEnabled(True)
        app.processEvents()

        t0 = time.time()
        print('start:', time.ctime(t0))

        prog_que = mp.Queue()  # progress queue
        stop_que = mp.Queue()  # manual stop queue
        self.stop_que = stop_que

        # if wanting the results on the primary process:
        # create shared memory to later copy result array into
        # (array size is needed; no memory is used/allocated at this time)
        ##shm = shared_memory.SharedMemory('results', create=True,
        ##                                 size=np.int32(1).nbytes * amount)

        init_data = None  # your initial information, if any
        # if it's large and on disk, read it in on the secondary process

        # setup/start the secondary process with the compiled fortran code
        run = mp.Process(target=run_fortran, args=(prog_que, stop_que,
                                                   init_data))
        run.start()

        # listen in on the query through the Queue
        while True:
            res = prog_que.get()
            if res == 'DONE': break
            current, total = res  # unpack from queue

            if total != self.pbar.maximum(): self.pbar.setMaximum(total)

            self.pbar.setValue(current)
            self.setWindowTitle('{:.3%}'.format(current / total))
            app.processEvents()
        # this while loop can be done on a separate background thread
        # but isn't done for this example

        run.join()  # wait for the secondary process to complete
        self.stop_que = None

        # extract the results from secondary process with SharedMemory
        # (shape and dtype need to be known)
        ##results = np.ndarray(shape, dtype=np.int32, buffer=shm.buf)

        if self.stopped_early:
            print('stopped with stop button')
            self.stopped_early = False  # reset
        else:
            print('finished on its own')


        t1 = time.time()
        print('end  :', time.ctime(t1))
        print('{:.3f} seconds'.format(t1 - t0))

        self.pbar.setValue(total)
        self.setWindowTitle('Done!')
        self.start_button.setEnabled(True)
        self.stop_button.setEnabled(False)
        return

if __name__ == '__main__':
    app = QApplication(sys.argv)
    window = StopTest()
    window.show()
    sys.exit(app.exec_())

Thus letting you monitor its progress and stop it at any time (before it finishes on its own that is). If you want to restart your progress from where you left off, you’ll have to have the fortran code give up its internal position to you when you stop it and that’s entirely dependent on what you’re doing; I’ll thoughtfully leave such an exercise as an aggravation for the reader.

Tagged with: , , ,
Posted in Python

Leave a comment

In Archive
Design a site like this with WordPress.com
Get started