| <!--#include virtual="header.txt"--> |
| |
| <h1>Job Completion Kafka plugin guide</h1> |
| |
| <p>When configured, the <b>jobcomp/kafka</b> plugin attempts to publish a subset |
| of the fields for each completed job record to a Kafka server. Since Slurm 25.05, |
| it can optionally also send a subset of job record fields when a job first |
| starts running.</p> |
| |
| <h2 id="prereq">Requirements<a class="slurm_link" href="#prereq"></a></h2> |
| <p>The plugin serializes the subset of fields to JSON before each produce |
| attempt. The serialization is done using the Slurm serialization plugins, so the |
| <b>libjson-c</b> development files are an indirect prerequisite for this plugin. |
| </p> |
| |
| <p>The plugin offloads part of the client producer work to <b>librdkafka</b> and |
| consumes its API, thus the library development files are another prerequisite. |
| </p> |
| <ul> |
| <li><a href="https://github.com/confluentinc/librdkafka">librdkafka</a> |
| development files</li> |
| <li><a href="related_software.html#json">libjson-c</a> development |
| files</li> |
| </ul> |
| |
| <h2 id="config">Configuration<a class="slurm_link" href="#config"></a></h2> |
| |
| <p>The plugin is configured with the following |
| <a href="slurm.conf.html">slurm.conf</a> options:</p> |
| |
| <ul> |
| <li> |
| <a href="slurm.conf.html#OPT_JobCompType"><b>JobCompType</b></a> |
| Should be set to <b>jobcomp/kafka</b>. |
| <pre>JobCompType=jobcomp/kafka</pre> |
| </li> |
| |
| <li> |
| <a href="slurm.conf.html#OPT_JobCompLoc"><b>JobCompLoc</b></a> This string |
| represents an absolute path to a file containing 'key=value' pairs configuring |
| <a href="https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md"> |
| librdkafka properties</a>. For the plugin to work properly, this file |
| needs to exist and at least the <b>bootstrap.servers</b> property needs to |
| be configured. |
| <pre>JobCompLoc=/arbitrary/path/to/rdkafka.conf</pre> |
| |
| <p><b>NOTE</b>: There is no default value for JobCompLoc when this plugin is |
| configured, and thus it needs to be explicitly set.</p> |
| |
| <p><b>NOTE</b>: The <b>librdkafka</b> parameters configured in the file |
| referenced by this option take effect upon slurmctld restart.</p> |
| |
| <p><b>NOTE</b>: The plugin doesn't validate these parameters itself; it simply |
| logs an error and fails if the library API function rd_kafka_conf_set() |
| rejects any of them.</p> |
| |
| <p>An example configuration file could look like this:</p> |
| |
| <pre> |
| bootstrap.servers=kafkahost1:9092 |
| debug=broker,topic,msg |
| linger.ms=400 |
| log_level=7 |
| </pre> |
| </li> |
| |
| <li> |
| <a href="slurm.conf.html#OPT_JobCompParams"><b>JobCompParams</b></a> Comma |
| separated list of extra configurable parameters. Please refer to the slurm.conf |
| man page for specific details. Example: |
| <pre>JobCompParams=flush_timeout=200,poll_interval=3,requeue_on_msg_timeout,topic=mycluster</pre> |
| <p><b>NOTE</b>: Changes to this option do not require a slurmctld restart. |
| Reconfiguration or SIGHUP is sufficient for them to take effect.</p> |
| <p><b>NOTE</b>: Please refer to the slurm.conf man page for how to configure |
| job start events.</p> |
| </li> |
| |
| <li> |
| <a href="slurm.conf.html#OPT_DebugFlags"><b>DebugFlags</b></a> Optional |
| <b>JobComp</b> debug flag for extra plugin specific logging. |
| <pre>DebugFlags=JobComp</pre> |
| </li> |
| </ul> |
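<p>As an illustrative sketch (not Slurm code), the 'key=value' property file
referenced by JobCompLoc can be modeled and parsed like this in Python; the
file contents match the example above:</p>

```python
# Illustrative sketch (not Slurm code): parse a librdkafka-style
# "key=value" properties file such as the one referenced by JobCompLoc.
def parse_rdkafka_conf(text):
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue  # skip blank lines and comments
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props

example = """\
bootstrap.servers=kafkahost1:9092
debug=broker,topic,msg
linger.ms=400
log_level=7
"""
props = parse_rdkafka_conf(example)
print(props["bootstrap.servers"])  # kafkahost1:9092
```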
| |
| <h2 id="functionality">Plugin Functionality<a class="slurm_link" |
| href="#functionality"></a></h2> |
| <p>For each job that finishes (or, optionally, first starts running), the |
| plugin executes its jobcomp_p_record_job_end (or jobcomp_p_record_job_start) |
| operation. A subset of the job record fields is serialized into a JSON string |
| via the Slurm serialization plugins, and the resulting string is then produced |
| using the librdkafka rd_kafka_producev() API call.</p> |
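<p>To make the serialization step concrete, here is a small Python sketch; the
field names are hypothetical, since the actual subset of job record fields is
chosen by the plugin:</p>

```python
import json

# Hypothetical subset of job record fields; the real plugin selects its
# own field set and serializes it with the Slurm serialization plugins.
job = {
    "jobid": 1234,
    "cluster": "mycluster",
    "partition": "debug",
    "job_state": "COMPLETED",
    "exit_code": 0,
}

# The JSON string that would then be handed to rd_kafka_producev().
payload = json.dumps(job)
print(payload)
```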
| |
| <p>A message can be produced to librdkafka even while the Kafka server is |
| down, but if the produce call itself returns an error the message is |
| discarded. Produced messages accumulate in the librdkafka out queue for up to |
| "linger.ms" milliseconds (a configurable librdkafka parameter) before the |
| library builds a message set from them.</p> |
| |
| <p>The librdkafka library then transmits a produce request. Until an "ACK" is |
| received, messages are conceptually considered "in-flight" according to |
| the library documentation. When the library receives the produce response, it |
| handles it in one of two ways:</p> |
| <dl> |
| <dt><b>retriable error</b></dt> |
| <dd>The library will automatically attempt a retry if |
| no library limit parameter is hit.</dd> |
| <dt><b>permanent error</b> or <b>success</b></dt> |
| <dd>The message is removed |
| from the library out queue and staged to the library delivery report queue. |
| </dd></dl> |
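<p>The two-way handling above can be pictured with a small Python sketch (the
function name and string codes here are invented for illustration; librdkafka
implements this logic internally):</p>

```python
# Illustrative model (not Slurm or librdkafka code) of produce-response
# handling: retriable errors are retried until a limit is hit, while
# permanent errors and successes move to the delivery report queue.
def handle_response(error, retries, max_retries):
    if error == "retriable" and retries < max_retries:
        return "retry"
    # Success, permanent error, or retry limit exhausted: the message is
    # removed from the out queue and staged to the delivery report queue.
    return "delivery_report"

print(handle_response("retriable", 0, 3))   # retry
print(handle_response("permanent", 0, 3))   # delivery_report
print(handle_response(None, 0, 3))          # delivery_report
```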
| |
| <p>The following diagram illustrates the functionality being described:</p> |
| |
| <img src="jobcomp_kafka_fig1.png"> |
| |
| <p>The <b>jobcomp/kafka</b> plugin has a background poll handler thread that |
| periodically calls the librdkafka rd_kafka_poll() API function. How frequently |
| the thread makes the call is configurable via JobCompParams=poll_interval. |
| The call causes messages in the library delivery report queue to be pulled |
| and handed back to the plugin delivery report callback, which takes different |
| actions depending on the error the library set. By default, successful |
| messages are only logged if DebugFlags=JobComp is enabled, and messages with |
| permanent errors are discarded, unless the error is a message timeout and |
| JobCompParams is configured with "requeue_on_msg_timeout", in which case the |
| callback attempts to produce the message again.</p> |
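<p>The delivery report callback decisions described above can be sketched as
follows (a simplified Python model; "requeue_on_msg_timeout" is a real
JobCompParams option, but the function and return values are invented for the
example):</p>

```python
# Illustrative sketch of the plugin delivery report callback decisions.
def on_delivery_report(err, requeue_on_msg_timeout):
    if err is None:
        return "log_success"    # only logged when DebugFlags=JobComp is set
    if err == "MSG_TIMED_OUT" and requeue_on_msg_timeout:
        return "produce_again"  # requeue the timed-out message
    return "discard"            # permanent error: drop the message

print(on_delivery_report(None, False))            # log_success
print(on_delivery_report("MSG_TIMED_OUT", True))  # produce_again
print(on_delivery_report("PERMANENT", True))      # discard
```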
| |
| <p>On plugin termination, the fini() operation is executed. The rd_kafka_purge() |
| library API function is called to purge messages from the librdkafka out |
| queue, and rd_kafka_flush() is then called to wait until all outstanding |
| produce requests (and potentially other types of requests) are completed. |
| How long to wait is configurable via the JobCompParams=flush_timeout |
| parameter. Purged messages are always saved to the plugin state file in the |
| Slurm StateSaveLocation, while messages purged while "in-flight" are discarded. |
| </p> |
| |
| <p><b>NOTE</b>: You must be using librdkafka v1.0.0 or later in order to take |
| advantage of the purge functionality described above. With earlier versions |
| the outgoing queue cannot be purged to the state file on shutdown, which |
| means that any messages not delivered before the plugin terminates will |
| be lost.</p> |
| |
| <p>On plugin initialization, after parsing the configuration, messages saved |
| in the state file are loaded and produced again, so undelivered messages |
| should survive slurmctld restarts.</p> |
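<p>The save-and-restore cycle can be pictured with a short Python sketch; the
real plugin writes its own state file format under StateSaveLocation, so the
JSON file used here is purely illustrative:</p>

```python
import json
import os
import tempfile

# Illustrative sketch: persist undelivered message payloads at shutdown
# and reload them at startup, as the plugin does with its state file.
def save_state(path, messages):
    with open(path, "w") as f:
        json.dump(messages, f)

def load_state(path):
    if not os.path.exists(path):
        return []  # no state saved: nothing to re-produce
    with open(path) as f:
        return json.load(f)

state_file = os.path.join(tempfile.mkdtemp(), "jobcomp_kafka_state")
save_state(state_file, ['{"jobid": 1}', '{"jobid": 2}'])
print(load_state(state_file))  # the two undelivered payloads
```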
| |
| <p>The Kafka broker "host:port" list must be explicitly configured in the |
| file referenced by the JobCompLoc option explained above. The default topic is |
| the configured Slurm ClusterName, but it can also be set via the |
| JobCompParams=topic parameter.</p> |
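<p>The topic selection rule amounts to a one-line fallback, sketched here in
Python (the function name is invented for the example):</p>

```python
# Illustrative sketch: JobCompParams "topic=" overrides the default
# topic, which is the configured Slurm ClusterName.
def select_topic(jobcomp_params, cluster_name):
    return jobcomp_params.get("topic", cluster_name)

print(select_topic({"topic": "mycluster_jobs"}, "mycluster"))  # mycluster_jobs
print(select_topic({}, "mycluster"))                           # mycluster
```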
| |
| <p>The <b>jobcomp/kafka</b> plugin logs most of its informational messages |
| through the JobComp DebugFlag; error messages are always logged. By default |
| librdkafka logs to the application's stderr, but the plugin configures the |
| library to log to syslog instead. The library logging level and debug contexts, |
| like the rest of the library configuration parameters, are configurable via |
| the file referenced by JobCompLoc.</p> |
| |
| <p style="text-align:center;">Last modified 03 February 2025</p> |
| |
| <!--#include virtual="footer.txt"--> |