/*****************************************************************************\
 *  scancel - cancel specified job(s) and/or job step(s)
 *****************************************************************************
 *  Copyright (C) 2002-2007 The Regents of the University of California.
 *  Copyright (C) 2008-2009 Lawrence Livermore National Security.
 *  Copyright (C) SchedMD LLC.
 *  Produced at Lawrence Livermore National Laboratory (cf, DISCLAIMER).
 *  Written by Morris Jette <jette1@llnl.gov>
 *  CODE-OCEC-09-009. All rights reserved.
 *
 *  This file is part of Slurm, a resource management program.
 *  For details, see <https://slurm.schedmd.com/>.
 *  Please also read the included file: DISCLAIMER.
 *
 *  Slurm is free software; you can redistribute it and/or modify it under
 *  the terms of the GNU General Public License as published by the Free
 *  Software Foundation; either version 2 of the License, or (at your option)
 *  any later version.
 *
 *  In addition, as a special exception, the copyright holders give permission
 *  to link the code of portions of this program with the OpenSSL library under
 *  certain conditions as described in each individual source file, and
 *  distribute linked combinations including the two. You must obey the GNU
 *  General Public License in all respects for all of the code used other than
 *  OpenSSL. If you modify file(s) with this exception, you may extend this
 *  exception to your version of the file(s), but you are not obligated to do
 *  so. If you do not wish to do so, delete this exception statement from your
 *  version.  If you delete this exception statement from all source files in
 *  the program, then also delete it here.
 *
 *  Slurm is distributed in the hope that it will be useful, but WITHOUT ANY
 *  WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
 *  FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more
 *  details.
 *
 *  You should have received a copy of the GNU General Public License along
 *  with Slurm; if not, write to the Free Software Foundation, Inc.,
 *  51 Franklin Street, Fifth Floor, Boston, MA 02110-1301  USA.
\*****************************************************************************/

#include <errno.h>
#include <inttypes.h>
#include <pthread.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

#include "slurm/slurm.h"

#include "src/common/hostlist.h"
#include "src/common/list.h"
#include "src/common/log.h"
#include "src/common/macros.h"
#include "src/common/read_config.h"
#include "src/common/slurm_protocol_api.h"
#include "src/common/slurm_protocol_defs.h"
#include "src/common/timers.h"
#include "src/common/xstring.h"
#include "src/common/xmalloc.h"
#include "src/scancel/scancel.h"

#define MAX_CANCEL_RETRY 10
#define MAX_THREADS 10

static void  _add_delay(void);
static int   _cancel_jobs(void);
static void *_cancel_job_id(void *cancel_info);
static void *_cancel_step_id(void *cancel_info);
static int   _confirmation(job_info_t *job_ptr, uint32_t step_id,
                           uint32_t array_id);
static void  _filter_job_records(void);
static void  _load_job_records(void);
static int   _multi_cluster(list_t *clusters);
static int   _proc_cluster(void);
static int   _signal_job_by_str(void);
static int   _verify_job_ids(void);

static job_info_msg_t *job_buffer_ptr = NULL;

typedef struct job_cancel_info {
        uint32_t array_job_id;
        uint32_t array_task_id;
        bool     array_flag;
        /* Note: Either set job_id_str OR job_id */
        char    *job_id_str;
        uint32_t job_id;
        uint32_t step_id;
        uint16_t sig;
        int     *rc;
        int             *num_active_threads;
        pthread_mutex_t *num_active_threads_lock;
        pthread_cond_t  *num_active_threads_cond;
} job_cancel_info_t;

static int num_active_threads = 0;
static pthread_mutex_t num_active_threads_lock;
static pthread_cond_t  num_active_threads_cond;
static pthread_mutex_t max_delay_lock;
static uint32_t max_resp_time = 0;
static int request_count = 0;
opt_t opt;

int main(int argc, char **argv)
{
        log_options_t log_opts = LOG_OPTS_STDERR_ONLY;
        int rc = 0;

        slurm_init(NULL);
        log_init(xbasename(argv[0]), log_opts, SYSLOG_FACILITY_DAEMON, NULL);
        initialize_and_process_args(argc, argv);
        if (opt.verbose) {
                log_opts.stderr_level += opt.verbose;
                log_alter(log_opts, SYSLOG_FACILITY_DAEMON, NULL);
        }

        if (opt.clusters)
                rc = _multi_cluster(opt.clusters);
        else
                rc = _proc_cluster();

        exit(rc);
}

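/* _init_flags - build the KILL_* flag bits for a signal request from the
 * command line options. If job_type is non-NULL, it is set to a descriptive
 * prefix ("batch " or "full ") for use in log messages. */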
static uint16_t _init_flags(char **job_type)
{
        uint16_t flags = 0;

        if (opt.batch) {
                flags |= KILL_JOB_BATCH;
                if (job_type)
                        *job_type = "batch ";
        }

        /*
         * With ScronParameters=explicit_scancel set, scancel requests for a
         * cron job must be rejected unless the --cron flag is specified. To
         * keep that option from affecting anything other than user requests,
         * the request is rejected only when explicit_scancel is set and the
         * KILL_CRON flag is absent.
         */
        if (opt.cron)
                flags |= KILL_CRON;

        if (opt.full) {
                flags |= KILL_FULL_JOB;
                if (job_type)
                        *job_type = "full ";
        }
        if (opt.hurry)
                flags |= KILL_HURRY;

        return flags;
}

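/* _has_filter_opt - return true if any job filter option (account, job
 * name, nodelist, partition, qos, reservation, state, user or wckey) was
 * specified on the command line */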
static bool _has_filter_opt(void)
{
        return ((opt.account) ||
                (opt.job_name) ||
                (opt.nodelist) ||
                (opt.partition) ||
                (opt.qos) ||
                (opt.reservation) ||
                (opt.state != JOB_END) ||
                (opt.user_name) ||
                (opt.wckey));
}

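/* _filters2str - build a space-separated description of the active job
 * filters for use in error messages. Caller must xfree() the result. */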
static char *_filters2str(void)
{
        char *str = NULL;

        if (opt.account)
                xstrfmtcat(str, "account=%s ", opt.account);
        if (opt.job_name)
                xstrfmtcat(str, "job_name=%s ", opt.job_name);
        if (opt.nodelist)
                xstrfmtcat(str, "nodelist=%s ", opt.nodelist);
        if (opt.partition)
                xstrfmtcat(str, "partition=%s ", opt.partition);
        if (opt.qos)
                xstrfmtcat(str, "qos=%s ", opt.qos);
        if (opt.reservation)
                xstrfmtcat(str, "reservation=%s ", opt.reservation);
        if (opt.state != JOB_END)
                xstrfmtcat(str, "state=%s ", job_state_string(opt.state));
        if (opt.user_name)
                xstrfmtcat(str, "user_name=%s ", opt.user_name);
        if (opt.wckey)
                xstrfmtcat(str, "wckey=%s ", opt.wckey);

        return str;
}

static void _log_filter_err_msg(void)
{
        char *err_msg = _filters2str();

        if (err_msg) {
                error("No active jobs match ALL job filters, including: %s",
                      err_msg);
                xfree(err_msg);
        }
}

static void _log_signal_job_msg(char *job_type, char *job_id_str,
                                uint16_t signal)
{
        /*
         * If no signal was explicitly requested, just say "terminating job".
         * Otherwise, report the signal number, even if it is SIGKILL, which
         * is the default.
         */
        if (opt.signal == NO_VAL16)
                verbose("Terminating %sjob %s", job_type, job_id_str);
        else
                verbose("Signal %u to %sjob %s", signal, job_type, job_id_str);
}

static void _log_kill_job_error(char *job_id_str, char *err_msg)
{
        error("Kill job error on job id %s: %s", job_id_str, err_msg);
}

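/* _ctld_signal_jobs - signal all matching jobs with a single request to
 * slurmctld rather than one RPC per job, then report the per-job results */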
static int _ctld_signal_jobs(void)
{
        int i;
        int rc;
        bool successful_job_resp = false;
        char *job_type = "";
        kill_jobs_msg_t kill_msg = {
                .account = opt.account,
                .job_name = opt.job_name,
                .jobs_array = opt.job_list,
                .partition = opt.partition,
                .qos = opt.qos,
                .reservation = opt.reservation,
                .signal = opt.signal,
                .state = opt.state,
                .user_id = opt.user_id,
                .user_name = opt.user_name,
                .wckey = opt.wckey,
                .nodelist = opt.nodelist,
        };
        kill_jobs_resp_msg_t *kill_msg_resp = NULL;

        if (opt.job_list) {
                for (i = 0; opt.job_list[i]; i++)
                        ;
                kill_msg.jobs_cnt = i;
        }

        kill_msg.flags = _init_flags(&job_type);
        if (opt.verbose)
                kill_msg.flags |= KILL_JOBS_VERBOSE;
        if (kill_msg.signal == NO_VAL16)
                kill_msg.signal = SIGKILL;

        if ((rc = slurm_kill_jobs(&kill_msg, &kill_msg_resp))) {
                error("%s", slurm_strerror(rc));
                return rc;
        }

        for (int i = 0; i < kill_msg_resp->jobs_cnt; i++) {
                kill_jobs_resp_job_t *job_resp =
                        &kill_msg_resp->job_responses[i];
                uint32_t error_code = job_resp->error_code;

                if (error_code == SLURM_SUCCESS)
                        successful_job_resp = true;

                if (opt.verbose ||
                    ((error_code != ESLURM_ALREADY_DONE) &&
                     (error_code != ESLURM_INVALID_JOB_ID))) {
                        char *job_id_str = NULL;

                        /*
                         * FIXME:
                         * In a federation, "scancel -Mall" can get errors
                         * back for some jobs from one cluster while another
                         * cluster returns success for all jobs. It would be
                         * nice to handle this situation better.
                         * In addition, if we only signaled some clusters and
                         * got responses for jobs that were revoked and thus
                         * could not be signaled, we could forward the request
                         * to job_resp->sibling rather than just log an error
                         * here.
                         */

                        rc = fmt_job_id_string(job_resp->id, &job_id_str);
                        if (rc != SLURM_SUCCESS)
                                error("Bad job id format returned: %s; %s",
                                      slurm_strerror(rc), job_resp->error_msg);
                        else if (job_resp->error_code != SLURM_SUCCESS)
                                _log_kill_job_error(job_id_str,
                                                    job_resp->error_msg);
                        else
                                _log_signal_job_msg(job_type, job_id_str,
                                                    kill_msg.signal);
                        xfree(job_id_str);
                }
        }

        if (opt.verbose && _has_filter_opt() &&
            (!kill_msg_resp->jobs_cnt || !successful_job_resp))
                _log_filter_err_msg();

        slurm_free_kill_jobs_response_msg(kill_msg_resp);

        return SLURM_SUCCESS;
}

/* _multi_cluster - process job cancellation across a list of clusters */
static int _multi_cluster(list_t *clusters)
{
        list_itr_t *itr;
        int rc = 0, rc2;

        itr = list_iterator_create(clusters);
        while ((working_cluster_rec = list_next(itr))) {
                rc2 = _proc_cluster();
                rc = MAX(rc, rc2);
        }
        list_iterator_destroy(itr);

        return rc;
}

/* _proc_cluster - process job cancellation on a specific cluster */
static int _proc_cluster(void)
{
        int rc, rc2;

        if (has_default_opt() && !has_job_steps()) {
                rc = _signal_job_by_str();
                return rc;
        }
        /*
         * TODO:
         * Remove sibling restriction once --ctld has this logic implemented.
         */
        if (opt.ctld && !(opt.sibling || has_job_steps() || has_fed_jobs()))
                return _ctld_signal_jobs();

        _load_job_records();
        rc = _verify_job_ids();
        if (_has_filter_opt())
                _filter_job_records();
        rc2 = _cancel_jobs();
        rc = MAX(rc, rc2);
        slurm_free_job_info_msg(job_buffer_ptr);

        return rc;
}

/* _load_job_records - load all job information for filtering and
 * verification */
static void _load_job_records(void)
{
        int error_code;
        uint16_t show_flags = 0;

        show_flags |= SHOW_ALL;
        show_flags |= opt.clusters ? SHOW_LOCAL : SHOW_FEDERATION;

        /* We need the full job array string representation for identifying
         * and killing job arrays */
        setenv("SLURM_BITSTR_LEN", "0", 1);
        error_code = slurm_load_jobs((time_t) NULL, &job_buffer_ptr,
                                     show_flags);

        if (error_code) {
                slurm_perror("slurm_load_jobs error");
                exit(1);
        }
}

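/* _is_task_in_job - return true if the given array task ID belongs to this
 * job record, either as its individual task ID or as a bit set in the
 * job's array task bitmap */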
static bool _is_task_in_job(job_info_t *job_ptr, int array_id)
{
        int len;

        if (job_ptr->array_task_id == array_id)
                return true;

        if (!job_ptr->array_bitmap)
                return false;
        len = bit_size(job_ptr->array_bitmap);
        if (len <= array_id)
                return false;
        return bit_test(job_ptr->array_bitmap, array_id);
}

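/* _verify_job_ids - verify that every job ID specified by the user matches
 * a loaded job record, logging an error for and clearing any job ID that
 * was not found. RET 0 if all specified jobs were found, 1 otherwise */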
static int _verify_job_ids(void)
{
        job_info_t *job_ptr;
        int i, j, rc = 0;

        if (opt.job_cnt == 0)
                return rc;

        opt.job_found = xmalloc(sizeof(bool) * opt.job_cnt);
        opt.job_pend  = xmalloc(sizeof(bool) * opt.job_cnt);
        job_ptr = job_buffer_ptr->job_array;
        for (i = 0; i < job_buffer_ptr->record_count; i++, job_ptr++) {
                /* NOTE: We reuse the job's "assoc_id" field as a flag to
                 * record whether the job is referenced in the job list
                 * supplied by the user. */
                job_ptr->assoc_id = 0;
                if (IS_JOB_FINISHED(job_ptr))
                        job_ptr->job_id = 0;
                if (job_ptr->job_id == 0)
                        continue;

                for (j = 0; j < opt.job_cnt; j++) {
                        if (opt.array_id[j] == NO_VAL) {
                                if ((opt.job_id[j] == job_ptr->job_id) ||
                                    ((opt.job_id[j] == job_ptr->array_job_id) &&
                                     (opt.step_id[j] == SLURM_BATCH_SCRIPT))) {
                                        opt.job_found[j] = true;
                                }
                        } else if (opt.array_id[j] == INFINITE) {
                                if (opt.job_id[j] == job_ptr->array_job_id) {
                                        opt.job_found[j] = true;
                                }
                        } else if (opt.job_id[j] != job_ptr->array_job_id) {
                                continue;
                        } else if (_is_task_in_job(job_ptr, opt.array_id[j])) {
                                opt.job_found[j] = true;
                        }
                        if (opt.job_found[j]) {
                                if (IS_JOB_PENDING(job_ptr))
                                        opt.job_pend[j] = true;
                                job_ptr->assoc_id = 1;
                        }
                }
                if (job_ptr->assoc_id == 0)
                        job_ptr->job_id = 0;
        }

        for (j = 0; j < opt.job_cnt; j++) {
                char *job_id_str = NULL;
                if (!opt.job_found[j])
                        rc = 1;
                else
                        continue;

                if (opt.verbose < 0) {
                        ;
                } else if (opt.array_id[j] == NO_VAL) {
                        xstrfmtcat(job_id_str, "%u", opt.job_id[j]);
                } else if (opt.array_id[j] == INFINITE) {
                        xstrfmtcat(job_id_str, "%u_*", opt.job_id[j]);
                } else {
                        xstrfmtcat(job_id_str, "%u_%u", opt.job_id[j],
                                   opt.array_id[j]);
                }

                if (opt.verbose < 0) {
                        ;
                } else if (opt.step_id[j] == SLURM_BATCH_SCRIPT) {
                        char *err_msg = slurm_strerror(ESLURM_INVALID_JOB_ID);
                        _log_kill_job_error(job_id_str, err_msg);
                } else {
                        error("Kill job error on job step id %s.%u: %s",
                              job_id_str, opt.step_id[j],
                              slurm_strerror(ESLURM_INVALID_JOB_ID));
                }
                xfree(job_id_str);

                /* Skip this job in the cancel_job logic */
                opt.job_id[j] = 0;
        }

        return rc;
}

/* _filter_job_records - filter the loaded job records per the user's
 * specification, zeroing the job_id of every record filtered out */
static void _filter_job_records(void)
{
        int i, job_matches = 0;
        job_info_t *job_ptr = NULL, *het_leader = NULL;
        uint32_t job_base_state;

        job_ptr = job_buffer_ptr->job_array;
        for (i = 0; i < job_buffer_ptr->record_count; i++, job_ptr++) {
                if (job_ptr->het_job_id && !job_ptr->het_job_offset)
                        het_leader = job_ptr;

                if (IS_JOB_FINISHED(job_ptr))
                        job_ptr->job_id = 0;
                if (job_ptr->job_id == 0)
                        continue;

                job_base_state = job_ptr->job_state & JOB_STATE_BASE;
                if ((job_base_state != JOB_PENDING) &&
                    (job_base_state != JOB_RUNNING) &&
                    (job_base_state != JOB_SUSPENDED)) {
                        job_ptr->job_id = 0;
                        continue;
                }

                if (opt.account &&
                    xstrcmp(job_ptr->account, opt.account)) {
                        job_ptr->job_id = 0;
                        continue;
                }

                if (opt.job_name &&
                    xstrcmp(job_ptr->name, opt.job_name)) {
                        job_ptr->job_id = 0;
                        continue;
                }

                if (opt.partition &&
                    xstrcmp(job_ptr->partition, opt.partition)) {
                        job_ptr->job_id = 0;
                        continue;
                }

                if (opt.qos && xstrcmp(job_ptr->qos, opt.qos)) {
                        job_ptr->job_id = 0;
                        continue;
                }

                if (opt.reservation &&
                    xstrcmp(job_ptr->resv_name, opt.reservation)) {
                        job_ptr->job_id = 0;
                        continue;
                }

                if ((opt.state != JOB_END) &&
                    (job_base_state != opt.state)) {
                        job_ptr->job_id = 0;
                        continue;
                }

                if ((opt.user_name) &&
                    (job_ptr->user_id != opt.user_id)) {
                        job_ptr->job_id = 0;
                        continue;
                }

                if (opt.nodelist) {
                        hostset_t *hs = hostset_create(job_ptr->nodes);
                        if (!hostset_intersects(hs, opt.nodelist)) {
                                job_ptr->job_id = 0;
                                hostset_destroy(hs);
                                continue;
                        } else {
                                hostset_destroy(hs);
                        }
                }

                if (opt.wckey) {
                        char *job_key = job_ptr->wckey;

                        /*
                         * A wckey that begins with '*' indicates that the
                         * wckey was applied by default. When the --wckey
                         * option does not begin with a '*', act on all
                         * wckeys with the same name, default or not.
                         */
                        if ((opt.wckey[0] != '*') && job_key &&
                            (job_key[0] == '*'))
                                job_key++;

                        if (xstrcmp(job_key, opt.wckey) != 0) {
                                job_ptr->job_id = 0;
                                continue;
                        }
                }

                if (het_leader && het_leader->job_id &&
                    job_ptr->het_job_offset &&
                    (job_ptr->het_job_id == het_leader->het_job_id)) {
                        /*
                         * Filter out this HetJob non-leader component, since
                         * its leader has already been evaluated and was not
                         * filtered out.
                         *
                         * The leader RPC signal handler will affect all the
                         * components, so this avoids extra unneeded RPCs,
                         * races and issues interpreting multiple error codes.
                         *
                         * This can be done assuming the walk of the loaded
                         * jobs is guaranteed to evaluate HetJob leaders
                         * before their matching non-leaders, with each whole
                         * HetJob evaluated contiguously. The slurmctld
                         * job_list is ordered by job creation time (always
                         * leader first) and HetJob components are created in
                         * a row.
                         */
                        job_ptr->job_id = 0;
                        continue;
                }

                job_matches++;
        }

        if ((job_matches == 0) && (opt.verbose > 0))
                _log_filter_err_msg();
}

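/* _build_jobid_str - build the string form of a job ID for signal requests
 * and log messages, e.g. "123" for a simple job, "123_4" for a single array
 * task and "123_[2-5]" for a set of pending array tasks.
 * Caller must xfree() the result. */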
static char *_build_jobid_str(job_info_t *job_ptr, uint32_t array_id)
{
        char *result = NULL;

        if ((array_id != NO_VAL) && (array_id != INFINITE)) {
                xstrfmtcat(result, "%u_%u", job_ptr->array_job_id, array_id);
        } else if (job_ptr->array_task_str) {
                xstrfmtcat(result, "%u_[%s]",
                           job_ptr->array_job_id, job_ptr->array_task_str);
        } else if (job_ptr->array_task_id != NO_VAL) {
                xstrfmtcat(result, "%u_%u",
                           job_ptr->array_job_id, job_ptr->array_task_id);
        } else {
                xstrfmtcat(result, "%u", job_ptr->job_id);
        }

        return result;
}

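/* _cancel_jobid_by_state - spawn a cancellation thread for each job and/or
 * job step specified by the user which matches the given state; with
 * JOB_PENDING, only jobs known to have pending components are considered */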
static void _cancel_jobid_by_state(uint32_t job_state, int *rc)
{
        job_cancel_info_t *cancel_info;
        job_info_t *job_ptr;
        int i, j;

        if (opt.job_cnt == 0)
                return;

        for (j = 0; j < opt.job_cnt; j++) {
                if (opt.job_id[j] == 0)
                        continue;
                if ((job_state == JOB_PENDING) && !opt.job_pend[j])
                        continue;

                job_ptr = job_buffer_ptr->job_array;
                for (i = 0; i < job_buffer_ptr->record_count; i++, job_ptr++) {
                        if (IS_JOB_FINISHED(job_ptr))
                                job_ptr->job_id = 0;
                        if (job_ptr->job_id == 0)
                                continue;
                        if ((opt.step_id[j] != SLURM_BATCH_SCRIPT) &&
                            IS_JOB_PENDING(job_ptr)) {
                                /* User specified #.# for the step, but the
                                 * job ID may be a job array leader with part
                                 * of the array running and other tasks still
                                 * pending */
                                continue;
                        }

                        opt.job_found[j] = false;
                        if (opt.array_id[j] == NO_VAL) {
                                if ((opt.job_id[j] == job_ptr->job_id) ||
                                    ((opt.job_id[j] == job_ptr->array_job_id) &&
                                     (opt.step_id[j] == SLURM_BATCH_SCRIPT))) {
                                        opt.job_found[j] = true;
                                }
                        } else if (opt.array_id[j] == INFINITE) {
                                if (opt.job_id[j] == job_ptr->array_job_id) {
                                        opt.job_found[j] = true;
                                }
                        } else if (opt.job_id[j] != job_ptr->array_job_id) {
                                continue;
                        } else if (_is_task_in_job(job_ptr, opt.array_id[j])) {
                                opt.job_found[j] = true;
                        }

                        if (!opt.job_found[j])
                                continue;

                        if (opt.interactive &&
                            (_confirmation(job_ptr, opt.step_id[j],
                                           opt.array_id[j]) == 0)) {
                                opt.job_id[j] = 0;  /* Don't check again */
                                continue;
                        }

                        slurm_mutex_lock(&num_active_threads_lock);
                        num_active_threads++;
                        while (num_active_threads > MAX_THREADS) {
                                slurm_cond_wait(&num_active_threads_cond,
                                                &num_active_threads_lock);
                        }
                        slurm_mutex_unlock(&num_active_threads_lock);

                        cancel_info = xmalloc(sizeof(job_cancel_info_t));
                        cancel_info->rc  = rc;
                        cancel_info->sig = opt.signal;
                        cancel_info->num_active_threads = &num_active_threads;
                        cancel_info->num_active_threads_lock =
                                &num_active_threads_lock;
                        cancel_info->num_active_threads_cond =
                                &num_active_threads_cond;
                        if (opt.step_id[j] == SLURM_BATCH_SCRIPT) {
                                cancel_info->job_id_str =
                                        _build_jobid_str(job_ptr,
                                                         opt.array_id[j]);

                                slurm_thread_create_detached(_cancel_job_id,
                                                             cancel_info);

                                if ((opt.array_id[j] == NO_VAL) ||
                                    (opt.array_id[j] == INFINITE))
                                        job_ptr->job_id = 0;
                                else
                                        opt.job_id[j] = 0;
                        } else {
                                cancel_info->job_id  = job_ptr->job_id;
                                cancel_info->step_id = opt.step_id[j];
                                slurm_thread_create_detached(_cancel_step_id,
                                                             cancel_info);
                        }

                        if (opt.interactive) {
                                /* Print any error message for the first job
                                 * before starting confirmation of the next
                                 * job */
                                slurm_mutex_lock(&num_active_threads_lock);
                                while (num_active_threads > 0) {
                                        slurm_cond_wait(
                                                &num_active_threads_cond,
                                                &num_active_threads_lock);
                                }
                                slurm_mutex_unlock(&num_active_threads_lock);
                        }
                }
        }
}

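/* _cancel_jobs_by_state - cancel all loaded jobs matching the request in
 * the given state; JOB_END acts as a wildcard matching any state */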
static void _cancel_jobs_by_state(uint32_t job_state, int *rc)
{
        int i;
        job_cancel_info_t *cancel_info;
        job_info_t *job_ptr = job_buffer_ptr->job_array;

        /* Spawn a thread to cancel each job or job step marked for
         * cancellation */
        if (opt.job_cnt) {
                _cancel_jobid_by_state(job_state, rc);
                return;
        }

        for (i = 0; i < job_buffer_ptr->record_count; i++, job_ptr++) {
                if (IS_JOB_FINISHED(job_ptr))
                        job_ptr->job_id = 0;
                if (job_ptr->job_id == 0)
                        continue;

                if ((job_state < JOB_END) &&
                    (job_ptr->job_state != job_state))
                        continue;

                if (opt.interactive &&
                    (_confirmation(job_ptr, SLURM_BATCH_SCRIPT, NO_VAL) == 0)) {
                        job_ptr->job_id = 0;
                        continue;
                }

                cancel_info = xmalloc(sizeof(job_cancel_info_t));
                cancel_info->job_id_str = _build_jobid_str(job_ptr, NO_VAL);
                cancel_info->rc  = rc;
                cancel_info->sig = opt.signal;
                cancel_info->num_active_threads = &num_active_threads;
                cancel_info->num_active_threads_lock =
                        &num_active_threads_lock;
                cancel_info->num_active_threads_cond =
                        &num_active_threads_cond;

                slurm_mutex_lock(&num_active_threads_lock);
                num_active_threads++;
                while (num_active_threads > MAX_THREADS) {
                        slurm_cond_wait(&num_active_threads_cond,
                                        &num_active_threads_lock);
                }
                slurm_mutex_unlock(&num_active_threads_lock);

                slurm_thread_create_detached(_cancel_job_id, cancel_info);
                job_ptr->job_id = 0;

                if (opt.interactive) {
                        /* Print any error message for the first job before
                         * starting confirmation of the next job */
                        slurm_mutex_lock(&num_active_threads_lock);
                        while (num_active_threads > 0) {
                                slurm_cond_wait(&num_active_threads_cond,
                                                &num_active_threads_lock);
                        }
                        slurm_mutex_unlock(&num_active_threads_lock);
                }
        }
}

/* _cancel_jobs - filter then cancel jobs or job steps per request */
static int _cancel_jobs(void)
{
        int rc = 0;

        slurm_mutex_init(&num_active_threads_lock);
        slurm_cond_init(&num_active_threads_cond, NULL);

        _cancel_jobs_by_state(JOB_PENDING, &rc);
        /* Wait for any cancellation of pending jobs to complete before
         * starting cancellation of running jobs, so that we don't have a
         * race condition with pending jobs getting scheduled while running
         * jobs are also being cancelled. */
        slurm_mutex_lock(&num_active_threads_lock);
        while (num_active_threads > 0) {
                slurm_cond_wait(&num_active_threads_cond,
                                &num_active_threads_lock);
        }
        slurm_mutex_unlock(&num_active_threads_lock);

        _cancel_jobs_by_state(JOB_END, &rc);
        /* Wait for any spawned threads that have not yet finished */
        slurm_mutex_lock(&num_active_threads_lock);
        while (num_active_threads > 0) {
                slurm_cond_wait(&num_active_threads_cond,
                                &num_active_threads_lock);
        }
        slurm_mutex_unlock(&num_active_threads_lock);

        slurm_mutex_destroy(&num_active_threads_lock);
        slurm_cond_destroy(&num_active_threads_cond);

        return rc;
}

/* scancel can cancel huge numbers of jobs from a single command line using
 * pthreads for parallelism. Add a delay if there are many RPCs and response
 * times grow excessive, to avoid mounting what amounts to a denial of
 * service attack. */
static void _add_delay(void)
{
        static int target_resp_time = -1;
        static int delay_time = 10000, previous_delay = 0;
        int my_delay;

        slurm_mutex_lock(&max_delay_lock);
        if (target_resp_time < 0) {
                target_resp_time = slurm_conf.msg_timeout / 4;
                target_resp_time = MAX(target_resp_time, 3);
                target_resp_time = MIN(target_resp_time, 5);
                target_resp_time *= USEC_IN_SEC;
                debug("%s: target response time = %d", __func__,
                      target_resp_time);
        }
        if ((++request_count < MAX_THREADS) ||
            (max_resp_time <= target_resp_time)) {
                slurm_mutex_unlock(&max_delay_lock);
                return;
        }

        /* Maximum delay of 1 second. Start at 10 msec with Fibonacci-style
         * backoff (each delay is the sum of the previous two). */
        my_delay = MIN((delay_time + previous_delay), USEC_IN_SEC);
        previous_delay = delay_time;
        delay_time = my_delay;
        slurm_mutex_unlock(&max_delay_lock);

        info("%s: adding delay in RPC send of %d usec", __func__, my_delay);
        usleep(my_delay);
}

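/* _cancel_job_id - thread to signal or cancel a single job, retrying while
 * the job is in a transitional state. Records the result in the shared
 * return code, decrements the active thread count and frees its
 * job_cancel_info_t argument. */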
static void *_cancel_job_id(void *ci)
{
        int error_code = SLURM_SUCCESS, i;
        job_cancel_info_t *cancel_info = (job_cancel_info_t *) ci;
        uint16_t flags = 0;
        char *job_type = "";
        DEF_TIMERS;

        flags = _init_flags(&job_type);
        if (cancel_info->sig == NO_VAL16)
                cancel_info->sig = SIGKILL;

        if (!cancel_info->job_id_str) {
                if (cancel_info->array_job_id &&
                    (cancel_info->array_task_id == INFINITE)) {
                        xstrfmtcat(cancel_info->job_id_str, "%u_*",
                                   cancel_info->array_job_id);
                } else if (cancel_info->array_job_id) {
                        xstrfmtcat(cancel_info->job_id_str, "%u_%u",
                                   cancel_info->array_job_id,
                                   cancel_info->array_task_id);
                } else {
                        xstrfmtcat(cancel_info->job_id_str, "%u",
                                   cancel_info->job_id);
                }
        }

        _log_signal_job_msg(job_type, cancel_info->job_id_str,
                            cancel_info->sig);

        for (i = 0; i < MAX_CANCEL_RETRY; i++) {
                _add_delay();
                START_TIMER;
                error_code = slurm_kill_job2(cancel_info->job_id_str,
                                             cancel_info->sig, flags,
                                             opt.sibling);
                END_TIMER;
                slurm_mutex_lock(&max_delay_lock);
                max_resp_time = MAX(max_resp_time, DELTA_TIMER);
                slurm_mutex_unlock(&max_delay_lock);

                if ((error_code == 0) ||
                    (errno != ESLURM_TRANSITION_STATE_NO_UPDATE))
                        break;
                verbose("Job is in transitional state, retrying");
                sleep(5 + i);
        }
        if (error_code) {
                error_code = errno;
                if ((opt.verbose > 0) ||
                    ((error_code != ESLURM_ALREADY_DONE) &&
                     (error_code != ESLURM_INVALID_JOB_ID))) {
                        _log_kill_job_error(cancel_info->job_id_str,
                                            slurm_strerror(error_code));
                }
                if (((error_code == ESLURM_ALREADY_DONE) ||
                     (error_code == ESLURM_INVALID_JOB_ID)) &&
                    (cancel_info->sig == SIGKILL)) {
                        error_code = 0; /* Ignore error if job done */
                }
        }

        /* Purposely free the struct passed in here, so the caller doesn't
         * have to keep track of it, but don't destroy the mutex and
         * condition variables contained within it. */
        slurm_mutex_lock(cancel_info->num_active_threads_lock);
        *(cancel_info->rc) = MAX(*(cancel_info->rc), error_code);
        (*(cancel_info->num_active_threads))--;
        slurm_cond_signal(cancel_info->num_active_threads_cond);
        slurm_mutex_unlock(cancel_info->num_active_threads_lock);

        xfree(cancel_info->job_id_str);
        xfree(cancel_info);
        return NULL;
}

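/* _cancel_step_id - thread to signal or cancel a single job step, retrying
 * while the job is in a transitional or pending state. Records the result
 * in the shared return code, decrements the active thread count and frees
 * its job_cancel_info_t argument. */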
static void *_cancel_step_id(void *ci)
{
        int error_code = SLURM_SUCCESS, i;
        job_cancel_info_t *cancel_info = (job_cancel_info_t *) ci;
        uint32_t job_id  = cancel_info->job_id;
        uint32_t step_id = cancel_info->step_id;
        bool sig_set = true;
        DEF_TIMERS;

        if (cancel_info->sig == NO_VAL16) {
                cancel_info->sig = SIGKILL;
                sig_set = false;
        }

        if (!cancel_info->job_id_str) {
                if (cancel_info->array_job_id &&
                    (cancel_info->array_task_id == INFINITE)) {
                        xstrfmtcat(cancel_info->job_id_str, "%u_*",
                                   cancel_info->array_job_id);
                } else if (cancel_info->array_job_id) {
                        xstrfmtcat(cancel_info->job_id_str, "%u_%u",
                                   cancel_info->array_job_id,
                                   cancel_info->array_task_id);
                } else {
                        xstrfmtcat(cancel_info->job_id_str, "%u",
                                   cancel_info->job_id);
                }
        }

        for (i = 0; i < MAX_CANCEL_RETRY; i++) {
                if (cancel_info->sig == SIGKILL) {
                        verbose("Terminating step %s.%u",
                                cancel_info->job_id_str, step_id);
                } else {
                        verbose("Signal %u to step %s.%u",
                                cancel_info->sig,
                                cancel_info->job_id_str, step_id);
                }

                _add_delay();
                START_TIMER;
                if ((!sig_set) || opt.ctld)
                        error_code = slurm_kill_job_step(job_id, step_id,
                                                         cancel_info->sig, 0);
                else if (cancel_info->sig == SIGKILL)
                        error_code = slurm_terminate_job_step(job_id, step_id);
                else
                        error_code = slurm_signal_job_step(job_id, step_id,
                                                           cancel_info->sig);
                END_TIMER;
                slurm_mutex_lock(&max_delay_lock);
                max_resp_time = MAX(max_resp_time, DELTA_TIMER);
                slurm_mutex_unlock(&max_delay_lock);

                if ((error_code == 0) ||
                    ((errno != ESLURM_TRANSITION_STATE_NO_UPDATE) &&
                     (errno != ESLURM_JOB_PENDING)))
                        break;
                verbose("Job is in transitional state, retrying");
                sleep(5 + i);
        }
        if (error_code) {
                error_code = errno;
                if ((opt.verbose > 0) || (error_code != ESLURM_ALREADY_DONE))
                        error("Kill job error on job step id %s: %s",
                              cancel_info->job_id_str,
                              slurm_strerror(error_code));

                if ((error_code == ESLURM_ALREADY_DONE) &&
                    (cancel_info->sig == SIGKILL)) {
                        error_code = 0; /* Ignore error if job done */
                }
        }

        /* Purposely free the struct passed in here, so the caller doesn't
         * have to keep track of it, but don't destroy the mutex and
         * condition variables contained within it. */
        slurm_mutex_lock(cancel_info->num_active_threads_lock);
        *(cancel_info->rc) = MAX(*(cancel_info->rc), error_code);
        (*(cancel_info->num_active_threads))--;
        slurm_cond_signal(cancel_info->num_active_threads_cond);
        slurm_mutex_unlock(cancel_info->num_active_threads_lock);

        xfree(cancel_info->job_id_str);
        xfree(cancel_info);
        return NULL;
}

/* _confirmation - confirm a job cancellation request interactively */
static int _confirmation(job_info_t *job_ptr, uint32_t step_id,
                         uint32_t array_id)
{
        char *job_id_str, in_line[128];

        while (1) {
                job_id_str = _build_jobid_str(job_ptr, array_id);
                if (step_id == SLURM_BATCH_SCRIPT) {
                        printf("Cancel job_id=%s name=%s partition=%s [y/n]? ",
                               job_id_str, job_ptr->name,
                               job_ptr->partition);
                } else {
                        printf("Cancel step_id=%s.%u name=%s partition=%s [y/n]? ",
                               job_id_str, step_id, job_ptr->name,
                               job_ptr->partition);
                }
                xfree(job_id_str);
                if (fgets(in_line, sizeof(in_line), stdin) == NULL)
                        continue;
                if ((in_line[0] == 'y') || (in_line[0] == 'Y'))
                        return 1;
                if ((in_line[0] == 'n') || (in_line[0] == 'N'))
                        return 0;
        }
}

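/* _signal_job_by_str - signal jobs by their ID strings directly, spawning
 * one thread per entry in the user-supplied job list without loading or
 * filtering job records first */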
static int _signal_job_by_str(void)
{
        job_cancel_info_t *cancel_info;
        int i, rc = 0;

        slurm_mutex_init(&num_active_threads_lock);
        slurm_cond_init(&num_active_threads_cond, NULL);

        for (i = 0; opt.job_list[i]; i++) {
                cancel_info = xmalloc(sizeof(job_cancel_info_t));
                cancel_info->job_id_str = xstrdup(opt.job_list[i]);
                cancel_info->rc  = &rc;
                cancel_info->sig = opt.signal;
                cancel_info->num_active_threads = &num_active_threads;
                cancel_info->num_active_threads_lock =
                        &num_active_threads_lock;
                cancel_info->num_active_threads_cond =
                        &num_active_threads_cond;

                slurm_mutex_lock(&num_active_threads_lock);
                num_active_threads++;
                while (num_active_threads > MAX_THREADS) {
                        slurm_cond_wait(&num_active_threads_cond,
                                        &num_active_threads_lock);
                }
                slurm_mutex_unlock(&num_active_threads_lock);

                slurm_thread_create_detached(_cancel_job_id, cancel_info);
        }

        /* Wait for all spawned threads to finish */
        slurm_mutex_lock(&num_active_threads_lock);
        while (num_active_threads > 0) {
                slurm_cond_wait(&num_active_threads_cond,
                                &num_active_threads_lock);
        }
        slurm_mutex_unlock(&num_active_threads_lock);

        return rc;
}