| #!/usr/bin/perl |
| ### Simulate an array job -- work horse script |
| ### $Id: arrayrun_worker,v 1.30 2011/04/27 08:58:25 root Exp $ |
| |
| ### Copyright 2009,2010,2011 Bjørn-Helge Mevik <b.h.mevik@usit.uio.no> |
| ### |
| ### This program is free software; you can redistribute it and/or modify |
| ### it under the terms of the GNU General Public License version 2 as |
| ### published by the Free Software Foundation. |
| ### |
| ### This program is distributed in the hope that it will be useful, |
| ### but WITHOUT ANY WARRANTY; without even the implied warranty of |
| ### MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
| ### GNU General Public License version 2 for more details. |
| ### |
| ### A copy of the GPL v. 2 text is available here: |
| ### http://www.gnu.org/licenses/old-licenses/gpl-2.0.txt |
| |
| |
| ### Note: This script is meant to be run by 'arrayrun'; do not |
| ### run this script directly. |
| |
| use strict; |
| use List::Util qw/min/; |
| use Time::HiRes qw/sleep/; |
| |
## Diagnostics:
## Warnings are enabled.  While DEBUG is true, the script prints progress
## messages and switches off output buffering on the selected handle, so
## the messages appear as soon as they are printed.
use warnings;
use constant DEBUG => 1;
$| = 1 if DEBUG;

## Tunable settings:
my $maxJobs          = 100;  # Limit on sub jobs in queue (running + pending)
my $maxIdleJobs      = 10;   # Limit on sub jobs pending in queue
my $maxBurst         = 10;   # Limit on sub jobs submitted in one round
my $pollSeconds      = 180;  # Approximate pause between polls of the queue
my $maxFails         = 300;  # Command failures tolerated while submitting a job
my $retrySleep       = 300;  # Approximate pause before retrying a submit
my $doubleCheckSleep = 30;   # Approximate pause before double checking a submit
my $maxRestarts      = 10;   # Limit on restarted tasks, all in all
my $sbatch           = "/site/bin/sbatch";  # The sbatch command to run
| |
## Parse command line:
##
##   [-r] TASK_ID_SPEC COMMAND [ARG...]
##
##   -r            Restart failed sub jobs.
##   TASK_ID_SPEC  Comma separated list of items, where each item is one of
##                   N          a single TASK_ID
##                   N-M        all TASK_IDs from N to M (N:M also accepted)
##                   N-M:STEP   every STEP'th TASK_ID from N to M
##                              (N:M:STEP also accepted)
##   COMMAND...    What to submit for each TASK_ID.
##
## Dies if an argument is missing, if an item cannot be parsed, if a STEP
## is zero, or if the specification does not result in any TASK_IDs.
my $restart = 0;
if (@ARGV && $ARGV[0] eq '-r') {
    $restart = 1;
    shift @ARGV;
}

## Test for a defined, non-empty string instead of a true value; the
## specification "0" (the single TASK_ID 0) is valid but false.
my $jobSpec = shift @ARGV;
die "Too few arguments\n" unless defined $jobSpec && length $jobSpec;
my @commandLine = @ARGV or die "Too few arguments\n";

my @jobArray;
foreach my $item (split /,/, $jobSpec) {
    if ($item =~ /^(\d+)$/) {
        push @jobArray, $1;
    } elsif ($item =~ /^(\d+)[-:](\d+)$/) {
        push @jobArray, $1 .. $2;
    } elsif ($item =~ /^(\d+)[-:](\d+):(\d+)$/) {
        my ($first, $last, $step) = ($1, $2, $3);
        ## A zero step would never reach $last: the loop below would run
        ## forever and fill up the memory with copies of $first.
        die "Invalid TASK_ID specification (step must be at least 1): '$item'\n"
            if $step == 0;
        for (my $i = $first; $i <= $last; $i += $step) {
            push @jobArray, $i;
        }
    } else {
        die "Unknown TASK_ID specification: '$item'\n";
    }
}
die "No TASK_IDs specified\n" unless (@jobArray);

print "TASK_IDs to submit: ", join(",", @jobArray),
    "\nCommand line: @commandLine\n" if DEBUG;
print "Will restart failed jobs\n" if DEBUG && $restart;
| |
## Setup

## Id of the main job, taken from the SLURM environment; 'null' if
## neither variable holds a true value:
my $mainid = $ENV{'SLURM_JOB_ID'} || $ENV{'SLURM_JOBID'} || 'null';

## Book keeping.  The id lists are array references:
my $runids  = [];       # IDs of the running jobs
my $pendids = [];       # IDs of the pending jobs
my $testids = [];       # IDs of the jobs to test
my %taskid;             # Job ID => TASK_ID, for every submitted job
my @restartedTasks;     # TASK_IDs of every restarted job

## Today's date (local time) as YYYY-MM-DD:
my $starttime = do {
    my ($year, $month, $day) = (localtime())[5, 4, 3];
    sprintf "%d-%02d-%02d", $year + 1900, $month + 1, $day;
};

print "Main job id: $mainid\nStart time: $starttime\n" if DEBUG;
| |
## Signal handler.  Cancels every sub job listed in $runids, $pendids and
## $testids (if there are any) with scancel, and then exits with status 0.
## It is installed below, so that the sub jobs do not live on when the
## main job is cancelled or times out.
sub clean_up {
    print "Caught signal. Cleaning up...\n" if DEBUG;

    my $haveJobs = @{$runids} || @{$pendids} || @{$testids};
    if ($haveJobs) {
        ## One common id string for the messages and the commands:
        my $idList = "@{$runids} @{$pendids} @{$testids}";
        print "Cancelling $idList\n" if DEBUG;
        system("echo scancel $idList");     # Show the command in the output
        system("scancel $idList");
        print "Cancelled $idList\n" if DEBUG;
    }

    exit 0;
}

## TERM: scancel/timeout.  INT: ^C in interactive use.
foreach my $signal ('TERM', 'INT') {
    $SIG{$signal} = 'clean_up';
}
| |
| |
## Submit a job with fail resilience.
##
## Arguments:
##   $jobName      Name to give the job (sbatch --job-name).  The name is
##                 also used for finding the job with squeue when the
##                 result of sbatch is unclear.
##   $commandLine  The job script and its arguments, as a single string.
##                 NOTE: both arguments are handed to the shell unquoted.
##
## Returns the job id of the submitted job.
##
## Dies if $commandLine is missing (or false), or when more than $maxFails
## sbatch/squeue commands have failed.
##
## If sbatch fails, or succeeds without reporting a job id, squeue is
## consulted after a pause to see whether a job with this name showed up
## anyway.  If it did not, the submit is retried after a longer pause.
sub submit_job {
    my $jobName = shift;
    (my $commandLine = shift) || die "Job script not specified\n";
    my $id;
    my $nFails = 0;
    my $success = 0;
    until ($success) {
        my $fail = 0;
        my $output = `$sbatch --job-name=$jobName $commandLine 2>&1`;
        if ($? == 0) {
            chomp($output);
            print " Result from submit: $output" if DEBUG;
            ## Capture just the id.  Stripping the text in front of it is
            ## not enough: any extra output lines from sbatch, or text
            ## after the id, would end up as part of the "id".
            if ($output =~ /Submitted batch job (\d+)/) {
                $id = $1;
                $success = 1;
            }
        } else {
            warn " sbatch failed with error code '$?' (output: '",
                (defined $output ? $output : ''), "'): $!\n";
            $nFails++;
        }
        until ($success || $fail || $nFails > $maxFails) {
            ## Double check that the job did not start
            warn " Problem with submitting/checking job. Checking with squeue in a while.\n";
            sleep $doubleCheckSleep - 5 + int(rand(11));
            my $queue = `squeue -h -o '%i %j' -u $ENV{USER}`;
            if ($? == 0) {
                chomp($queue);
                print " Result from squeue: $queue" if DEBUG;
                ## The listing has one "<id> <name>" line for each job of
                ## the user.  Pick the id from the line with exactly this
                ## name: the name is quoted (it contains a '.') and
                ## anchored (so that "12.1" does not match "12.10").
                if ($queue =~ /^\s*(\S+) \Q$jobName\E\s*$/m) {
                    $id = $1;
                    warn "Job '$jobName' seems to have been started as jobid '$id'. Using that id.\n";
                    $success = 1;
                } else {
                    warn "Job '$jobName' did not start.\n";
                    $fail = 1;
                }
            } else {
                $nFails++;
            }
        }
        unless ($success) {
            if ($nFails <= $maxFails) {
                warn " Could not submit job. Trying again in a while.\n";
                sleep $retrySleep - 5 + int(rand(11));
            } else {
                die " Cannot submit job. Giving up after $nFails errors.\n";
            }
        }
    }
    print " => job ID $id\n" if DEBUG;
    return $id;
}
| |
| |
## Check the jobs in $runids and $pendids, and rebuild the two lists with
## the jobs that are still running/waiting.
##
## Takes no arguments and returns nothing of interest; it works on the
## file level variables $runids, $pendids, $testids, %taskid and
## @restartedTasks.
##
## - If squeue fails, the lists are left as they are.
## - A job that squeue does not list is looked up with sacct.  If sacct
##   fails, the job is assumed to have finished successfully.  If sacct
##   reports it as RUNNING, it is kept in $runids.
## - With restarts enabled ($restart), a job that sacct does not report as
##   COMPLETED with exit code 0:0 is submitted again, unless its TASK_ID
##   has been restarted before or $maxRestarts tasks have been restarted.
sub check_queue {
    print scalar(localtime), ": Checking queue...\n" if DEBUG;
    my $queueids = `squeue -h -o '%i %t' 2>&1`;
    if ($? != 0) {
        print "squeue failed with error code '$?',\nmessage: $queueids\nI will assume all jobs are still running/waiting\n";
        return;
    }
    ## Use the file level $testids (no 'my' here): clean_up reads it, so
    ## jobs that are in neither $runids nor $pendids while they are being
    ## tested still get cancelled if a signal arrives in the middle of
    ## the check.
    $testids = [ @{$runids}, @{$pendids} ];
    print "Number of jobs to check: ", scalar @{$testids}, "\n" if DEBUG;
    sleep 10 + rand;            # Sleep to allow requeued jobs to get back
                                # in queue.
    $runids = [];
    $pendids = [];
    foreach my $id (@{$testids}) {
        ## The listing has one "<id> <state>" line per job.  Anchor the
        ## match to the start of a line, so that e.g. id 123 does not
        ## match the line of job 4123.
        if ($queueids =~ /^\s*\Q$id\E (\w+)/m) {
            if ($1 eq "PD") {
                print " Job $id is still waiting\n" if DEBUG;
                push @{$pendids}, $id;
            } else {
                print " Job $id is still running\n" if DEBUG;
                push @{$runids}, $id;
            }
            next;
        }

        print " Job $id has finished:\n" if DEBUG;
        my @sacctres = `sacct -o jobid,start,end,maxvmsize,maxrss,state,exitcode -S $starttime -j $id 2>&1`;
        if ($? != 0) {
            print " sacct failed with error code '$?',\n message: ",
                @sacctres, " I will assume job $id finished successfully\n";
            next;
        }
        print join(" ", @sacctres);

        if (grep /^[ ]*\Q$id\E[ ]+.*RUNNING/, @sacctres) {
            print " Job seems to be still running, after all.\n" if DEBUG;
            push @{$runids}, $id;
        } elsif ($restart && !grep /^[ ]*\Q$id\E[ ]+.*COMPLETED[ ]+0:0/, @sacctres) {
            my $task = $taskid{$id};
            print " Job failed. ";
            if (@restartedTasks >= $maxRestarts) {
                print "Too many jobs have been restarted. Will not restart TASK_ID $task\n";
            } elsif (grep { $_ eq $task } @restartedTasks) {
                print "TASK_ID $task has already been restarted once. Will not restart it again\n";
            } else {
                print "Restarting TASK_ID $task\n";
                $ENV{'TASK_ID'} = $task;
                my $newid = submit_job "$mainid.$task", "@commandLine";
                ## The new job is listed as running until the next check
                ## sorts it into the right list.
                push @{$runids}, $newid;
                $taskid{$newid} = $task;
                push @restartedTasks, $task;
                sleep 1.5 + rand;       # Sleep between 1.5 and 2.5 secs
            }
        }
    }
    ## Every job has been sorted into its list (or has finished):
    $testids = [];
    return;
}
| |
| |
## Make sure sub jobs do not inherit the main job TMPDIR or jobname:
delete @ENV{'TMPDIR', 'SLURM_JOB_NAME'};

## Submit loop: as long as there are TASK_IDs left, submit as many sub
## jobs as the limits allow, sleep for about $pollSeconds seconds, and
## update the lists of running and pending jobs.
while (@jobArray) {
    ## There is more to submit
    print scalar(localtime), ": Submitting jobs...\n" if DEBUG;
    print scalar(@jobArray), " more job(s) to submit\n" if DEBUG;
    ## Submit as many as possible:
    my $nRunning = scalar @{$runids};
    my $nPending = scalar @{$pendids};
    my $nToSubmit = min(scalar @jobArray,
                        $maxJobs - $nRunning - $nPending,
                        $maxIdleJobs - $nPending,
                        $maxBurst);
    ## The minimum is negative when a limit is already exceeded (jobs
    ## that are restarted are added to the lists without regard to the
    ## limits).  Nothing can be submitted then; use 0 so that the message
    ## below does not report a negative number of jobs.
    $nToSubmit = 0 if $nToSubmit < 0;
    print "$nRunning job(s) are running, and $nPending are waiting\n"
        if DEBUG;
    print "Submitting $nToSubmit job(s):\n" if DEBUG;
    foreach my $n (1 .. $nToSubmit) {
        my $currJob = shift @jobArray;
        print " TASK_ID $currJob:\n" if DEBUG;
        ## Set $TASK_ID for the job:
        $ENV{'TASK_ID'} = $currJob;
        my $id = submit_job "$mainid.$currJob", "@commandLine";
        push @{$pendids}, $id;
        $taskid{$id} = $currJob;
        sleep 1.5 + rand;       # Sleep between 1.5 and 2.5 secs
    }
    ## Wait a while:
    print "Sleeping...\n" if DEBUG;
    sleep $pollSeconds - 5 + int(rand(11));
    ## Find which are still running or waiting:
    check_queue();
}
print "All jobs have been submitted\n" if DEBUG;
| |
## Wait loop: keep polling the queue until there are neither running nor
## pending sub jobs left.
until (!@{$runids} && !@{$pendids}) {
    ## Some jobs are still running or pending
    my $nRunning = scalar @{$runids};
    my $nWaiting = scalar @{$pendids};
    print "$nRunning job(s) are still running, and $nWaiting are waiting\n"
        if DEBUG;
    ## Wait a while
    print "Sleeping...\n" if DEBUG;
    my $nap = $pollSeconds - 5 + int(rand(11));
    sleep $nap;
    ## Find which are still running or waiting:
    check_queue();
}

print "Done.\n" if DEBUG;