blob: 8107c72d4644cfef6bd0cdd1c043d835fb9a5b72 [file] [log] [blame] [edit]
#!/usr/bin/perl
### Simulate an array job -- work horse script
### $Id: arrayrun_worker,v 1.30 2011/04/27 08:58:25 root Exp $
### Copyright 2009,2010,2011 Bjørn-Helge Mevik <b.h.mevik@usit.uio.no>
###
### This program is free software; you can redistribute it and/or modify
### it under the terms of the GNU General Public License version 2 as
### published by the Free Software Foundation.
###
### This program is distributed in the hope that it will be useful,
### but WITHOUT ANY WARRANTY; without even the implied warranty of
### MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
### GNU General Public License version 2 for more details.
###
### A copy of the GPL v. 2 text is available here:
### http://www.gnu.org/licenses/old-licenses/gpl-2.0.txt
### Note: This script is meant to be run by 'arrayrun'; do not
### run this script directly.
use strict;
use List::Util qw/min/;
use Time::HiRes qw/sleep/;
## Debug:
use warnings;
use constant DEBUG => 1;
$| = 1 if DEBUG;
## Configuration:
## Limits and timings for the submit/poll loop.  The three sleep settings
## ($pollSeconds, $retrySleep, $doubleCheckSleep) are not used verbatim:
## the code sleeps for "value - 5 + int(rand(11))" seconds, i.e. the value
## plus or minus up to five seconds of random jitter.
my $maxJobs = 100; # Max total number of jobs in queue (running + pending sub jobs)
my $maxIdleJobs = 10; # Max number of pending jobs in queue
my $maxBurst = 10; # Max number of jobs to submit at a time (per poll cycle)
my $pollSeconds = 180; # How many seconds to sleep between each poll
my $maxFails = 300; # Max errors (failed sbatch/squeue calls) to accept when submitting a job
my $retrySleep = 300; # Seconds to sleep between each retry
my $doubleCheckSleep = 30; # Seconds to sleep before double checking with squeue
my $maxRestarts = 10; # Max number of restarts all in all (compared with the count of restarted tasks)
my $sbatch = "/site/bin/sbatch";# Which sbatch command to use
## Parse command line:
##   [-r] TASK_ID_SPEC SBATCH_ARGS...
## -r            restart failed jobs (sets $restart)
## TASK_ID_SPEC  comma separated list of items, each one of
##                 N          a single TASK_ID
##                 A-B, A:B   every TASK_ID from A to B
##                 A-B:S, A:B:S  every S'th TASK_ID from A to B
## SBATCH_ARGS   everything else; passed on to sbatch for every sub job
my $restart = 0;
if (@ARGV && $ARGV[0] eq '-r') {
    $restart = 1;
    shift @ARGV;
}
## Test for definedness/length rather than truth: the specification "0"
## (the single TASK_ID 0) is valid, but false in boolean context.
my $jobSpec = shift @ARGV;
die "Too few arguments\n" unless defined $jobSpec && length $jobSpec;
my @commandLine = @ARGV or die "Too few arguments\n";
my @jobArray;
foreach my $item (split /,/, $jobSpec) {
    if ($item =~ /^(\d+)$/) {
        push @jobArray, $1;
    } elsif ($item =~ /^(\d+)[-:](\d+)$/) {
        push @jobArray, $1 .. $2;
    } elsif ($item =~ /^(\d+)[-:](\d+):(\d+)$/) {
        my ($first, $last, $step) = ($1, $2, $3);
        ## A zero step would never advance and loop forever:
        die "Zero step in TASK_ID specification: '$item'\n" if $step == 0;
        for (my $i = $first; $i <= $last; $i += $step) {
            push @jobArray, $i;
        }
    } else {
        die "Unknown TASK_ID specification: '$item'\n";
    }
}
die "No TASK_IDs specified\n" unless (@jobArray);
print "TASK_IDs to submit: ", join(",", @jobArray),
    "\nCommand line: @commandLine\n" if DEBUG;
print "Will restart failed jobs\n" if DEBUG && $restart;
## Setup: file-level state shared by the subs and the main loop below.

## ID of this (main) job: the first of the two Slurm variables that holds
## a true value, or the string 'null' if neither does.
my $mainid = 'null';
foreach my $var ('SLURM_JOB_ID', 'SLURM_JOBID') {
    next unless $ENV{$var};
    $mainid = $ENV{$var};
    last;
}

my $runids  = [];       # List of IDs of running jobs
my $pendids = [];       # List of IDs of pending jobs
my $testids = [];       # List of IDs to test
my %taskid;             # TASK_ID for all submitted jobs, keyed by job ID
my @restartedTasks;     # TASK_ID of all restarted jobs

## Today's date as YYYY-MM-DD; handed to "sacct -S" when checking jobs.
my @tmp = (localtime)[5, 4, 3];     # (year - 1900, month 0..11, day of month)
my $starttime = join '-',
    $tmp[0] + 1900,
    sprintf('%02d', $tmp[1] + 1),
    sprintf('%02d', $tmp[2]);

if (DEBUG) {
    print "Main job id: $mainid\nStart time: $starttime\n";
}
## Trap signals such that any running sub jobs are cancelled if the
## main job is cancelled or times out.
##
## clean_up: handler for TERM and INT.  Cancels every sub job id found in
## $runids, $pendids and $testids with scancel, then exits with status 0.
## Takes no arguments and does not return.
sub clean_up {
    print "Caught signal. Cleaning up...\n" if DEBUG;
    ## Gather the ids to cancel.  Entries are split on whitespace (as the
    ## shell would have done), only purely numeric tokens are kept, and
    ## each id is passed once even if it sits in more than one list.
    my %seen;
    my @ids = grep { /\A\d+\z/ && !$seen{$_}++ }
        map { defined $_ ? split(' ', $_) : () }
        @{$runids}, @{$pendids}, @{$testids};
    ## Cancel any subjobs:
    if (@ids) {
        print "Cancelling @ids\n" if DEBUG;
        print "scancel @ids\n";
        ## List form: no shell is involved, so the ids cannot be
        ## reinterpreted as shell syntax.
        system('scancel', @ids) == 0
            or warn "scancel failed with error code '$?'\n";
        print "Cancelled @ids\n" if DEBUG;
    }
    exit 0;
}
$SIG{'TERM'} = \&clean_up;      # scancel/timeout
$SIG{'INT'} = \&clean_up;       # ^C in interactive use
## Submit a job with fail resilience.
##
## Usage:   my $id = submit_job($jobName, $commandLine);
##   $jobName      name given to the job via "sbatch --job-name"
##   $commandLine  remaining sbatch arguments as ONE string; it is
##                 interpolated unquoted into a shell command
## Returns: the job ID of the submitted job.
## Dies:    if $commandLine is missing/false, or when more than $maxFails
##          sbatch/squeue calls have failed.
##
## If sbatch fails, or succeeds without reporting a job ID, squeue is
## consulted after a pause to see whether a job named $jobName exists
## anyway.  If it does, its ID is used; otherwise the submission is retried
## after another pause.
sub submit_job {
    my $jobName = shift;
    (my $commandLine = shift) || die "Job script not specified\n";
    my $id;
    my $nFails = 0;
    my $success = 0;
    until ($success) {
        my $fail = 0;
        my $output = `$sbatch --job-name=$jobName $commandLine 2>&1`;
        if ($? == 0) {
            chomp($output);
            ## No newline here: the "=> job ID" message below completes
            ## the line.
            print " Result from submit: $output" if DEBUG;
            ## Capture just the numeric ID; stderr is merged into the
            ## output, so there may be other text around the message.
            if ($output =~ /Submitted batch job (\d+)/) {
                $id = $1;
                $success = 1;
            }
        } else {
            warn " sbatch failed with error code '$?' (output: '",
                $output || '', "'): $!\n";
            $nFails++;
        }
        until ($success || $fail || $nFails > $maxFails) {
            ## Double check that the job did not start
            warn " Problem with submitting/checking job. Checking with squeue in a while.\n";
            sleep $doubleCheckSleep - 5 + int(rand(11));
            my $queue = `squeue -h -o '%i %j' -u $ENV{USER}`;
            if ($? == 0) {
                chomp($queue);
                print " Result from squeue: $queue" if DEBUG;
                ## Find the one line "<jobid> <jobname>" whose name is
                ## exactly $jobName.  The name is quoted (it contains a
                ## '.') and anchored, so "X.1" does not match "X.10".
                if ($queue =~ /^\s*(\S+) \Q$jobName\E\s*$/m) {
                    $id = $1;
                    warn "Job '$jobName' seems to have been started as jobid '$id'. Using that id.\n";
                    $success = 1;
                } else {
                    warn "Job '$jobName' did not start.\n";
                    $fail = 1;
                }
            } else {
                $nFails++;
            }
        }
        unless ($success) {
            if ($nFails <= $maxFails) {
                warn " Could not submit job. Trying again in a while.\n";
                sleep $retrySleep - 5 + int(rand(11));
            } else {
                die " Cannot submit job. Giving up after $nFails errors.\n";
            }
        }
    }
    print " => job ID $id\n" if DEBUG;
    return $id;
}
## Check all jobs currently listed in $runids and $pendids against squeue,
## and rebuild those two lists so that they hold only the jobs that are
## still running/waiting.  Takes no arguments and returns nothing; the
## result is left in the file-level $runids and $pendids.
##
## Jobs that are no longer in the queue are looked up with sacct.  If
## $restart is set and such a job did not end as "COMPLETED 0:0", its
## TASK_ID is resubmitted (once per TASK_ID, and at most $maxRestarts
## restarts in total).
## If squeue itself fails, the lists are left untouched.
sub check_queue {
    print scalar(localtime), ": Checking queue...\n" if DEBUG;
    my $queueids = `squeue -h -o '%i %t' 2>&1`;
    if ($? != 0) {
        print "squeue failed with error code '$?',\nmessage: $queueids\nI will assume all jobs are still running/waiting\n";
        return;
    }
    ## Deliberately the file-level $testids, not a new lexical: while
    ## $runids/$pendids are being rebuilt below, this is the only list the
    ## signal handler can use to cancel the jobs under test.
    $testids = [ @{$runids}, @{$pendids} ];
    print "Number of jobs to check: ", scalar @{$testids}, "\n" if DEBUG;
    sleep 10 + rand;            # Sleep to allow requeued jobs to get back
                                # in queue.
    $runids = [];
    $pendids = [];
    foreach my $id (@{$testids}) {
        ## Match the id as the complete first field of a squeue line, so
        ## that e.g. id 123 is not mistaken for job 5123.
        if ($queueids =~ /^\s*\Q$id\E (\w+)/m) {
            if ($1 eq "PD") {
                print " Job $id is still waiting\n" if DEBUG;
                push @{$pendids}, $id;
            } else {
                print " Job $id is still running\n" if DEBUG;
                push @{$runids}, $id;
            }
        } else {
            print " Job $id has finished:\n" if DEBUG;
            my @sacctres = `sacct -o jobid,start,end,maxvmsize,maxrss,state,exitcode -S $starttime -j $id 2>&1`;
            if ($? != 0) {
                print " sacct failed with error code '$?',\n message: ",
                    @sacctres, " I will assume job $id finished successfully\n";
            } else {
                print join(" ", @sacctres);
                if (grep /^[ ]*\Q$id\E[ ]+.*RUNNING/, @sacctres) {
                    print " Job seems to be still running, after all.\n" if DEBUG;
                    push @{$runids}, $id;
                } elsif ($restart && !grep /^[ ]*\Q$id\E[ ]+.*COMPLETED[ ]+0:0/, @sacctres) {
                    print " Job failed. ";
                    if (@restartedTasks >= $maxRestarts) {
                        print "Too many jobs have been restarted. Will not restart TASK_ID $taskid{$id}\n";
                    } elsif (grep /^$taskid{$id}$/, @restartedTasks) {
                        print "TASK_ID $taskid{$id} has already been restarted once. Will not restart it again\n";
                    } else {
                        print "Restarting TASK_ID $taskid{$id}\n";
                        ## The sub job reads its TASK_ID from the environment:
                        $ENV{'TASK_ID'} = $taskid{$id};
                        my $newid = submit_job("$mainid.$taskid{$id}", "@commandLine");
                        push @{$runids}, $newid;
                        $taskid{$newid} = $taskid{$id};
                        push @restartedTasks, $taskid{$newid};
                        sleep 1.5 + rand;       # Sleep between 1.5 and 2.5 secs
                    }
                }
            }
        }
    }
    ## Every job is now either back in $runids/$pendids or finished.
    $testids = [];
    return;
}
## Sub jobs must not inherit the TMPDIR or the job name of the main job,
## so drop both from the environment before anything is submitted:
delete @ENV{'TMPDIR', 'SLURM_JOB_NAME'};

## One polling step: sleep for about $pollSeconds (+/- 5 seconds), then
## let check_queue() refresh the lists of running and pending jobs.
my $waitAndPoll = sub {
    print "Sleeping...\n" if DEBUG;
    sleep($pollSeconds - 5 + int(rand(11)));
    ## Find which are still running or waiting:
    check_queue();
};

## Phase 1: submit in bursts for as long as there are TASK_IDs left.
until (@jobArray == 0) {
    if (DEBUG) {
        print scalar(localtime), ": Submitting jobs...\n";
        print scalar(@jobArray), " more job(s) to submit\n";
    }
    my $nRunning = scalar @{$runids};
    my $nPending = scalar @{$pendids};
    ## Submit as many as the limits allow (may be zero or negative, in
    ## which case nothing is submitted this round):
    my $nToSubmit = min(
        scalar(@jobArray),
        $maxJobs - $nRunning - $nPending,
        $maxIdleJobs - $nPending,
        $maxBurst,
    );
    if (DEBUG) {
        print "$nRunning job(s) are running, and $nPending are waiting\n";
        print "Submitting $nToSubmit job(s):\n";
    }
    for my $n (1 .. $nToSubmit) {
        my $currJob = shift @jobArray;
        print " TASK_ID $currJob:\n" if DEBUG;
        ## The sub job picks up its TASK_ID from the environment:
        $ENV{'TASK_ID'} = $currJob;
        my $id = submit_job("$mainid.$currJob", "@commandLine");
        push @{$pendids}, $id;
        $taskid{$id} = $currJob;
        sleep(1.5 + rand());    # Sleep between 1.5 and 2.5 secs
    }
    $waitAndPoll->();
}
print "All jobs have been submitted\n" if DEBUG;

## Phase 2: nothing left to submit; wait for the remaining jobs to end.
while (@{$runids} || @{$pendids}) {
    if (DEBUG) {
        print scalar(@{$runids}), " job(s) are still running, and ",
            scalar(@{$pendids}), " are waiting\n";
    }
    $waitAndPoll->();
}
print "Done.\n" if DEBUG;