
This is how to stop some Fortran code through Python, specifically Fortran with OpenMP through Python.
This is similar to Monitoring Fortran through Python, and uses a similar setup. So I recommend understanding that one first as it may have more details; but don’t let these words stop you from continuing! Let’s begin.
So say you’ve compiled some Fortran code with f2py to be able to run it through Python, and you want to interrupt/stop it a bit more cleanly than killing the entire thing through the task manager. First, in your Fortran code, define a module-level logical value called external stop, or ext_stop for short:
module fblock
    use iso_fortran_env
    logical, save :: ext_stop = .false.
contains
    subroutine long_runner(...)
        !f2py threadsafe
We also add the threadsafe directive so that Python releases its GIL while the Fortran code runs. That allows a separate Python thread to run, which lets us actually send the stop signal into the compiled code.
Then, depending on what exactly you’re doing, stick an if statement checking the ext_stop flag:
module fblock
    use iso_fortran_env
    logical, save :: ext_stop = .false.
contains
    subroutine long_runner(...)
        !f2py threadsafe
        ! code...
        do i = 1, giant_number
            ! code...
            if (ext_stop) exit
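If you want to try the pattern before compiling anything, here is a minimal pure-Python sketch of the same idea. Nothing in it is the real f2py module: `long_runner` is a stand-in for the Fortran loop, the `threading.Event` plays the role of `ext_stop`, and `run_with_stop_after` is a hypothetical helper that flips the flag from another thread after a delay.

```python
import threading
import time

# stand-in for the Fortran logical ext_stop (an assumption for this sketch)
ext_stop = threading.Event()

def long_runner(giant_number):
    "stand-in for the Fortran loop; returns how many iterations ran"
    done = 0
    for i in range(giant_number):
        time.sleep(0.001)      # pretend to do some work
        done += 1
        if ext_stop.is_set():  # same role as: if (ext_stop) exit
            break
    return done

def run_with_stop_after(delay, giant_number=10000):
    "run the loop and raise the stop flag from another thread after `delay` seconds"
    ext_stop.clear()
    timer = threading.Timer(delay, ext_stop.set)
    timer.start()
    done = long_runner(giant_number)
    timer.join()
    return done

if __name__ == '__main__':
    print('iterations completed before the stop:', run_with_stop_after(0.05))
```

The loop only notices the flag where it checks it, so how quickly it stops depends on how long one iteration takes.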
If you’re using OpenMP, then this gets slightly more involved, but still doable. OpenMP’s cancellation directives are used for this. I personally only have the first thread do the checking:
subroutine long_runner(...)
    use omp_lib ! provides omp_get_thread_num
    ! code...
    ! proc_num must be private so each thread keeps its own number
    !$OMP PARALLEL PRIVATE(proc_num)
    proc_num = omp_get_thread_num()
    ! ...
    !$OMP DO
    do i = 1, giant_number
        ! ...
        if ((proc_num == 0) .and. (ext_stop)) then
            ext_stop = .true.
            !$OMP CANCEL DO
        end if
        ! ...
        !$OMP CANCELLATION POINT DO
    end do
    !$OMP END DO
    !$OMP END PARALLEL
    ! ...
The parallel loop is told to stop with the !$OMP CANCEL directive, and the place where each thread checks whether it should stop is marked with the !$OMP CANCELLATION POINT directive. So the former flips a switch from false to true and the latter is where each thread checks this switch. Cancellation adds overhead and is turned off by default, so the environment variable OMP_CANCELLATION has to be set to true for these directives to do anything; more on this below.
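The switch-and-check idea can be mimicked with plain Python threads. This is only an analogy under my own assumptions, not how OpenMP is implemented: `run_workers`, `cancel`, and `stop_at` are hypothetical names, and the external stop request is faked by having worker 0 raise it at a chosen iteration.

```python
import threading

def run_workers(n_threads, n_iters, stop_at):
    "each worker loops; only worker 0 reads the external flag and flips the shared switch"
    ext_stop = threading.Event()  # stand-in for the flag set from the Python side
    cancel = threading.Event()    # the shared switch flipped by the 'CANCEL' step
    counts = [0] * n_threads      # iterations completed per worker

    def worker(tid):
        for i in range(n_iters):
            if tid == 0 and i == stop_at:
                ext_stop.set()    # pretend the stop request arrives now
            if tid == 0 and ext_stop.is_set():
                cancel.set()      # plays the role of !$OMP CANCEL DO
            if cancel.is_set():   # plays the role of !$OMP CANCELLATION POINT DO
                break
            counts[tid] += 1

    threads = [threading.Thread(target=worker, args=(t,)) for t in range(n_threads)]
    for t in threads: t.start()
    for t in threads: t.join()
    return counts

if __name__ == '__main__':
    print(run_workers(4, 1000, 10))
```

Worker 0 always stops at `stop_at`; the others stop at whatever iteration they happen to be on when they next check the switch, which is the same behaviour you should expect from the cancellation points.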
Now that’s it for the Fortran part, so compile it with f2py to make a Python module. Say, for instance, your Fortran module ‘fblock’ with subroutine ‘long_runner’ has been compiled into the file ‘pyblk.pyd’. Import it along with a few other modules:
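from multiprocessing import shared_memory
import multiprocessing as mp
import numpy as np
import threading
import sys, os
import time

# needed to be able to cancel the fortran module mid run
# (set before importing the compiled module, in case the OpenMP
# runtime reads its environment when the library is loaded)
os.environ['OMP_CANCELLATION'] = 'true'

import pyblk # your f2py compiled fortran block

bg_runn = True # module-level flag that keeps the background thread alive
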
def bg_thread(stop_que):
    "background thread for compiled fortran block"
    global bg_runn
    # wait a bit for fortran code to initialize the queried variables
    time.sleep(0.25)
    stop = False # manual stop flag
    wait = 0.25 # amount of seconds to wait before checking the stop queue again
    while bg_runn:
        if stop_que.empty(): time.sleep(wait)
        else: stop = stop_que.get()
        if stop: # told to stop, set fortran flag to stop
            pyblk.fblock.ext_stop = True
            break
        # if this is set to true somewhere within the fortran code
        # you can also detect it here
        if pyblk.fblock.ext_stop: break # fortran itself has stopped
    return

# this fortran block is run on the secondary process
def run_fortran(stop_que, init_data):
    "call to run compiled fortran block"
    global bg_runn
    # setup/start background query thread
    thrd = threading.Thread(target=bg_thread, args=(stop_que, ))
    thrd.start()
    # call the compiled fortran code
    results = pyblk.fblock.long_runner(init_data)
    bg_runn = False # inform query to stop
    thrd.join() # wait for it to stop (wait out the sleep cycle)
    # now, do something with the results or
    # copy the results out from this process
    ##shm = shared_memory.SharedMemory('results') # connect to shared mem
    ##b = np.ndarray(results.shape, dtype=results.dtype, buffer=shm.buf)
    ##b[:] = results[:] # copy results (memory is now allocated)
    ##shm.close() # disconnect from shared mem
    return

def main():
    "set up and run the fortran code"
    t0 = time.time()
    print('start:', time.ctime(t0))
    stop_que = mp.Queue() # manual stop queue
    manual_stop = False
    # if wanting the results on the primary process:
    # create shared memory to later copy result array into
    # (array size is needed; no memory is used/allocated at this time)
    ##shm = shared_memory.SharedMemory('results', create=True,
    ##                                 size=np.int32(1).nbytes * amount)
    init_data = None # your initial information, if any
    # if it's large and on disk, read it in on the secondary process
    try:
        # setup/start the secondary process with the compiled fortran code
        run = mp.Process(target=run_fortran, args=(stop_que,
                                                   init_data))
        run.start()
        # wait for the secondary process to complete
        # while being somewhat active here to capture a KeyboardInterrupt
        while run.is_alive(): time.sleep(0.25)
    except KeyboardInterrupt:
        print('manually stopped with ctrl+c')
        stop_que.put(True)
        manual_stop = True
    except BaseException as err:
        print('Error:', end=' ')
        print(type(err).__name__)
        stop_que.put(True)
    run.join() # let the secondary process wind down before reading results
    if not manual_stop: print('stopped on its own')
    # extract the results from secondary process with SharedMemory
    # (shape and dtype need to be known)
    ##results = np.ndarray(shape, dtype=np.int32, buffer=shm.buf)
    t1 = time.time()
    print('end  :', time.ctime(t1))
    print('{:.3f} seconds'.format(t1 - t0))
    return

if __name__ == '__main__':
    main()
The OMP_CANCELLATION variable is easily set with os.environ['OMP_CANCELLATION'] = 'true' (Note that it’s a lowercase string). If your fortran code can determine when it needs to cancel on its own, that’s all you need to set before calling it.
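As a quick sanity check that the variable really reaches a process started afterwards, here is a small sketch. It uses `subprocess` rather than the `multiprocessing` setup above purely to keep it short, and `child_sees_cancellation` is a hypothetical helper name. It only shows that the environment is inherited; exactly when your OpenMP runtime reads the variable is something I'd treat as runtime-dependent, so setting it as early as possible is the cautious choice.

```python
import os
import subprocess
import sys

def child_sees_cancellation():
    "set OMP_CANCELLATION here, then ask a fresh Python process what it sees"
    os.environ['OMP_CANCELLATION'] = 'true'
    code = "import os; print(os.environ.get('OMP_CANCELLATION'))"
    # a child process inherits the parent's current environment by default
    out = subprocess.run([sys.executable, '-c', code],
                         capture_output=True, text=True, check=True)
    return out.stdout.strip()

if __name__ == '__main__':
    print('child sees OMP_CANCELLATION =', child_sees_cancellation())
```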
The above code here however is all on the assumption that stopping the Fortran block is a user’s manual interruption of the code by cancelling it with ctrl+C, which is why it’s set to run the Fortran block in an entirely separate process using the multiprocessing module (Fortran needs to run in the foreground and interacting with it also needs to be in the foreground; there can’t be two foregrounds for a single process).
But from there, if you really want to, you can merge the stopping with the monitoring into a single background function like so:
def bg_thread(prog_que, stop_que):
    "background thread for compiled fortran block"
    global bg_runn
    # wait a bit for fortran code to initialize the queried variables
    time.sleep(0.25)
    stop = False # manual stop flag
    wait = 0.25 # amount of seconds to wait before sending progress again
    cur_prog = pyblk.fblock.progress
    max_prog = pyblk.fblock.max_prog
    while bg_runn:
        if cur_prog >= max_prog: break
        # for monitoring progress on primary process
        prog_que.put((cur_prog, max_prog))
        # monitoring progress on current process
        #pct = cur_prog / max_prog
        #print(cur_prog, 'of', max_prog, '({:.3%})'.format(pct))
        # listen (and wait a bit) to find out if system should stop
        if stop_que.empty(): time.sleep(wait)
        else: stop = stop_que.get()
        if stop: # told to stop, set fortran flag to stop
            pyblk.fblock.ext_stop = True
            break
        if pyblk.fblock.ext_stop: break # fortran itself has stopped
    prog_que.put((cur_prog, max_prog)) # send final progress
    prog_que.put('DONE') # inform other end that the monitoring is done
    return
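The other end of that progress queue just keeps reading until it sees the 'DONE' marker. Here is a minimal sketch of that consumer side, with a fake producer standing in for the background thread; `fake_monitor` and `consume` are hypothetical names, and a plain `queue.Queue` with a thread is used instead of `mp.Queue` with a process so it runs anywhere without the compiled module.

```python
import queue
import threading

def fake_monitor(prog_que, max_prog):
    "stand-in for bg_thread: send (current, total) pairs, then the 'DONE' marker"
    for cur in range(max_prog + 1):
        prog_que.put((cur, max_prog))
    prog_que.put('DONE')

def consume(prog_que):
    "read progress until 'DONE' arrives; return the last (current, total) seen"
    last = None
    while True:
        res = prog_que.get()  # blocks until something is available
        if isinstance(res, str) and res == 'DONE':
            break
        last = res
    return last

if __name__ == '__main__':
    q = queue.Queue()
    t = threading.Thread(target=fake_monitor, args=(q, 5))
    t.start()
    print('last progress seen:', consume(q))
    t.join()
```

Because the final progress is always sent before 'DONE', the consumer ends up holding the last known position whether the run finished or was stopped early.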
And with an example interface, it’ll all look like so:
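from multiprocessing import shared_memory
import multiprocessing as mp
import numpy as np
import threading
import sys, os
import time

# needed to be able to cancel the fortran module mid run
# (set before importing the compiled module, in case the OpenMP
# runtime reads its environment when the library is loaded)
os.environ['OMP_CANCELLATION'] = 'true'

import pyblk # your f2py compiled fortran block

# to use PyQt5, replace 'PySide2' with 'PyQt5'
from PySide2.QtWidgets import (QApplication, QWidget, QVBoxLayout,
                               QProgressBar, QPushButton)

# module-level flag that keeps the background thread alive
# (bg_thread reads it, so it has to exist before the thread starts)
bg_runn = True
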
# this query is run in the background on the secondary process
def bg_thread(prog_que, stop_que):
    "background thread for compiled fortran block"
    global bg_runn
    # wait a bit for fortran code to initialize the queried variables
    time.sleep(0.25)
    stop = False # manual stop flag
    wait = 0.25 # amount of seconds to wait before sending progress again
    cur_prog = pyblk.fblock.progress
    max_prog = pyblk.fblock.max_prog
    while bg_runn:
        if cur_prog >= max_prog: break
        # for monitoring progress on primary process
        prog_que.put((cur_prog, max_prog))
        # monitoring progress on current process
        #pct = cur_prog / max_prog
        #print(cur_prog, 'of', max_prog, '({:.3%})'.format(pct))
        # listen (and wait a bit) to find out if system should stop
        if stop_que.empty(): time.sleep(wait)
        else: stop = stop_que.get()
        if stop: # told to stop, set fortran flag to stop
            pyblk.fblock.ext_stop = True
            break
        if pyblk.fblock.ext_stop: break # fortran itself has stopped
    prog_que.put((cur_prog, max_prog)) # send final progress
    prog_que.put('DONE') # inform other end that the monitoring is done
    return

# this fortran block is run on the secondary process
def run_fortran(prog_que, stop_que, init_data):
    "call to run compiled fortran block"
    global bg_runn
    # setup/start background query thread
    thrd = threading.Thread(target=bg_thread, args=(prog_que, stop_que, ))
    thrd.start()
    # call the compiled fortran code
    results = pyblk.fblock.long_runner(init_data)
    bg_runn = False # inform query to stop
    thrd.join() # wait for it to stop (wait out the sleep cycle)
    # now, do something with the results or
    # copy the results out from this process
    ##shm = shared_memory.SharedMemory('results') # connect to shared mem
    ##b = np.ndarray(results.shape, dtype=results.dtype, buffer=shm.buf)
    ##b[:] = results[:] # copy results (memory is now allocated)
    ##shm.close() # disconnect from shared mem
    return

# this GUI is run on the primary process
class StopTest(QWidget):
    "progress test of compiled fortran code through python"
    def __init__(self, parent=None):
        super().__init__(parent)
        # setup/layout of widget
        self.pbar = QProgressBar()
        self.pbar.setTextVisible(False)
        self.start_button = QPushButton('Start')
        self.start_button.clicked.connect(self.run_the_thing)
        self.stop_button = QPushButton('Stop')
        self.stop_button.clicked.connect(self.stop_the_thing)
        self.stop_button.setEnabled(False)
        self.stop_que = None
        self.stopped_early = False
        ly = QVBoxLayout()
        ly.addWidget(self.start_button)
        ly.addWidget(self.stop_button)
        ly.addWidget(self.pbar)
        self.setLayout(ly)

    def stop_the_thing(self):
        "called on clicking the stop button"
        if self.stop_que is None: return
        app.processEvents()
        self.stopped_early = True
        self.stop_que.put(True)
        app.processEvents()

    def run_the_thing(self):
        "called on clicking the start button"
        self.start_button.setEnabled(False)
        self.stop_button.setEnabled(True)
        app.processEvents()
        t0 = time.time()
        print('start:', time.ctime(t0))
        prog_que = mp.Queue() # progress queue
        stop_que = mp.Queue() # manual stop queue
        self.stop_que = stop_que
        # if wanting the results on the primary process:
        # create shared memory to later copy result array into
        # (array size is needed; no memory is used/allocated at this time)
        ##shm = shared_memory.SharedMemory('results', create=True,
        ##                                 size=np.int32(1).nbytes * amount)
        init_data = None # your initial information, if any
        # if it's large and on disk, read it in on the secondary process
        # setup/start the secondary process with the compiled fortran code
        run = mp.Process(target=run_fortran, args=(prog_que, stop_que,
                                                   init_data))
        run.start()
        # listen in on the query through the Queue
        while True:
            res = prog_que.get()
            if res == 'DONE': break
            current, total = res # unpack from queue
            if total != self.pbar.maximum(): self.pbar.setMaximum(total)
            self.pbar.setValue(current)
            self.setWindowTitle('{:.3%}'.format(current / total))
            app.processEvents()
        # this while loop can be done on a separate background thread
        # but isn't done for this example
        run.join() # wait for the secondary process to complete
        self.stop_que = None
        # extract the results from secondary process with SharedMemory
        # (shape and dtype need to be known)
        ##results = np.ndarray(shape, dtype=np.int32, buffer=shm.buf)
        if self.stopped_early:
            print('stopped with stop button')
            self.stopped_early = False # reset
        else:
            print('finished on its own')
        t1 = time.time()
        print('end  :', time.ctime(t1))
        print('{:.3f} seconds'.format(t1 - t0))
        self.pbar.setValue(total)
        self.setWindowTitle('Done!')
        self.start_button.setEnabled(True)
        self.stop_button.setEnabled(False)
        return

if __name__ == '__main__':
    app = QApplication(sys.argv)
    window = StopTest()
    window.show()
    sys.exit(app.exec_())
Thus letting you monitor its progress and stop it at any time (before it finishes on its own that is). If you want to restart your progress from where you left off, you’ll have to have the fortran code give up its internal position to you when you stop it and that’s entirely dependent on what you’re doing; I’ll thoughtfully leave such an exercise as an aggravation for the reader.