| /*****************************************************************************\ |
| * jobcomp_kafka_message.c - Kafka message helper for jobcomp/kafka. |
| ***************************************************************************** |
| * Copyright (C) SchedMD LLC. |
| * |
| * This file is part of Slurm, a resource management program. |
| * For details, see <https://slurm.schedmd.com/>. |
| * Please also read the included file: DISCLAIMER. |
| * |
| * Slurm is free software; you can redistribute it and/or modify it under |
| * the terms of the GNU General Public License as published by the Free |
| * Software Foundation; either version 2 of the License, or (at your option) |
| * any later version. |
| * |
| * In addition, as a special exception, the copyright holders give permission |
| * to link the code of portions of this program with the OpenSSL library under |
| * certain conditions as described in each individual source file, and |
| * distribute linked combinations including the two. You must obey the GNU |
| * General Public License in all respects for all of the code used other than |
| * OpenSSL. If you modify file(s) with this exception, you may extend this |
| * exception to your version of the file(s), but you are not obligated to do |
| * so. If you do not wish to do so, delete this exception statement from your |
| * version. If you delete this exception statement from all source files in |
| * the program, then also delete it here. |
| * |
| * Slurm is distributed in the hope that it will be useful, but WITHOUT ANY |
| * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS |
| * FOR A PARTICULAR PURPOSE. See the GNU General Public License for more |
| * details. |
| * |
| * You should have received a copy of the GNU General Public License along |
| * with Slurm; if not, write to the Free Software Foundation, Inc., |
| * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. |
| \*****************************************************************************/ |
| |
| #include "src/common/data.h" |
| #include "src/common/list.h" |
| #include "src/common/log.h" |
| #include "src/common/pack.h" |
| #include "src/common/read_config.h" |
| #include "src/common/slurm_protocol_pack.h" |
| #include "src/common/state_save.h" |
| #include "src/common/timers.h" |
| #include "src/common/xassert.h" |
| #include "src/common/xmalloc.h" |
| #include "src/common/xstring.h" |
| #include "src/interfaces/jobcomp.h" |
| #include "src/plugins/jobcomp/common/jobcomp_common.h" |
| #include "src/plugins/jobcomp/kafka/jobcomp_kafka_conf.h" |
| #include "src/plugins/jobcomp/kafka/jobcomp_kafka_message.h" |
| |
| #define KAFKA_STATE_FILE "jobcomp_kafka_state" |
| |
| |
| static pthread_mutex_t poll_mutex = PTHREAD_MUTEX_INITIALIZER; |
| static pthread_cond_t poll_stop_cond = PTHREAD_COND_INITIALIZER; |
| static pthread_t poll_thread; |
| |
| static list_t *state_msg_list = NULL; |
| static bool terminate = false; |
| /* |
| * The librdkafka API documents it is completely thread-safe. |
| * Not adding mutex locks around rd_kafka_t *rk instance due to this. |
| * _destroy_rd_kafka_handle() is called on plugin termination, so other |
| * operations should not be called by then, and poll thread should |
| * also be already joined. |
| */ |
| static rd_kafka_t *rk = NULL; |
| |
| static void _add_kafka_msg_to_state(kafka_msg_opaque_t *opaque, char *payload); |
| static int _configure_rd_kafka_handle(void); |
| static int _create_rd_kafka_handle(rd_kafka_conf_t *conf); |
| static void _destroy_kafka_msg(void *arg); |
| static void _destroy_rd_kafka_conf(rd_kafka_conf_t **conf); |
| static void _destroy_rd_kafka_handle(void); |
| static void _dr_msg_cb(rd_kafka_t *rk, const rd_kafka_message_t *rkmessage, |
| void *opaque); |
| static void _dump_rd_kafka_conf(rd_kafka_conf_t *conf); |
| static void _flush_rd_kafka_msgs(void); |
| static int _foreach_conf_pair(void *x, void *arg); |
| static kafka_msg_t *_init_kafka_msg(kafka_msg_opaque_t *opaque, char *payload); |
| static void _load_jobcomp_kafka_state(void); |
| static void _pack_jobcomp_kafka_state(buf_t *buffer); |
| static void _pack_jobcomp_kafka_msg_opaque(kafka_msg_opaque_t *opaque, |
| buf_t *buffer); |
| static int _pack_jobcomp_kafka_msg(void *object, void *arg); |
| static void *_poll_handler(void *no_data); |
| static void _purge_rd_kafka_msgs(void); |
| static void _terminate_poll_handler(void); |
| static void _save_jobcomp_kafka_state(void); |
| static rd_kafka_conf_t *_set_rd_kafka_conf(void); |
| static int _unpack_jobcomp_kafka_msg_opaque(kafka_msg_opaque_t *opaque, |
| uint16_t protocol_version, |
| buf_t *buffer); |
| static int _unpack_jobcomp_kafka_msg(uint16_t protocol_version, buf_t *buffer); |
| static void _unpack_jobcomp_kafka_state(buf_t *buffer); |
| |
| /* |
| * Allocate memory for a kafka_msg_t* and initialize it with arguments. |
| * Append to state_msg_list. |
| * |
| * IN: kafka_msg_opaque_t *opaque |
| * IN: char *payload |
| */ |
| static void _add_kafka_msg_to_state(kafka_msg_opaque_t *opaque, char *payload) |
| { |
| kafka_msg_t *kafka_msg = NULL; |
| |
| kafka_msg = _init_kafka_msg(opaque, payload); |
| list_append(state_msg_list, kafka_msg); |
| } |
| |
| /* |
| * Log rd_kafka_conf_t |
| * |
| * IN: rd_kafka_conf_t *conf |
| */ |
| static void _dump_rd_kafka_conf(rd_kafka_conf_t *conf) |
| { |
| const char **array; |
| size_t array_cnt; |
| |
| xassert(conf); |
| |
| /* |
| * Dump the configuration properties and values of conf to an array |
| * with "key", "value" pairs. |
| * |
| * The number of entries in the array is returned in *cntp. |
| * |
| * The dump must be freed with `rd_kafka_conf_dump_free()`. |
| */ |
| array = rd_kafka_conf_dump(conf, &array_cnt); |
| |
| for (int i = 0; ((i + 1) < array_cnt); i += 2) |
| log_flag(JOBCOMP, "%s=%s", array[i], array[i + 1]); |
| |
| rd_kafka_conf_dump_free(array, array_cnt); |
| } |
| |
| /* |
| * 1. Set rd_kafka_conf_t options |
| * 2. Create librdkafka handle with conf object |
| * |
| * RET: SLURM_SUCCESS or SLURM_ERROR |
| */ |
| static int _configure_rd_kafka_handle(void) |
| { |
| int rc = SLURM_SUCCESS; |
| rd_kafka_conf_t *conf = NULL; |
| |
| if (!(conf = _set_rd_kafka_conf())) |
| return SLURM_ERROR; |
| |
| if (slurm_conf.debug_flags & DEBUG_FLAG_JOBCOMP) |
| _dump_rd_kafka_conf(conf); |
| |
| rc = _create_rd_kafka_handle(conf); |
| |
| return rc; |
| } |
| |
| /* |
| * Message delivery report callback. |
| * |
| * This callback is called exactly once per message, indicating if the message |
| * was successfully delivered: |
| * (rkmessage->err == RD_KAFKA_RESP_ERR_NO_ERROR) |
| * or permanently failed delivery: |
| * (rkmessage->err != RD_KAFKA_RESP_ERR_NO_ERROR). |
| * |
| * The callback is triggered from rd_kafka_poll() and executes on the |
| * application's thread. |
| */ |
| static void _dr_msg_cb(rd_kafka_t *rk, const rd_kafka_message_t *rkmessage, |
| void *opaque) |
| { |
| bool requeue; |
| kafka_msg_opaque_t *msg_opaque = |
| (kafka_msg_opaque_t *) rkmessage->_private; |
| char *topic = (char *) rd_kafka_topic_name(rkmessage->rkt); |
| char *err_str = (char *) rd_kafka_err2str(rkmessage->err); |
| char *payload = rkmessage->payload; |
| char *event_name = jobcomp_common_get_event_name(msg_opaque->event); |
| |
| switch (rkmessage->err) { |
| case RD_KAFKA_RESP_ERR_NO_ERROR: |
| /* Success */ |
| log_flag(JOBCOMP, |
| "Message event '%s' for JobId=%u delivered to topic '%s'", |
| event_name, msg_opaque->job_id, topic); |
| goto free_msg; |
| case RD_KAFKA_RESP_ERR__MSG_TIMED_OUT: |
| /* |
| * The message could not be successfully transmitted before |
| * message.timeout.ms expired. |
| */ |
| slurm_rwlock_rdlock(&kafka_conf_rwlock); |
| requeue = (kafka_conf->flags & |
| KAFKA_CONF_FLAG_REQUEUE_ON_MSG_TIMEOUT); |
| slurm_rwlock_unlock(&kafka_conf_rwlock); |
| |
| if (!requeue) { |
| error("%s: Message event '%s' delivery for JobId=%u failed: %s. Message discarded.", |
| plugin_type, |
| event_name, |
| msg_opaque->job_id, |
| err_str); |
| goto free_msg; |
| } |
| |
| if (!terminate) |
| jobcomp_kafka_message_produce(msg_opaque, payload); |
| else |
| _add_kafka_msg_to_state(msg_opaque, payload); |
| |
| error("%s: Message event '%s' delivery for JobId=%u failed: %s. %s.", |
| plugin_type, event_name, msg_opaque->job_id, err_str, |
| !terminate ? "Attempting to produce message again" : |
| "Saving message to plugin state file."); |
| |
| break; |
| #if RD_KAFKA_VERSION >= 0x010000ff |
| case RD_KAFKA_RESP_ERR__PURGE_QUEUE: |
| /* Purged in-queue. Always requeue in this case. */ |
| log_flag(JOBCOMP, "Message event '%s' delivery for JobId=%u failed: %s. Saving message to plugin state file.", |
| event_name, msg_opaque->job_id, err_str); |
| _add_kafka_msg_to_state(msg_opaque, payload); |
| |
| break; |
| case RD_KAFKA_RESP_ERR__PURGE_INFLIGHT: |
| /* Purged in-flight. */ |
| slurm_rwlock_rdlock(&kafka_conf_rwlock); |
| requeue = (kafka_conf->flags & |
| KAFKA_CONF_FLAG_REQUEUE_PURGE_IN_FLIGHT); |
| slurm_rwlock_unlock(&kafka_conf_rwlock); |
| |
| error("%s: Message event '%s' delivery for JobId=%u failed: %s. %s.", |
| plugin_type, event_name, msg_opaque->job_id, err_str, |
| requeue ? |
| "Saving message to plugin state file" : "Message discarded"); |
| |
| if (!requeue) |
| goto free_msg; |
| |
| _add_kafka_msg_to_state(msg_opaque, payload); |
| |
| break; |
| #endif |
| default: |
| error("%s: Message event '%s' delivery for JobId=%u failed: %s. Message discarded.", |
| plugin_type, event_name, msg_opaque->job_id, err_str); |
| goto free_msg; |
| } |
| |
| /* The rkmessage is destroyed automatically by librdkafka */ |
| return; |
| |
| free_msg: |
| xfree(msg_opaque); |
| xfree(payload); |
| } |
| |
| /* |
| * rd_kafka_conf_list listForF. |
| * |
| * IN: x: config_key_pair_t *conf_pair |
| * IN: arg: rd_kafka_conf_t *conf |
| * RET: -1 if failed to rd_kafka_conf_set() a conf pair, 1 otherwise. |
| */ |
| static int _foreach_conf_pair(void *x, void *arg) |
| { |
| char errstr[512]; |
| config_key_pair_t *conf_pair = x; |
| rd_kafka_conf_t *conf = arg; |
| |
| if (rd_kafka_conf_set(conf, conf_pair->name, conf_pair->value, errstr, |
| sizeof(errstr)) != RD_KAFKA_CONF_OK) { |
| error("%s: rd_kafka_conf_set() failed to set '%s'->'%s': %s", |
| plugin_type, conf_pair->name, conf_pair->value, errstr); |
| return -1; |
| } |
| |
| return 1; |
| } |
| |
| static void _destroy_rd_kafka_conf(rd_kafka_conf_t **conf) |
| { |
| if (conf && *conf) { |
| rd_kafka_conf_destroy(*conf); |
| *conf = NULL; |
| } |
| } |
| |
| /* |
| * 1. Create Kafka configuration handle. |
| * 2. Set configuration properties |
| * |
| * RET: Kafka configuration handle or NULL on error. |
| */ |
| static rd_kafka_conf_t *_set_rd_kafka_conf(void) |
| { |
| rd_kafka_conf_t *conf = NULL; |
| |
| conf = rd_kafka_conf_new(); |
| |
| if (list_for_each(rd_kafka_conf_list, _foreach_conf_pair, conf) < 0) |
| goto fail; |
| |
| /* |
| * The default is to print to stderr, but the rd_kafka_log_syslog() |
| * logger is also available as a builtin alternative. |
| */ |
| rd_kafka_conf_set_log_cb(conf, &rd_kafka_log_syslog); |
| |
| /* |
| * Set the delivery report callback in provided conf object. |
| * |
| * The delivery report callback will be called once for each message |
| * accepted by rd_kafka_produce() (et.al) with err set to indicate |
| * the result of the produce request. |
| * |
| * The callback is called when a message is successfully produced or |
| * if librdkafka encountered a permanent failure. |
| * |
| * An application must call rd_kafka_poll() at regular intervals to |
| * serve queued delivery report callbacks. |
| */ |
| rd_kafka_conf_set_dr_msg_cb(conf, _dr_msg_cb); |
| |
| return conf; |
| fail: |
| _destroy_rd_kafka_conf(&conf); |
| return conf; |
| } |
| |
| /* |
| * Creates a new Kafka handle and starts its operation according to the |
| * specified type (RD_KAFKA_CONSUMER or RD_KAFKA_PRODUCER). |
| * |
| * conf is an optional struct created with `rd_kafka_conf_new()` that will be |
| * used instead of the default configuration. |
| * The conf object is freed by this function on success and must not be used |
| * or destroyed by the application subsequently. |
| * |
| * errstr must be a pointer to memory of at least size errstr_size where |
| * `rd_kafka_new()` may write a human readable error message in case the |
| * creation of a new handle fails. In which case the function returns NULL. |
| */ |
| static int _create_rd_kafka_handle(rd_kafka_conf_t *conf) |
| { |
| int rc = SLURM_SUCCESS; |
| char errstr[512]; |
| |
| xassert(conf); |
| |
| if (!(rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, |
| sizeof(errstr)))) { |
| error("%s: Failed to create Kafka handle: %s", |
| plugin_type, errstr); |
| _destroy_rd_kafka_conf(&conf); |
| rc = SLURM_ERROR; |
| } |
| |
| return rc; |
| } |
| |
| static void _destroy_rd_kafka_handle(void) |
| { |
| if (rk) { |
| rd_kafka_destroy(rk); |
| rk = NULL; |
| } |
| } |
| |
| /* |
| * Wait until all outstanding produce requests, et.al, are completed. |
| * This should typically be done prior to destroying a producer instance to make |
| * sure all queued and in-flight produce requests are completed before |
| * terminating. |
| * |
| * rd_kafka_flush() is an abstraction over rd_kafka_poll(). |
| * |
| * The timeout specifies the maximum amount of time (in milliseconds) that the |
| * call will block waiting for events. |
| * For non-blocking calls, provide 0 as timeout. |
| * To wait indefinitely for an event, provide -1. |
| */ |
| static void _flush_rd_kafka_msgs(void) |
| { |
| int timeout; |
| |
| if (!rk) |
| return; |
| |
| slurm_rwlock_rdlock(&kafka_conf_rwlock); |
| timeout = kafka_conf->flush_timeout; |
| slurm_rwlock_unlock(&kafka_conf_rwlock); |
| log_flag(JOBCOMP, "Flushing with timeout of %d milliseconds", timeout); |
| if ((rd_kafka_flush(rk, timeout) != RD_KAFKA_RESP_ERR_NO_ERROR) && |
| (rd_kafka_outq_len(rk) > 0)) |
| error("%s: %d messages still in out queue after waiting for %d milliseconds", |
| plugin_type, rd_kafka_outq_len(rk), timeout); |
| } |
| |
| static kafka_msg_t *_init_kafka_msg(kafka_msg_opaque_t *opaque, char *payload) |
| { |
| kafka_msg_t *kafka_msg = NULL; |
| |
| kafka_msg = xmalloc(sizeof(*kafka_msg)); |
| kafka_msg->opaque = opaque; |
| kafka_msg->payload = payload; |
| |
| return kafka_msg; |
| } |
| |
| static void _destroy_kafka_msg(void *arg) |
| { |
| kafka_msg_t *kafka_msg = arg; |
| |
| if (!kafka_msg) |
| return; |
| |
| xfree(kafka_msg->opaque); |
| xfree(kafka_msg->payload); |
| xfree(kafka_msg); |
| } |
| |
| /* Kafka poll thread handler. */ |
| static void *_poll_handler(void *no_data) |
| { |
| struct timespec ts = {0, 0}; |
| |
| while (!terminate) { |
| if (rk) |
| rd_kafka_poll(rk, 0); |
| |
| slurm_rwlock_rdlock(&kafka_conf_rwlock); |
| ts.tv_sec = time(NULL) + kafka_conf->poll_interval; |
| slurm_rwlock_unlock(&kafka_conf_rwlock); |
| |
| slurm_mutex_lock(&poll_mutex); |
| slurm_cond_timedwait(&poll_stop_cond, &poll_mutex, &ts); |
| slurm_mutex_unlock(&poll_mutex); |
| } |
| |
| return NULL; |
| } |
| |
/*
 * Purge messages currently handled by the producer instance.
 *
 * Queued messages are always purged; in-flight purging and non-blocking mode
 * are added according to kafka_conf->flags. Compiled to a no-op when
 * RD_KAFKA_VERSION is older than 0x010000ff.
 *
 * Notes on rd_kafka_purge():
 *
 * purge_flags tells which messages to purge and how.
 *
 * The application will need to call rd_kafka_poll() or rd_kafka_flush()
 * afterwards to serve the delivery report callbacks of the purged messages.
 *
 * Messages purged from internal queues fail with the delivery report
 * error code set to RD_KAFKA_RESP_ERR__PURGE_QUEUE, while purged messages that
 * are in-flight to or from the broker will fail with the error code set to
 * RD_KAFKA_RESP_ERR__PURGE_INFLIGHT.
 *
 * @warning Purging messages that are in-flight to or from the broker will
 * ignore any subsequent acknowledgement for these messages received from the
 * broker, effectively making it impossible for the application to know if the
 * messages were successfully produced or not. This may result in duplicate
 * messages if the application retries these messages at a later time.
 *
 * @remark This call may block for a short time while background thread queues
 * are purged.
 *
 * @returns RD_KAFKA_RESP_ERR_NO_ERROR on success,
 *          RD_KAFKA_RESP_ERR__INVALID_ARG if the \p purge flags are invalid
 *          or unknown,
 *          RD_KAFKA_RESP_ERR__NOT_IMPLEMENTED if called on a non-producer
 *          client instance.
 */
static void _purge_rd_kafka_msgs(void)
{
#if RD_KAFKA_VERSION >= 0x010000ff
	int purge_flags = RD_KAFKA_PURGE_F_QUEUE;
	rd_kafka_resp_err_t purge_err;

	if (!rk)
		return;

	slurm_rwlock_rdlock(&kafka_conf_rwlock);
	if (kafka_conf->flags & KAFKA_CONF_FLAG_PURGE_IN_FLIGHT)
		purge_flags |= RD_KAFKA_PURGE_F_INFLIGHT;
	if (kafka_conf->flags & KAFKA_CONF_FLAG_PURGE_NON_BLOCKING)
		purge_flags |= RD_KAFKA_PURGE_F_NON_BLOCKING;
	slurm_rwlock_unlock(&kafka_conf_rwlock);

	log_flag(JOBCOMP, "Purging messages with flags=0x%x", purge_flags);

	purge_err = rd_kafka_purge(rk, purge_flags);
	if (purge_err == RD_KAFKA_RESP_ERR_NO_ERROR)
		return;

	error("%s: rd_kafka_purge(0x%x) failed: %s",
	      plugin_type, purge_flags, rd_kafka_err2str(purge_err));
#endif
}
| |
| /* |
| * Pack kafka_msg_opaque_t to a buffer. |
| * |
| * IN kafka_msg_opaque_t pointer. |
| * IN/OUT buf_t pointer - buffer to store packed data, pointers automatically |
| * advanced. |
| */ |
| static void _pack_jobcomp_kafka_msg_opaque(kafka_msg_opaque_t *opaque, |
| buf_t *buffer) |
| { |
| xassert(opaque); |
| |
| pack32(opaque->event, buffer); |
| pack32(opaque->job_id, buffer); |
| } |
| |
| /* |
| * Pack kafka_msg_t to a buffer. |
| * |
| * IN kafka_msg_t pointer. |
| * IN/OUT buf_t pointer - buffer to store packed data, pointers automatically |
| * advanced. |
| */ |
| static int _pack_jobcomp_kafka_msg(void *object, void *arg) |
| { |
| kafka_msg_t *kafka_msg = object; |
| buf_t *buffer = arg; |
| |
| xassert(kafka_msg); |
| xassert(kafka_msg->opaque); |
| xassert(kafka_msg->payload); |
| xassert(buffer); |
| |
| _pack_jobcomp_kafka_msg_opaque(kafka_msg->opaque, buffer); |
| packstr(kafka_msg->payload, buffer); |
| |
| return 0; |
| } |
| |
| static void _pack_jobcomp_kafka_state(buf_t *buffer) |
| { |
| xassert(buffer); |
| xassert(state_msg_list); |
| |
| /* Pack state header. */ |
| pack16(SLURM_PROTOCOL_VERSION, buffer); |
| pack32(list_count(state_msg_list), buffer); |
| |
| /* Pack state body. */ |
| list_for_each_ro(state_msg_list, _pack_jobcomp_kafka_msg, buffer); |
| } |
| |
| /* |
| * Unpack kafka_msg_opaque_t from buffer |
| * |
| * IN/OUT: kafka_msg_opaque_t *opaque |
| * IN: uint16_t protocol_version |
| * IN: buf_t pointer to buffer to unpack from |
| * |
| * RET: SLURM_ERROR if unpack_error or SLURM_SUCCESS |
| */ |
| static int _unpack_jobcomp_kafka_msg_opaque(kafka_msg_opaque_t *opaque, |
| uint16_t protocol_version, |
| buf_t *buffer) |
| { |
| xassert(buffer); |
| xassert(opaque); |
| |
| if (protocol_version >= SLURM_25_05_PROTOCOL_VERSION) { |
| safe_unpack32(&opaque->event, buffer); |
| safe_unpack32(&opaque->job_id, buffer); |
| } else { |
| error("%s: protocol_version %hu not supported", |
| __func__, protocol_version); |
| goto unpack_error; |
| } |
| |
| return SLURM_SUCCESS; |
| |
| unpack_error: |
| /* |
| * Do not handle ignore_state_error here. Will be handled in caller, |
| * otherwise would be redundant. |
| */ |
| |
| return SLURM_ERROR; |
| } |
| |
| /* |
| * Unpack kafka_msg_t from buffer and produce to librdkafka if no unpack error |
| * |
| * IN: uint16_t protocol_version |
| * IN: buf_t pointer to buffer to unpack from |
| * |
| * RET: SLURM_ERROR if unpack_error or SLURM_SUCCESS |
| */ |
| static int _unpack_jobcomp_kafka_msg(uint16_t protocol_version, buf_t *buffer) |
| { |
| uint32_t event = JOBCOMP_EVENT_INVALID; |
| char *payload = NULL; |
| kafka_msg_opaque_t *opaque = NULL; |
| |
| xassert(buffer); |
| |
| if (protocol_version >= SLURM_25_05_PROTOCOL_VERSION) { |
| opaque = jobcomp_kafka_message_init_opaque(event, 0); |
| if (_unpack_jobcomp_kafka_msg_opaque(opaque, protocol_version, |
| buffer) != SLURM_SUCCESS) |
| goto unpack_error; |
| safe_unpackstr(&payload, buffer); |
| } else if (protocol_version >= SLURM_MIN_PROTOCOL_VERSION) { |
| uint32_t job_id; |
| |
| /* All pre-25.05 msgs are job_finish events */ |
| event = JOBCOMP_EVENT_JOB_FINISH; |
| safe_unpack32(&job_id, buffer); |
| opaque = jobcomp_kafka_message_init_opaque(event, job_id); |
| safe_unpackstr(&payload, buffer); |
| } else { |
| error("%s: protocol_version %hu not supported", |
| __func__, protocol_version); |
| goto unpack_error; |
| } |
| |
| jobcomp_kafka_message_produce(opaque, payload); |
| |
| return SLURM_SUCCESS; |
| |
| unpack_error: |
| if (!ignore_state_errors) |
| fatal("Incomplete jobcomp/kafka state file, start with '-i' to ignore this. Warning: using -i will lose the data that can't be recovered."); |
| error("Incomplete jobcomp/kafka state file"); |
| xfree(opaque); |
| xfree(payload); |
| |
| return SLURM_ERROR; |
| } |
| |
| static void _unpack_jobcomp_kafka_state(buf_t *buffer) |
| { |
| uint32_t msg_cnt = 0; |
| uint16_t protocol_version = NO_VAL16; |
| |
| xassert(buffer); |
| |
| /* Unpack state header. */ |
| safe_unpack16(&protocol_version, buffer); |
| safe_unpack32(&msg_cnt, buffer); |
| |
| /* Unpack state body. */ |
| for (int i = 0; i < msg_cnt; i++) |
| if (_unpack_jobcomp_kafka_msg(protocol_version, |
| buffer) != SLURM_SUCCESS) |
| break; |
| |
| FREE_NULL_BUFFER(buffer); |
| return; |
| |
| unpack_error: |
| if (!ignore_state_errors) |
| fatal("Incomplete jobcomp/kafka state file, start with '-i' to ignore this. Warning: using -i will lose the data that can't be recovered."); |
| error("Incomplete jobcomp/kafka state file"); |
| FREE_NULL_BUFFER(buffer); |
| } |
| |
| static void _load_jobcomp_kafka_state(void) |
| { |
| buf_t *buffer; |
| char *state_file = NULL; |
| |
| if (!(buffer = state_save_open(KAFKA_STATE_FILE, &state_file))) { |
| error("Could not open jobcomp state file %s: %m", state_file); |
| error("NOTE: Finished jobs may be lost!"); |
| xfree(state_file); |
| return; |
| } |
| |
| _unpack_jobcomp_kafka_state(buffer); |
| xfree(state_file); |
| } |
| |
| static void _save_jobcomp_kafka_state(void) |
| { |
| static uint32_t high_buffer_size = BUF_SIZE; |
| buf_t *buffer = NULL; |
| DEF_TIMERS; |
| |
| buffer = init_buf(high_buffer_size); |
| |
| START_TIMER; |
| _pack_jobcomp_kafka_state(buffer); |
| (void) save_buf_to_state(KAFKA_STATE_FILE, buffer, NULL); |
| END_TIMER2("save_jobcomp_kafka_state"); |
| |
| FREE_NULL_BUFFER(buffer); |
| } |
| |
| static void _terminate_poll_handler(void) |
| { |
| slurm_mutex_lock(&poll_mutex); |
| terminate = true; |
| slurm_cond_broadcast(&poll_stop_cond); |
| slurm_mutex_unlock(&poll_mutex); |
| slurm_thread_join(poll_thread); |
| } |
| |
| extern kafka_msg_opaque_t *jobcomp_kafka_message_init_opaque(uint32_t event, |
| uint32_t job_id) |
| { |
| kafka_msg_opaque_t *opaque = xmalloc(sizeof(*opaque)); |
| |
| opaque->event = event; |
| opaque->job_id = job_id; |
| |
| return opaque; |
| } |
| |
| extern int jobcomp_kafka_message_init(void) |
| { |
| if (_configure_rd_kafka_handle() != SLURM_SUCCESS) |
| return SLURM_ERROR; |
| |
| state_msg_list = list_create(_destroy_kafka_msg); |
| _load_jobcomp_kafka_state(); |
| slurm_thread_create(&poll_thread, _poll_handler, NULL); |
| |
| return SLURM_SUCCESS; |
| } |
| |
| extern void jobcomp_kafka_message_fini(void) |
| { |
| _terminate_poll_handler(); |
| _purge_rd_kafka_msgs(); |
| _flush_rd_kafka_msgs(); |
| _destroy_rd_kafka_handle(); |
| _save_jobcomp_kafka_state(); |
| FREE_NULL_LIST(state_msg_list); |
| } |
| |
| /* |
| * Attempt to produce a message in an asynchronous non-blocking way. |
| * |
| * IN: kafka_msg_opaque_t *opaque |
| * IN: char *payload |
| */ |
| extern void jobcomp_kafka_message_produce(kafka_msg_opaque_t *opaque, |
| char *payload) |
| { |
| char *topic = NULL; |
| size_t len; |
| rd_kafka_resp_err_t err; |
| char *event_name; |
| |
| xassert(rk); |
| xassert(opaque); |
| |
| event_name = jobcomp_common_get_event_name(opaque->event); |
| len = strlen(payload); |
| |
| slurm_rwlock_rdlock(&kafka_conf_rwlock); |
| if (!(topic = jobcomp_kafka_conf_get_event_topic(opaque->event))) { |
| error("%s: Failed to produce JobId=%u message: event '%s' disabled. Message discarded.", |
| plugin_type, opaque->job_id, event_name); |
| xfree(opaque); |
| xfree(payload); |
| slurm_rwlock_unlock(&kafka_conf_rwlock); |
| return; |
| } |
| |
| /* |
| * Arguments to rd_kafka_producev(): |
| * |
| * 0. Producer handle. |
| * 1. Topic name. librdkafka makes a copy, so after call it can be |
| * freed. |
| * 2. Flag to tell librdkafka to make copy of payload, so after call it |
| * can be freed. |
| * 3. Message value (payload) and payload length. |
| * 4. Per-message opaque (see _dr_msg_cb rd_kafka_message_t->_private). |
| * Make a manual copy since librdkafka doesn't copy it. |
| * xfree() _private at the end of callback. |
| * 5. End sentinel |
| */ |
| err = rd_kafka_producev(rk, |
| RD_KAFKA_V_TOPIC(topic), |
| RD_KAFKA_V_VALUE(payload, len), |
| RD_KAFKA_V_OPAQUE(opaque), |
| RD_KAFKA_V_END); |
| |
| if (err == RD_KAFKA_RESP_ERR_NO_ERROR) { |
| log_flag(JOBCOMP, "Produced JobId=%u event '%s' message for topic '%s' to librdkafka queue.", |
| opaque->job_id, event_name, topic); |
| /* Do not xfree(opaque). Delivery msg callback will do it. */ |
| } else { |
| error("%s: Failed to produce JobId=%u event '%s' message for topic '%s': %s. Message discarded.", |
| plugin_type, |
| opaque->job_id, |
| event_name, |
| topic, |
| rd_kafka_err2str(err)); |
| xfree(opaque); |
| xfree(payload); |
| } |
| slurm_rwlock_unlock(&kafka_conf_rwlock); |
| } |