| /* |
| * Copyright 2004-2019 the Pacemaker project contributors |
| * |
| * The version control history for this file may have further details. |
| * |
| * This source code is licensed under the GNU Lesser General Public License |
| * version 2.1 or later (LGPLv2.1+) WITHOUT ANY WARRANTY. |
| */ |
| |
| #include <crm_internal.h> |
| #include <bzlib.h> |
| #include <sys/socket.h> |
| #include <netinet/in.h> |
| #include <arpa/inet.h> |
| #include <netdb.h> |
| |
| #include <crm/common/ipc.h> |
| #include <crm/cluster/internal.h> |
| #include <crm/common/mainloop.h> |
| #include <sys/utsname.h> |
| |
| #include <qb/qbipcc.h> |
| #include <qb/qbutil.h> |
| |
| #include <corosync/corodefs.h> |
| #include <corosync/corotypes.h> |
| #include <corosync/hdb.h> |
| #include <corosync/cpg.h> |
| |
| #include <crm/msg_xml.h> |
| |
| #include <crm/common/ipc_internal.h> /* PCMK__SPECIAL_PID* */ |
| |
| cpg_handle_t pcmk_cpg_handle = 0; /* TODO: Remove, use cluster.cpg_handle */ |
| |
| static bool cpg_evicted = FALSE; |
| gboolean(*pcmk_cpg_dispatch_fn) (int kind, const char *from, const char *data) = NULL; |
| |
/*
 * Run 'code' and retry it while the corosync API asks us to back off.
 *
 * 'code' must assign its result to a variable named 'rc' that is visible at
 * the point of use (the macro reads 'rc' directly).  While 'rc' is
 * CS_ERR_TRY_AGAIN or CS_ERR_QUEUE_FULL, 'counter' is incremented, the
 * process sleeps for 'counter' seconds (1s, 2s, ...) and 'code' runs again,
 * until 'counter' reaches 'max'.  Any other result stops the loop at once.
 *
 * 'counter' must be a modifiable integer lvalue; 'max' may be any integer
 * expression (both are parenthesized so that compound expressions such as
 * "quick ? 5 : 30" keep their meaning inside the loop condition).
 */
#define cs_repeat(counter, max, code) do {                                  \
        code;                                                               \
        if ((rc == CS_ERR_TRY_AGAIN) || (rc == CS_ERR_QUEUE_FULL)) {        \
            (counter)++;                                                    \
            crm_debug("Retrying operation after %ds", (counter));           \
            sleep(counter);                                                 \
        } else {                                                            \
            break;                                                          \
        }                                                                   \
    } while ((counter) < (max))
| |
| void |
| cluster_disconnect_cpg(crm_cluster_t *cluster) |
| { |
| pcmk_cpg_handle = 0; |
| if (cluster->cpg_handle) { |
| crm_trace("Disconnecting CPG"); |
| cpg_leave(cluster->cpg_handle, &cluster->group); |
| cpg_finalize(cluster->cpg_handle); |
| cluster->cpg_handle = 0; |
| |
| } else { |
| crm_info("No CPG connection"); |
| } |
| } |
| |
| uint32_t get_local_nodeid(cpg_handle_t handle) |
| { |
| cs_error_t rc = CS_OK; |
| int retries = 0; |
| static uint32_t local_nodeid = 0; |
| cpg_handle_t local_handle = handle; |
| cpg_callbacks_t cb = { }; |
| int fd = -1; |
| uid_t found_uid = 0; |
| gid_t found_gid = 0; |
| pid_t found_pid = 0; |
| int rv; |
| |
| if(local_nodeid != 0) { |
| return local_nodeid; |
| } |
| |
| #if 0 |
| /* Should not be necessary */ |
| if(get_cluster_type() == pcmk_cluster_classic_ais) { |
| get_ais_details(&local_nodeid, NULL); |
| goto done; |
| } |
| #endif |
| |
| if(handle == 0) { |
| crm_trace("Creating connection"); |
| cs_repeat(retries, 5, rc = cpg_initialize(&local_handle, &cb)); |
| if (rc != CS_OK) { |
| crm_err("Could not connect to the CPG API (rc=%d)", rc); |
| return 0; |
| } |
| |
| rc = cpg_fd_get(local_handle, &fd); |
| if (rc != CS_OK) { |
| crm_err("Could not obtain the CPG API connection (rc=%d)", rc); |
| goto bail; |
| } |
| |
| /* CPG provider run as root (in given user namespace, anyway)? */ |
| if (!(rv = crm_ipc_is_authentic_process(fd, (uid_t) 0,(gid_t) 0, &found_pid, |
| &found_uid, &found_gid))) { |
| crm_err("CPG provider is not authentic:" |
| " process %lld (uid: %lld, gid: %lld)", |
| (long long) PCMK__SPECIAL_PID_AS_0(found_pid), |
| (long long) found_uid, (long long) found_gid); |
| goto bail; |
| } else if (rv < 0) { |
| crm_err("Could not verify authenticity of CPG provider: %s (%d)", |
| strerror(-rv), -rv); |
| goto bail; |
| } |
| } |
| |
| if (rc == CS_OK) { |
| retries = 0; |
| crm_trace("Performing lookup"); |
| cs_repeat(retries, 5, rc = cpg_local_get(local_handle, &local_nodeid)); |
| } |
| |
| if (rc != CS_OK) { |
| crm_err("Could not get local node id from the CPG API: %s (%d)", ais_error2text(rc), rc); |
| } |
| |
| bail: |
| if(handle == 0) { |
| crm_trace("Closing connection"); |
| cpg_finalize(local_handle); |
| } |
| crm_debug("Local nodeid is %u", local_nodeid); |
| return local_nodeid; |
| } |
| |
| |
| GListPtr cs_message_queue = NULL; |
| int cs_message_timer = 0; |
| |
| static ssize_t crm_cs_flush(gpointer data); |
| |
| static gboolean |
| crm_cs_flush_cb(gpointer data) |
| { |
| cs_message_timer = 0; |
| crm_cs_flush(data); |
| return FALSE; |
| } |
| |
| #define CS_SEND_MAX 200 |
| static ssize_t |
| crm_cs_flush(gpointer data) |
| { |
| int sent = 0; |
| ssize_t rc = 0; |
| int queue_len = 0; |
| static unsigned int last_sent = 0; |
| cpg_handle_t *handle = (cpg_handle_t *)data; |
| |
| if (*handle == 0) { |
| crm_trace("Connection is dead"); |
| return pcmk_ok; |
| } |
| |
| queue_len = g_list_length(cs_message_queue); |
| if ((queue_len % 1000) == 0 && queue_len > 1) { |
| crm_err("CPG queue has grown to %d", queue_len); |
| |
| } else if (queue_len == CS_SEND_MAX) { |
| crm_warn("CPG queue has grown to %d", queue_len); |
| } |
| |
| if (cs_message_timer) { |
| /* There is already a timer, wait until it goes off */ |
| crm_trace("Timer active %d", cs_message_timer); |
| return pcmk_ok; |
| } |
| |
| while (cs_message_queue && sent < CS_SEND_MAX) { |
| struct iovec *iov = cs_message_queue->data; |
| |
| errno = 0; |
| rc = cpg_mcast_joined(*handle, CPG_TYPE_AGREED, iov, 1); |
| |
| if (rc != CS_OK) { |
| break; |
| } |
| |
| sent++; |
| last_sent++; |
| crm_trace("CPG message sent, size=%llu", |
| (unsigned long long) iov->iov_len); |
| |
| cs_message_queue = g_list_remove(cs_message_queue, iov); |
| free(iov->iov_base); |
| free(iov); |
| } |
| |
| queue_len -= sent; |
| if (sent > 1 || cs_message_queue) { |
| crm_info("Sent %d CPG messages (%d remaining, last=%u): %s (%lld)", |
| sent, queue_len, last_sent, ais_error2text(rc), |
| (long long) rc); |
| } else { |
| crm_trace("Sent %d CPG messages (%d remaining, last=%u): %s (%lld)", |
| sent, queue_len, last_sent, ais_error2text(rc), |
| (long long) rc); |
| } |
| |
| if (cs_message_queue) { |
| uint32_t delay_ms = 100; |
| if(rc != CS_OK) { |
| /* Proportionally more if sending failed but cap at 1s */ |
| delay_ms = QB_MIN(1000, CS_SEND_MAX + (10 * queue_len)); |
| } |
| cs_message_timer = g_timeout_add(delay_ms, crm_cs_flush_cb, data); |
| } |
| |
| return rc; |
| } |
| |
| gboolean |
| send_cpg_iov(struct iovec * iov) |
| { |
| static unsigned int queued = 0; |
| |
| queued++; |
| crm_trace("Queueing CPG message %u (%llu bytes)", |
| queued, (unsigned long long) iov->iov_len); |
| cs_message_queue = g_list_append(cs_message_queue, iov); |
| crm_cs_flush(&pcmk_cpg_handle); |
| return TRUE; |
| } |
| |
| static int |
| pcmk_cpg_dispatch(gpointer user_data) |
| { |
| int rc = 0; |
| crm_cluster_t *cluster = (crm_cluster_t*) user_data; |
| |
| rc = cpg_dispatch(cluster->cpg_handle, CS_DISPATCH_ONE); |
| if (rc != CS_OK) { |
| crm_err("Connection to the CPG API failed: %s (%d)", ais_error2text(rc), rc); |
| cluster->cpg_handle = 0; |
| return -1; |
| |
| } else if(cpg_evicted) { |
| crm_err("Evicted from CPG membership"); |
| return -1; |
| } |
| return 0; |
| } |
| |
| char * |
| pcmk_message_common_cs(cpg_handle_t handle, uint32_t nodeid, uint32_t pid, void *content, |
| uint32_t *kind, const char **from) |
| { |
| char *data = NULL; |
| AIS_Message *msg = (AIS_Message *) content; |
| |
| if(handle) { |
| /* 'msg' came from CPG not the plugin |
| * Do filtering and field massaging |
| */ |
| uint32_t local_nodeid = get_local_nodeid(handle); |
| const char *local_name = get_local_node_name(); |
| |
| if (msg->sender.id > 0 && msg->sender.id != nodeid) { |
| crm_err("Nodeid mismatch from %d.%d: claimed nodeid=%u", nodeid, pid, msg->sender.id); |
| return NULL; |
| |
| } else if (msg->host.id != 0 && (local_nodeid != msg->host.id)) { |
| /* Not for us */ |
| crm_trace("Not for us: %u != %u", msg->host.id, local_nodeid); |
| return NULL; |
| } else if (msg->host.size != 0 && safe_str_neq(msg->host.uname, local_name)) { |
| /* Not for us */ |
| crm_trace("Not for us: %s != %s", msg->host.uname, local_name); |
| return NULL; |
| } |
| |
| msg->sender.id = nodeid; |
| if (msg->sender.size == 0) { |
| crm_node_t *peer = crm_get_peer(nodeid, NULL); |
| |
| if (peer == NULL) { |
| crm_err("Peer with nodeid=%u is unknown", nodeid); |
| |
| } else if (peer->uname == NULL) { |
| crm_err("No uname for peer with nodeid=%u", nodeid); |
| |
| } else { |
| crm_notice("Fixing uname for peer with nodeid=%u", nodeid); |
| msg->sender.size = strlen(peer->uname); |
| memset(msg->sender.uname, 0, MAX_NAME); |
| memcpy(msg->sender.uname, peer->uname, msg->sender.size); |
| } |
| } |
| } |
| |
| crm_trace("Got new%s message (size=%d, %d, %d)", |
| msg->is_compressed ? " compressed" : "", |
| ais_data_len(msg), msg->size, msg->compressed_size); |
| |
| if (kind != NULL) { |
| *kind = msg->header.id; |
| } |
| if (from != NULL) { |
| *from = msg->sender.uname; |
| } |
| |
| if (msg->is_compressed && msg->size > 0) { |
| int rc = BZ_OK; |
| char *uncompressed = NULL; |
| unsigned int new_size = msg->size + 1; |
| |
| if (check_message_sanity(msg, NULL) == FALSE) { |
| goto badmsg; |
| } |
| |
| crm_trace("Decompressing message data"); |
| uncompressed = calloc(1, new_size); |
| rc = BZ2_bzBuffToBuffDecompress(uncompressed, &new_size, msg->data, msg->compressed_size, 1, 0); |
| |
| if (rc != BZ_OK) { |
| crm_err("Decompression failed: %d", rc); |
| free(uncompressed); |
| goto badmsg; |
| } |
| |
| CRM_ASSERT(rc == BZ_OK); |
| CRM_ASSERT(new_size == msg->size); |
| |
| data = uncompressed; |
| |
| } else if (check_message_sanity(msg, data) == FALSE) { |
| goto badmsg; |
| |
| } else if (safe_str_eq("identify", data)) { |
| char *pid_s = crm_getpid_s(); |
| |
| send_cluster_text(crm_class_cluster, pid_s, TRUE, NULL, crm_msg_ais); |
| free(pid_s); |
| return NULL; |
| |
| } else { |
| data = strdup(msg->data); |
| } |
| |
| if (msg->header.id != crm_class_members) { |
| /* Is this even needed anymore? */ |
| crm_get_peer(msg->sender.id, msg->sender.uname); |
| } |
| |
| if (msg->header.id == crm_class_rmpeer) { |
| uint32_t id = crm_int_helper(data, NULL); |
| |
| crm_info("Removing peer %s/%u", data, id); |
| reap_crm_member(id, NULL); |
| free(data); |
| return NULL; |
| |
| #if SUPPORT_PLUGIN |
| } else if (is_classic_ais_cluster()) { |
| plugin_handle_membership(msg); |
| #endif |
| } |
| |
| crm_trace("Payload: %.200s", data); |
| return data; |
| |
| badmsg: |
| crm_err("Invalid message (id=%d, dest=%s:%s, from=%s:%s.%d):" |
| " min=%d, total=%d, size=%d, bz2_size=%d", |
| msg->id, ais_dest(&(msg->host)), msg_type2text(msg->host.type), |
| ais_dest(&(msg->sender)), msg_type2text(msg->sender.type), |
| msg->sender.pid, (int)sizeof(AIS_Message), |
| msg->header.size, msg->size, msg->compressed_size); |
| |
| free(data); |
| return NULL; |
| } |
| |
| static int cmp_member_list_nodeid(const void *first, |
| const void *second) |
| { |
| const struct cpg_address *const a = *((const struct cpg_address **) first), |
| *const b = *((const struct cpg_address **) second); |
| if (a->nodeid < b->nodeid) { |
| return -1; |
| } else if (a->nodeid > b->nodeid) { |
| return 1; |
| } |
| /* don't bother with "reason" nor "pid" */ |
| return 0; |
| } |
| |
| static const char * |
| cpgreason2str(cpg_reason_t reason) |
| { |
| switch (reason) { |
| case CPG_REASON_JOIN: return " via cpg_join"; |
| case CPG_REASON_LEAVE: return " via cpg_leave"; |
| case CPG_REASON_NODEDOWN: return " via cluster exit"; |
| case CPG_REASON_NODEUP: return " via cluster join"; |
| case CPG_REASON_PROCDOWN: return " for unknown reason"; |
| default: break; |
| } |
| return ""; |
| } |
| |
| static inline const char * |
| peer_name(crm_node_t *peer) |
| { |
| if (peer == NULL) { |
| return "unknown node"; |
| } else if (peer->uname == NULL) { |
| return "peer node"; |
| } else { |
| return peer->uname; |
| } |
| } |
| |
| void |
| pcmk_cpg_membership(cpg_handle_t handle, |
| const struct cpg_name *groupName, |
| const struct cpg_address *member_list, size_t member_list_entries, |
| const struct cpg_address *left_list, size_t left_list_entries, |
| const struct cpg_address *joined_list, size_t joined_list_entries) |
| { |
| int i; |
| gboolean found = FALSE; |
| static int counter = 0; |
| uint32_t local_nodeid = get_local_nodeid(handle); |
| const struct cpg_address *key, **sorted; |
| |
| sorted = malloc(member_list_entries * sizeof(const struct cpg_address *)); |
| CRM_ASSERT(sorted != NULL); |
| |
| for (size_t iter = 0; iter < member_list_entries; iter++) { |
| sorted[iter] = member_list + iter; |
| } |
| /* so that the cross-matching multiply-subscribed nodes is then cheap */ |
| qsort(sorted, member_list_entries, sizeof(const struct cpg_address *), |
| cmp_member_list_nodeid); |
| |
| for (i = 0; i < left_list_entries; i++) { |
| crm_node_t *peer = crm_find_peer(left_list[i].nodeid, NULL); |
| const struct cpg_address **rival = NULL; |
| |
| /* in CPG world, NODE:PROCESS-IN-MEMBERSHIP-OF-G is an 1:N relation |
| and not playing by this rule may go wild in case of multiple |
| residual instances of the same pacemaker daemon at the same node |
| -- we must ensure that the possible local rival(s) won't make us |
| cry out and bail (e.g. when they quit themselves), since all the |
| surrounding logic denies this simple fact that the full membership |
| is discriminated also per the PID of the process beside mere node |
| ID (and implicitly, group ID); practically, this will be sound in |
| terms of not preventing progress, since all the CPG joiners are |
| also API end-point carriers, and that's what matters locally |
| (who's the winner); |
| remotely, we will just compare leave_list and member_list and if |
| the left process has its node retained in member_list (under some |
| other PID, anyway) we will just ignore it as well |
| XXX: long-term fix is to establish in-out PID-aware tracking? */ |
| if (peer) { |
| key = &left_list[i]; |
| rival = bsearch(&key, sorted, member_list_entries, |
| sizeof(const struct cpg_address *), |
| cmp_member_list_nodeid); |
| } |
| |
| if (rival == NULL) { |
| crm_info("Group %s event %d: %s (node %u pid %u) left%s", |
| groupName->value, counter, peer_name(peer), |
| left_list[i].nodeid, left_list[i].pid, |
| cpgreason2str(left_list[i].reason)); |
| if (peer) { |
| crm_update_peer_proc(__FUNCTION__, peer, crm_proc_cpg, |
| OFFLINESTATUS); |
| } |
| } else if (left_list[i].nodeid == local_nodeid) { |
| crm_warn("Group %s event %d: duplicate local pid %u left%s", |
| groupName->value, counter, |
| left_list[i].pid, cpgreason2str(left_list[i].reason)); |
| } else { |
| crm_warn("Group %s event %d: " |
| "%s (node %u) duplicate pid %u left%s (%u remains)", |
| groupName->value, counter, peer_name(peer), |
| left_list[i].nodeid, left_list[i].pid, |
| cpgreason2str(left_list[i].reason), (*rival)->pid); |
| } |
| } |
| free(sorted); |
| sorted = NULL; |
| |
| for (i = 0; i < joined_list_entries; i++) { |
| crm_info("Group %s event %d: node %u pid %u joined%s", |
| groupName->value, counter, joined_list[i].nodeid, |
| joined_list[i].pid, cpgreason2str(joined_list[i].reason)); |
| } |
| |
| for (i = 0; i < member_list_entries; i++) { |
| crm_node_t *peer = crm_get_peer(member_list[i].nodeid, NULL); |
| |
| if (member_list[i].nodeid == local_nodeid |
| && member_list[i].pid != getpid()) { |
| /* see the note above */ |
| crm_warn("Group %s event %d: detected duplicate local pid %u", |
| groupName->value, counter, member_list[i].pid); |
| continue; |
| } |
| crm_info("Group %s event %d: %s (node %u pid %u) is member", |
| groupName->value, counter, peer_name(peer), |
| member_list[i].nodeid, member_list[i].pid); |
| |
| /* Anyone that is sending us CPG messages must also be a _CPG_ member. |
| * But it's _not_ safe to assume it's in the quorum membership. |
| * We may have just found out it's dead and are processing the last couple of messages it sent |
| */ |
| peer = crm_update_peer_proc(__FUNCTION__, peer, crm_proc_cpg, ONLINESTATUS); |
| if(peer && peer->state && crm_is_peer_active(peer) == FALSE) { |
| time_t now = time(NULL); |
| |
| /* Co-opt the otherwise unused votes field */ |
| if(peer->votes == 0) { |
| peer->votes = now; |
| |
| } else if(now > (60 + peer->votes)) { |
| /* On the otherhand, if we're still getting messages, at a certain point |
| * we need to acknowledge our internal cache is probably wrong |
| * |
| * Set the threshold to 1 minute |
| */ |
| crm_warn("Node %u is member of group %s but was believed offline", |
| member_list[i].nodeid, groupName->value); |
| if (crm_update_peer_state(__FUNCTION__, peer, CRM_NODE_MEMBER, 0)) { |
| peer->votes = 0; |
| } |
| } |
| } |
| |
| if (local_nodeid == member_list[i].nodeid) { |
| found = TRUE; |
| } |
| } |
| |
| if (!found) { |
| crm_err("Local node was evicted from group %s", groupName->value); |
| cpg_evicted = TRUE; |
| } |
| |
| counter++; |
| } |
| |
| gboolean |
| cluster_connect_cpg(crm_cluster_t *cluster) |
| { |
| cs_error_t rc; |
| int fd = -1; |
| int retries = 0; |
| uint32_t id = 0; |
| crm_node_t *peer = NULL; |
| cpg_handle_t handle = 0; |
| uid_t found_uid = 0; |
| gid_t found_gid = 0; |
| pid_t found_pid = 0; |
| int rv; |
| |
| struct mainloop_fd_callbacks cpg_fd_callbacks = { |
| .dispatch = pcmk_cpg_dispatch, |
| .destroy = cluster->destroy, |
| }; |
| |
| cpg_callbacks_t cpg_callbacks = { |
| .cpg_deliver_fn = cluster->cpg.cpg_deliver_fn, |
| .cpg_confchg_fn = cluster->cpg.cpg_confchg_fn, |
| /* .cpg_deliver_fn = pcmk_cpg_deliver, */ |
| /* .cpg_confchg_fn = pcmk_cpg_membership, */ |
| }; |
| |
| cpg_evicted = FALSE; |
| cluster->group.length = 0; |
| cluster->group.value[0] = 0; |
| |
| /* group.value is char[128] */ |
| strncpy(cluster->group.value, crm_system_name?crm_system_name:"unknown", 127); |
| cluster->group.value[127] = 0; |
| cluster->group.length = 1 + QB_MIN(127, strlen(cluster->group.value)); |
| |
| cs_repeat(retries, 30, rc = cpg_initialize(&handle, &cpg_callbacks)); |
| if (rc != CS_OK) { |
| crm_err("Could not connect to the CPG API (rc=%d)", rc); |
| goto bail; |
| } |
| |
| rc = cpg_fd_get(handle, &fd); |
| if (rc != CS_OK) { |
| crm_err("Could not obtain the CPG API connection (rc=%d)", rc); |
| goto bail; |
| } |
| |
| /* CPG provider run as root (in given user namespace, anyway)? */ |
| if (!(rv = crm_ipc_is_authentic_process(fd, (uid_t) 0,(gid_t) 0, &found_pid, |
| &found_uid, &found_gid))) { |
| crm_err("CPG provider is not authentic:" |
| " process %lld (uid: %lld, gid: %lld)", |
| (long long) PCMK__SPECIAL_PID_AS_0(found_pid), |
| (long long) found_uid, (long long) found_gid); |
| rc = CS_ERR_ACCESS; |
| goto bail; |
| } else if (rv < 0) { |
| crm_err("Could not verify authenticity of CPG provider: %s (%d)", |
| strerror(-rv), -rv); |
| rc = CS_ERR_ACCESS; |
| goto bail; |
| } |
| |
| id = get_local_nodeid(handle); |
| if (id == 0) { |
| crm_err("Could not get local node id from the CPG API"); |
| goto bail; |
| |
| } |
| cluster->nodeid = id; |
| |
| retries = 0; |
| cs_repeat(retries, 30, rc = cpg_join(handle, &cluster->group)); |
| if (rc != CS_OK) { |
| crm_err("Could not join the CPG group '%s': %d", crm_system_name, rc); |
| goto bail; |
| } |
| |
| pcmk_cpg_handle = handle; |
| cluster->cpg_handle = handle; |
| mainloop_add_fd("corosync-cpg", G_PRIORITY_MEDIUM, fd, cluster, &cpg_fd_callbacks); |
| |
| bail: |
| if (rc != CS_OK) { |
| cpg_finalize(handle); |
| return FALSE; |
| } |
| |
| peer = crm_get_peer(id, NULL); |
| crm_update_peer_proc(__FUNCTION__, peer, crm_proc_cpg, ONLINESTATUS); |
| return TRUE; |
| } |
| |
| gboolean |
| send_cluster_message_cs(xmlNode * msg, gboolean local, crm_node_t * node, enum crm_ais_msg_types dest) |
| { |
| gboolean rc = TRUE; |
| char *data = NULL; |
| |
| data = dump_xml_unformatted(msg); |
| rc = send_cluster_text(crm_class_cluster, data, local, node, dest); |
| free(data); |
| return rc; |
| } |
| |
| gboolean |
| send_cluster_text(int class, const char *data, |
| gboolean local, crm_node_t * node, enum crm_ais_msg_types dest) |
| { |
| static int msg_id = 0; |
| static int local_pid = 0; |
| static int local_name_len = 0; |
| static const char *local_name = NULL; |
| |
| char *target = NULL; |
| struct iovec *iov; |
| AIS_Message *msg = NULL; |
| enum crm_ais_msg_types sender = text2msg_type(crm_system_name); |
| |
| /* There are only 6 handlers registered to crm_lib_service in plugin.c */ |
| CRM_CHECK(class < 6, crm_err("Invalid message class: %d", class); |
| return FALSE); |
| |
| #if !SUPPORT_PLUGIN |
| CRM_CHECK(dest != crm_msg_ais, return FALSE); |
| #endif |
| |
| if(local_name == NULL) { |
| local_name = get_local_node_name(); |
| } |
| if(local_name_len == 0 && local_name) { |
| local_name_len = strlen(local_name); |
| } |
| |
| if (data == NULL) { |
| data = ""; |
| } |
| |
| if (local_pid == 0) { |
| local_pid = getpid(); |
| } |
| |
| if (sender == crm_msg_none) { |
| sender = local_pid; |
| } |
| |
| msg = calloc(1, sizeof(AIS_Message)); |
| |
| msg_id++; |
| msg->id = msg_id; |
| msg->header.id = class; |
| msg->header.error = CS_OK; |
| |
| msg->host.type = dest; |
| msg->host.local = local; |
| |
| if (node) { |
| if (node->uname) { |
| target = strdup(node->uname); |
| msg->host.size = strlen(node->uname); |
| memset(msg->host.uname, 0, MAX_NAME); |
| memcpy(msg->host.uname, node->uname, msg->host.size); |
| } else { |
| target = crm_strdup_printf("%u", node->id); |
| } |
| msg->host.id = node->id; |
| } else { |
| target = strdup("all"); |
| } |
| |
| msg->sender.id = 0; |
| msg->sender.type = sender; |
| msg->sender.pid = local_pid; |
| msg->sender.size = local_name_len; |
| memset(msg->sender.uname, 0, MAX_NAME); |
| if(local_name && msg->sender.size) { |
| memcpy(msg->sender.uname, local_name, msg->sender.size); |
| } |
| |
| msg->size = 1 + strlen(data); |
| msg->header.size = sizeof(AIS_Message) + msg->size; |
| |
| if (msg->size < CRM_BZ2_THRESHOLD) { |
| msg = realloc_safe(msg, msg->header.size); |
| memcpy(msg->data, data, msg->size); |
| |
| } else { |
| char *compressed = NULL; |
| unsigned int new_size = 0; |
| char *uncompressed = strdup(data); |
| |
| if (crm_compress_string(uncompressed, msg->size, 0, &compressed, &new_size)) { |
| |
| msg->header.size = sizeof(AIS_Message) + new_size; |
| msg = realloc_safe(msg, msg->header.size); |
| memcpy(msg->data, compressed, new_size); |
| |
| msg->is_compressed = TRUE; |
| msg->compressed_size = new_size; |
| |
| } else { |
| msg = realloc_safe(msg, msg->header.size); |
| memcpy(msg->data, data, msg->size); |
| } |
| |
| free(uncompressed); |
| free(compressed); |
| } |
| |
| iov = calloc(1, sizeof(struct iovec)); |
| iov->iov_base = msg; |
| iov->iov_len = msg->header.size; |
| |
| if (msg->compressed_size) { |
| crm_trace("Queueing CPG message %u to %s (%llu bytes, %d bytes compressed payload): %.200s", |
| msg->id, target, (unsigned long long) iov->iov_len, |
| msg->compressed_size, data); |
| } else { |
| crm_trace("Queueing CPG message %u to %s (%llu bytes, %d bytes payload): %.200s", |
| msg->id, target, (unsigned long long) iov->iov_len, |
| msg->size, data); |
| } |
| free(target); |
| |
| #if SUPPORT_PLUGIN |
| /* The plugin is the only time we don't use CPG messaging */ |
| if(get_cluster_type() == pcmk_cluster_classic_ais) { |
| return send_plugin_text(class, iov); |
| } |
| #endif |
| |
| send_cpg_iov(iov); |
| |
| return TRUE; |
| } |
| |
| enum crm_ais_msg_types |
| text2msg_type(const char *text) |
| { |
| int type = crm_msg_none; |
| |
| CRM_CHECK(text != NULL, return type); |
| if (safe_str_eq(text, "ais")) { |
| type = crm_msg_ais; |
| } else if (safe_str_eq(text, "crm_plugin")) { |
| type = crm_msg_ais; |
| } else if (safe_str_eq(text, CRM_SYSTEM_CIB)) { |
| type = crm_msg_cib; |
| } else if (safe_str_eq(text, CRM_SYSTEM_CRMD)) { |
| type = crm_msg_crmd; |
| } else if (safe_str_eq(text, CRM_SYSTEM_DC)) { |
| type = crm_msg_crmd; |
| } else if (safe_str_eq(text, CRM_SYSTEM_TENGINE)) { |
| type = crm_msg_te; |
| } else if (safe_str_eq(text, CRM_SYSTEM_PENGINE)) { |
| type = crm_msg_pe; |
| } else if (safe_str_eq(text, CRM_SYSTEM_LRMD)) { |
| type = crm_msg_lrmd; |
| } else if (safe_str_eq(text, CRM_SYSTEM_STONITHD)) { |
| type = crm_msg_stonithd; |
| } else if (safe_str_eq(text, "stonith-ng")) { |
| type = crm_msg_stonith_ng; |
| } else if (safe_str_eq(text, "attrd")) { |
| type = crm_msg_attrd; |
| |
| } else { |
| /* This will normally be a transient client rather than |
| * a cluster daemon. Set the type to the pid of the client |
| */ |
| int scan_rc = sscanf(text, "%d", &type); |
| |
| if (scan_rc != 1 || type <= crm_msg_stonith_ng) { |
| /* Ensure it's sane */ |
| type = crm_msg_none; |
| } |
| } |
| return type; |
| } |