pacemaker 2.1.8-2.1.8~rc1
Scalable High-Availability cluster resource manager
Loading...
Searching...
No Matches
remote.c
Go to the documentation of this file.
1/*
2 * Copyright 2008-2024 the Pacemaker project contributors
3 *
4 * The version control history for this file may have further details.
5 *
6 * This source code is licensed under the GNU Lesser General Public License
7 * version 2.1 or later (LGPLv2.1+) WITHOUT ANY WARRANTY.
8 */
9
10#include <crm_internal.h>
11#include <crm/crm.h>
12
13#include <sys/param.h>
14#include <stdio.h>
15#include <sys/types.h>
16#include <sys/stat.h>
17#include <unistd.h>
18#include <sys/socket.h>
19#include <arpa/inet.h>
20#include <netinet/in.h>
21#include <netinet/ip.h>
22#include <netinet/tcp.h>
23#include <netdb.h>
24#include <stdlib.h>
25#include <errno.h>
26#include <inttypes.h> // PRIx32
27
28#include <glib.h>
29#include <bzlib.h>
30
32#include <crm/common/xml.h>
33#include <crm/common/mainloop.h>
35
36#ifdef HAVE_GNUTLS_GNUTLS_H
37# include <gnutls/gnutls.h>
38#endif
39
/* Byte-swap macros, equivalent to those provided by linux/swab.h */
#ifdef HAVE_LINUX_SWAB_H
#  include <linux/swab.h>
#else
/*
 * Every operand is cast explicitly, because there is no portable way to know
 * for sure how the U/UL/ULL constant suffixes map to 16-, 32- and 64-bit types.
 *
 * Each macro evaluates its argument more than once, so do not pass an
 * expression with side effects.
 */

/* Exchange the two bytes of a 16-bit value */
#define __swab16(x) ((uint16_t)( \
    (((uint16_t)(x) >> 8) & (uint16_t)0x00ffU) | \
    (((uint16_t)(x) << 8) & (uint16_t)0xff00U)))

/* Reverse the four bytes of a 32-bit value */
#define __swab32(x) ((uint32_t)( \
    (((uint32_t)(x) >> 24) & (uint32_t)0x000000ffUL) | \
    (((uint32_t)(x) >>  8) & (uint32_t)0x0000ff00UL) | \
    (((uint32_t)(x) <<  8) & (uint32_t)0x00ff0000UL) | \
    (((uint32_t)(x) << 24) & (uint32_t)0xff000000UL)))

/* Reverse the eight bytes of a 64-bit value */
#define __swab64(x) ((uint64_t)( \
    (((uint64_t)(x) >> 56) & (uint64_t)0x00000000000000ffULL) | \
    (((uint64_t)(x) >> 40) & (uint64_t)0x000000000000ff00ULL) | \
    (((uint64_t)(x) >> 24) & (uint64_t)0x0000000000ff0000ULL) | \
    (((uint64_t)(x) >>  8) & (uint64_t)0x00000000ff000000ULL) | \
    (((uint64_t)(x) <<  8) & (uint64_t)0x000000ff00000000ULL) | \
    (((uint64_t)(x) << 24) & (uint64_t)0x0000ff0000000000ULL) | \
    (((uint64_t)(x) << 40) & (uint64_t)0x00ff000000000000ULL) | \
    (((uint64_t)(x) << 56) & (uint64_t)0xff00000000000000ULL)))
#endif
68
/* Header version written into every message sent by this file */
#define REMOTE_MSG_VERSION 1

/* Marker a sender stores in the header's endian field; a receiver that sees
 * the byte-swapped form knows the header must be swapped before use
 */
#define ENDIAN_LOCAL 0xBADADBBD

/* Fixed-layout header placed in front of the payload of every remote message.
 * The structure is packed, so it is read directly out of the receive buffer.
 */
struct remote_header_v0 {
    uint32_t endian;                /* Detect messages from hosts with different endian-ness */
    uint32_t version;               /* Header version (REMOTE_MSG_VERSION when sent from here) */
    uint64_t id;                    /* Per-process counter, incremented for each message sent */
    uint64_t flags;                 /* Not set by the sending code in this file */
    uint32_t size_total;            /* Header length plus payload length, in bytes */
    uint32_t payload_offset;        /* Offset of the payload from the start of the message */
    uint32_t payload_compressed;    /* Nonzero: payload is bzip2 data of this many bytes */
    uint32_t payload_uncompressed;  /* Payload length including its terminating null byte */

    /* New fields get added here */

} __attribute__ ((packed));
85
97static struct remote_header_v0 *
98localized_remote_header(pcmk__remote_t *remote)
99{
100 struct remote_header_v0 *header = (struct remote_header_v0 *)remote->buffer;
101 if(remote->buffer_offset < sizeof(struct remote_header_v0)) {
102 return NULL;
103
104 } else if(header->endian != ENDIAN_LOCAL) {
105 uint32_t endian = __swab32(header->endian);
106
108 if(endian != ENDIAN_LOCAL) {
109 crm_err("Invalid message detected, endian mismatch: %" PRIx32
110 " is neither %" PRIx32 " nor the swab'd %" PRIx32,
111 ENDIAN_LOCAL, header->endian, endian);
112 return NULL;
113 }
114
115 header->id = __swab64(header->id);
116 header->flags = __swab64(header->flags);
117 header->endian = __swab32(header->endian);
118
119 header->version = __swab32(header->version);
120 header->size_total = __swab32(header->size_total);
121 header->payload_offset = __swab32(header->payload_offset);
122 header->payload_compressed = __swab32(header->payload_compressed);
123 header->payload_uncompressed = __swab32(header->payload_uncompressed);
124 }
125
126 return header;
127}
128
129#ifdef HAVE_GNUTLS_GNUTLS_H
130
131int
132pcmk__tls_client_handshake(pcmk__remote_t *remote, int timeout_ms)
133{
134 int rc = 0;
135 int pollrc = 0;
136 time_t time_limit = time(NULL) + timeout_ms / 1000;
137
138 do {
139 rc = gnutls_handshake(*remote->tls_session);
140 if ((rc == GNUTLS_E_INTERRUPTED) || (rc == GNUTLS_E_AGAIN)) {
141 pollrc = pcmk__remote_ready(remote, 1000);
142 if ((pollrc != pcmk_rc_ok) && (pollrc != ETIME)) {
143 /* poll returned error, there is no hope */
144 crm_trace("TLS handshake poll failed: %s (%d)",
145 pcmk_strerror(pollrc), pollrc);
146 return pcmk_legacy2rc(pollrc);
147 }
148 } else if (rc < 0) {
149 crm_trace("TLS handshake failed: %s (%d)",
150 gnutls_strerror(rc), rc);
151 return EPROTO;
152 } else {
153 return pcmk_rc_ok;
154 }
155 } while (time(NULL) < time_limit);
156 return ETIME;
157}
158
165static void
166set_minimum_dh_bits(const gnutls_session_t *session)
167{
168 int dh_min_bits;
169
171 0);
172
173 /* This function is deprecated since GnuTLS 3.1.7, in favor of letting
174 * the priority string imply the DH requirements, but this is the only
175 * way to give the user control over compatibility with older servers.
176 */
177 if (dh_min_bits > 0) {
178 crm_info("Requiring server use a Diffie-Hellman prime of at least %d bits",
179 dh_min_bits);
180 gnutls_dh_set_prime_bits(*session, dh_min_bits);
181 }
182}
183
184static unsigned int
185get_bound_dh_bits(unsigned int dh_bits)
186{
187 int dh_min_bits;
188 int dh_max_bits;
189
191 0);
193 0);
194
195 if ((dh_max_bits > 0) && (dh_max_bits < dh_min_bits)) {
196 crm_warn("Ignoring PCMK_dh_max_bits less than PCMK_dh_min_bits");
197 dh_max_bits = 0;
198 }
199 if ((dh_min_bits > 0) && (dh_bits < dh_min_bits)) {
200 return dh_min_bits;
201 }
202 if ((dh_max_bits > 0) && (dh_bits > dh_max_bits)) {
203 return dh_max_bits;
204 }
205 return dh_bits;
206}
207
219gnutls_session_t *
220pcmk__new_tls_session(int csock, unsigned int conn_type,
221 gnutls_credentials_type_t cred_type, void *credentials)
222{
223 int rc = GNUTLS_E_SUCCESS;
224 const char *prio_base = NULL;
225 char *prio = NULL;
226 gnutls_session_t *session = NULL;
227
228 /* Determine list of acceptable ciphers, etc. Pacemaker always adds the
229 * values required for its functionality.
230 *
231 * For an example of anonymous authentication, see:
232 * http://www.manpagez.com/info/gnutls/gnutls-2.10.4/gnutls_81.php#Echo-Server-with-anonymous-authentication
233 */
234
236 if (prio_base == NULL) {
237 prio_base = PCMK_GNUTLS_PRIORITIES;
238 }
239 prio = crm_strdup_printf("%s:%s", prio_base,
240 (cred_type == GNUTLS_CRD_ANON)? "+ANON-DH" : "+DHE-PSK:+PSK");
241
242 session = gnutls_malloc(sizeof(gnutls_session_t));
243 if (session == NULL) {
244 rc = GNUTLS_E_MEMORY_ERROR;
245 goto error;
246 }
247
248 rc = gnutls_init(session, conn_type);
249 if (rc != GNUTLS_E_SUCCESS) {
250 goto error;
251 }
252
253 /* @TODO On the server side, it would be more efficient to cache the
254 * priority with gnutls_priority_init2() and set it with
255 * gnutls_priority_set() for all sessions.
256 */
257 rc = gnutls_priority_set_direct(*session, prio, NULL);
258 if (rc != GNUTLS_E_SUCCESS) {
259 goto error;
260 }
261 if (conn_type == GNUTLS_CLIENT) {
262 set_minimum_dh_bits(session);
263 }
264
265 gnutls_transport_set_ptr(*session,
266 (gnutls_transport_ptr_t) GINT_TO_POINTER(csock));
267
268 rc = gnutls_credentials_set(*session, cred_type, credentials);
269 if (rc != GNUTLS_E_SUCCESS) {
270 goto error;
271 }
272 free(prio);
273 return session;
274
275error:
276 crm_err("Could not initialize %s TLS %s session: %s "
277 CRM_XS " rc=%d priority='%s'",
278 (cred_type == GNUTLS_CRD_ANON)? "anonymous" : "PSK",
279 (conn_type == GNUTLS_SERVER)? "server" : "client",
280 gnutls_strerror(rc), rc, prio);
281 free(prio);
282 if (session != NULL) {
283 gnutls_free(session);
284 }
285 return NULL;
286}
287
303int
304pcmk__init_tls_dh(gnutls_dh_params_t *dh_params)
305{
306 int rc = GNUTLS_E_SUCCESS;
307 unsigned int dh_bits = 0;
308
309 rc = gnutls_dh_params_init(dh_params);
310 if (rc != GNUTLS_E_SUCCESS) {
311 goto error;
312 }
313
314 dh_bits = gnutls_sec_param_to_pk_bits(GNUTLS_PK_DH,
315 GNUTLS_SEC_PARAM_NORMAL);
316 if (dh_bits == 0) {
317 rc = GNUTLS_E_DH_PRIME_UNACCEPTABLE;
318 goto error;
319 }
320 dh_bits = get_bound_dh_bits(dh_bits);
321
322 crm_info("Generating Diffie-Hellman parameters with %u-bit prime for TLS",
323 dh_bits);
324 rc = gnutls_dh_params_generate2(*dh_params, dh_bits);
325 if (rc != GNUTLS_E_SUCCESS) {
326 goto error;
327 }
328
329 return pcmk_rc_ok;
330
331error:
332 crm_err("Could not initialize Diffie-Hellman parameters for TLS: %s "
333 CRM_XS " rc=%d", gnutls_strerror(rc), rc);
334 return EPROTO;
335}
336
348int
349pcmk__read_handshake_data(const pcmk__client_t *client)
350{
351 int rc = 0;
352
353 CRM_ASSERT(client && client->remote && client->remote->tls_session);
354
355 do {
356 rc = gnutls_handshake(*client->remote->tls_session);
357 } while (rc == GNUTLS_E_INTERRUPTED);
358
359 if (rc == GNUTLS_E_AGAIN) {
360 /* No more data is available at the moment. This function should be
361 * invoked again once the client sends more.
362 */
363 return EAGAIN;
364 } else if (rc != GNUTLS_E_SUCCESS) {
365 crm_err("TLS handshake with remote client failed: %s "
366 CRM_XS " rc=%d", gnutls_strerror(rc), rc);
367 return EPROTO;
368 }
369 return pcmk_rc_ok;
370}
371
372// \return Standard Pacemaker return code
373static int
374send_tls(gnutls_session_t *session, struct iovec *iov)
375{
376 const char *unsent = iov->iov_base;
377 size_t unsent_len = iov->iov_len;
378 ssize_t gnutls_rc;
379
380 if (unsent == NULL) {
381 return EINVAL;
382 }
383
384 crm_trace("Sending TLS message of %llu bytes",
385 (unsigned long long) unsent_len);
386 while (true) {
387 gnutls_rc = gnutls_record_send(*session, unsent, unsent_len);
388
389 if (gnutls_rc == GNUTLS_E_INTERRUPTED || gnutls_rc == GNUTLS_E_AGAIN) {
390 crm_trace("Retrying to send %llu bytes remaining",
391 (unsigned long long) unsent_len);
392
393 } else if (gnutls_rc < 0) {
394 // Caller can log as error if necessary
395 crm_info("TLS connection terminated: %s " CRM_XS " rc=%lld",
396 gnutls_strerror((int) gnutls_rc),
397 (long long) gnutls_rc);
398 return ECONNABORTED;
399
400 } else if (gnutls_rc < unsent_len) {
401 crm_trace("Sent %lld of %llu bytes remaining",
402 (long long) gnutls_rc, (unsigned long long) unsent_len);
403 unsent_len -= gnutls_rc;
404 unsent += gnutls_rc;
405 } else {
406 crm_trace("Sent all %lld bytes remaining", (long long) gnutls_rc);
407 break;
408 }
409 }
410 return pcmk_rc_ok;
411}
412#endif
413
414// \return Standard Pacemaker return code
415static int
416send_plaintext(int sock, struct iovec *iov)
417{
418 const char *unsent = iov->iov_base;
419 size_t unsent_len = iov->iov_len;
420 ssize_t write_rc;
421
422 if (unsent == NULL) {
423 return EINVAL;
424 }
425
426 crm_debug("Sending plaintext message of %llu bytes to socket %d",
427 (unsigned long long) unsent_len, sock);
428 while (true) {
429 write_rc = write(sock, unsent, unsent_len);
430 if (write_rc < 0) {
431 int rc = errno;
432
433 if ((errno == EINTR) || (errno == EAGAIN)) {
434 crm_trace("Retrying to send %llu bytes remaining to socket %d",
435 (unsigned long long) unsent_len, sock);
436 continue;
437 }
438
439 // Caller can log as error if necessary
440 crm_info("Could not send message: %s " CRM_XS " rc=%d socket=%d",
441 pcmk_rc_str(rc), rc, sock);
442 return rc;
443
444 } else if (write_rc < unsent_len) {
445 crm_trace("Sent %lld of %llu bytes remaining",
446 (long long) write_rc, (unsigned long long) unsent_len);
447 unsent += write_rc;
448 unsent_len -= write_rc;
449 continue;
450
451 } else {
452 crm_trace("Sent all %lld bytes remaining: %.100s",
453 (long long) write_rc, (char *) (iov->iov_base));
454 break;
455 }
456 }
457 return pcmk_rc_ok;
458}
459
460// \return Standard Pacemaker return code
461static int
462remote_send_iovs(pcmk__remote_t *remote, struct iovec *iov, int iovs)
463{
464 int rc = pcmk_rc_ok;
465
466 for (int lpc = 0; (lpc < iovs) && (rc == pcmk_rc_ok); lpc++) {
467#ifdef HAVE_GNUTLS_GNUTLS_H
468 if (remote->tls_session) {
469 rc = send_tls(remote->tls_session, &(iov[lpc]));
470 continue;
471 }
472#endif
473 if (remote->tcp_socket) {
474 rc = send_plaintext(remote->tcp_socket, &(iov[lpc]));
475 } else {
476 rc = ESOCKTNOSUPPORT;
477 }
478 }
479 return rc;
480}
481
491int
492pcmk__remote_send_xml(pcmk__remote_t *remote, const xmlNode *msg)
493{
494 int rc = pcmk_rc_ok;
495 static uint64_t id = 0;
496 GString *xml_text = NULL;
497
498 struct iovec iov[2];
499 struct remote_header_v0 *header;
500
501 CRM_CHECK((remote != NULL) && (msg != NULL), return EINVAL);
502
503 xml_text = g_string_sized_new(1024);
504 pcmk__xml_string(msg, 0, xml_text, 0);
505 CRM_CHECK(xml_text->len > 0,
506 g_string_free(xml_text, TRUE); return EINVAL);
507
508 header = pcmk__assert_alloc(1, sizeof(struct remote_header_v0));
509
510 iov[0].iov_base = header;
511 iov[0].iov_len = sizeof(struct remote_header_v0);
512
513 iov[1].iov_len = 1 + xml_text->len;
514 iov[1].iov_base = g_string_free(xml_text, FALSE);
515
516 id++;
517 header->id = id;
518 header->endian = ENDIAN_LOCAL;
519 header->version = REMOTE_MSG_VERSION;
520 header->payload_offset = iov[0].iov_len;
521 header->payload_uncompressed = iov[1].iov_len;
522 header->size_total = iov[0].iov_len + iov[1].iov_len;
523
524 rc = remote_send_iovs(remote, iov, 2);
525 if (rc != pcmk_rc_ok) {
526 crm_err("Could not send remote message: %s " CRM_XS " rc=%d",
527 pcmk_rc_str(rc), rc);
528 }
529
530 free(iov[0].iov_base);
531 g_free((gchar *) iov[1].iov_base);
532 return rc;
533}
534
544xmlNode *
546{
547 xmlNode *xml = NULL;
548 struct remote_header_v0 *header = localized_remote_header(remote);
549
550 if (header == NULL) {
551 return NULL;
552 }
553
554 /* Support compression on the receiving end now, in case we ever want to add it later */
555 if (header->payload_compressed) {
556 int rc = 0;
557 unsigned int size_u = 1 + header->payload_uncompressed;
558 char *uncompressed =
559 pcmk__assert_alloc(1, header->payload_offset + size_u);
560
561 crm_trace("Decompressing message data %d bytes into %d bytes",
562 header->payload_compressed, size_u);
563
564 rc = BZ2_bzBuffToBuffDecompress(uncompressed + header->payload_offset, &size_u,
565 remote->buffer + header->payload_offset,
566 header->payload_compressed, 1, 0);
567 rc = pcmk__bzlib2rc(rc);
568
569 if (rc != pcmk_rc_ok && header->version > REMOTE_MSG_VERSION) {
570 crm_warn("Couldn't decompress v%d message, we only understand v%d",
571 header->version, REMOTE_MSG_VERSION);
572 free(uncompressed);
573 return NULL;
574
575 } else if (rc != pcmk_rc_ok) {
576 crm_err("Decompression failed: %s " CRM_XS " rc=%d",
577 pcmk_rc_str(rc), rc);
578 free(uncompressed);
579 return NULL;
580 }
581
582 CRM_ASSERT(size_u == header->payload_uncompressed);
583
584 memcpy(uncompressed, remote->buffer, header->payload_offset); /* Preserve the header */
585 remote->buffer_size = header->payload_offset + size_u;
586
587 free(remote->buffer);
588 remote->buffer = uncompressed;
589 header = localized_remote_header(remote);
590 }
591
592 /* take ownership of the buffer */
593 remote->buffer_offset = 0;
594
595 CRM_LOG_ASSERT(remote->buffer[sizeof(struct remote_header_v0) + header->payload_uncompressed - 1] == 0);
596
597 xml = pcmk__xml_parse(remote->buffer + header->payload_offset);
598 if (xml == NULL && header->version > REMOTE_MSG_VERSION) {
599 crm_warn("Couldn't parse v%d message, we only understand v%d",
600 header->version, REMOTE_MSG_VERSION);
601
602 } else if (xml == NULL) {
603 crm_err("Couldn't parse: '%.120s'", remote->buffer + header->payload_offset);
604 }
605
606 return xml;
607}
608
609static int
610get_remote_socket(const pcmk__remote_t *remote)
611{
612#ifdef HAVE_GNUTLS_GNUTLS_H
613 if (remote->tls_session) {
614 void *sock_ptr = gnutls_transport_get_ptr(*remote->tls_session);
615
616 return GPOINTER_TO_INT(sock_ptr);
617 }
618#endif
619
620 if (remote->tcp_socket) {
621 return remote->tcp_socket;
622 }
623
624 crm_err("Remote connection type undetermined (bug?)");
625 return -1;
626}
627
639int
640pcmk__remote_ready(const pcmk__remote_t *remote, int timeout_ms)
641{
642 struct pollfd fds = { 0, };
643 int sock = 0;
644 int rc = 0;
645 time_t start;
646 int timeout = timeout_ms;
647
648 sock = get_remote_socket(remote);
649 if (sock <= 0) {
650 crm_trace("No longer connected");
651 return ENOTCONN;
652 }
653
654 start = time(NULL);
655 errno = 0;
656 do {
657 fds.fd = sock;
658 fds.events = POLLIN;
659
660 /* If we got an EINTR while polling, and we have a
661 * specific timeout we are trying to honor, attempt
662 * to adjust the timeout to the closest second. */
663 if (errno == EINTR && (timeout > 0)) {
664 timeout = timeout_ms - ((time(NULL) - start) * 1000);
665 if (timeout < 1000) {
666 timeout = 1000;
667 }
668 }
669
670 rc = poll(&fds, 1, timeout);
671 } while (rc < 0 && errno == EINTR);
672
673 if (rc < 0) {
674 return errno;
675 }
676 return (rc == 0)? ETIME : pcmk_rc_ok;
677}
678
691static int
692read_available_remote_data(pcmk__remote_t *remote)
693{
694 int rc = pcmk_rc_ok;
695 size_t read_len = sizeof(struct remote_header_v0);
696 struct remote_header_v0 *header = localized_remote_header(remote);
697 bool received = false;
698 ssize_t read_rc;
699
700 if(header) {
701 /* Stop at the end of the current message */
702 read_len = header->size_total;
703 }
704
705 /* automatically grow the buffer when needed */
706 if(remote->buffer_size < read_len) {
707 remote->buffer_size = 2 * read_len;
708 crm_trace("Expanding buffer to %llu bytes",
709 (unsigned long long) remote->buffer_size);
710 remote->buffer = pcmk__realloc(remote->buffer, remote->buffer_size + 1);
711 }
712
713#ifdef HAVE_GNUTLS_GNUTLS_H
714 if (!received && remote->tls_session) {
715 read_rc = gnutls_record_recv(*(remote->tls_session),
716 remote->buffer + remote->buffer_offset,
717 remote->buffer_size - remote->buffer_offset);
718 if (read_rc == GNUTLS_E_INTERRUPTED) {
719 rc = EINTR;
720 } else if (read_rc == GNUTLS_E_AGAIN) {
721 rc = EAGAIN;
722 } else if (read_rc < 0) {
723 crm_debug("TLS receive failed: %s (%lld)",
724 gnutls_strerror(read_rc), (long long) read_rc);
725 rc = EIO;
726 }
727 received = true;
728 }
729#endif
730
731 if (!received && remote->tcp_socket) {
732 read_rc = read(remote->tcp_socket,
733 remote->buffer + remote->buffer_offset,
734 remote->buffer_size - remote->buffer_offset);
735 if (read_rc < 0) {
736 rc = errno;
737 }
738 received = true;
739 }
740
741 if (!received) {
742 crm_err("Remote connection type undetermined (bug?)");
743 return ESOCKTNOSUPPORT;
744 }
745
746 /* process any errors. */
747 if (read_rc > 0) {
748 remote->buffer_offset += read_rc;
749 /* always null terminate buffer, the +1 to alloc always allows for this. */
750 remote->buffer[remote->buffer_offset] = '\0';
751 crm_trace("Received %lld more bytes (%llu total)",
752 (long long) read_rc,
753 (unsigned long long) remote->buffer_offset);
754
755 } else if ((rc == EINTR) || (rc == EAGAIN)) {
756 crm_trace("No data available for non-blocking remote read: %s (%d)",
757 pcmk_rc_str(rc), rc);
758
759 } else if (read_rc == 0) {
760 crm_debug("End of remote data encountered after %llu bytes",
761 (unsigned long long) remote->buffer_offset);
762 return ENOTCONN;
763
764 } else {
765 crm_debug("Error receiving remote data after %llu bytes: %s (%d)",
766 (unsigned long long) remote->buffer_offset,
767 pcmk_rc_str(rc), rc);
768 return ENOTCONN;
769 }
770
771 header = localized_remote_header(remote);
772 if(header) {
773 if(remote->buffer_offset < header->size_total) {
774 crm_trace("Read partial remote message (%llu of %u bytes)",
775 (unsigned long long) remote->buffer_offset,
776 header->size_total);
777 } else {
778 crm_trace("Read full remote message of %llu bytes",
779 (unsigned long long) remote->buffer_offset);
780 return pcmk_rc_ok;
781 }
782 }
783
784 return EAGAIN;
785}
786
797int
799{
800 int rc = pcmk_rc_ok;
801 time_t start = time(NULL);
802 int remaining_timeout = 0;
803
804 if (timeout_ms == 0) {
805 timeout_ms = 10000;
806 } else if (timeout_ms < 0) {
807 timeout_ms = 60000;
808 }
809
810 remaining_timeout = timeout_ms;
811 while (remaining_timeout > 0) {
812
813 crm_trace("Waiting for remote data (%d ms of %d ms timeout remaining)",
814 remaining_timeout, timeout_ms);
815 rc = pcmk__remote_ready(remote, remaining_timeout);
816
817 if (rc == ETIME) {
818 crm_err("Timed out (%d ms) while waiting for remote data",
819 remaining_timeout);
820 return rc;
821
822 } else if (rc != pcmk_rc_ok) {
823 crm_debug("Wait for remote data aborted (will retry): %s "
824 CRM_XS " rc=%d", pcmk_rc_str(rc), rc);
825
826 } else {
827 rc = read_available_remote_data(remote);
828 if (rc == pcmk_rc_ok) {
829 return rc;
830 } else if (rc == EAGAIN) {
831 crm_trace("Waiting for more remote data");
832 } else {
833 crm_debug("Could not receive remote data: %s " CRM_XS " rc=%d",
834 pcmk_rc_str(rc), rc);
835 }
836 }
837
838 // Don't waste time retrying after fatal errors
839 if ((rc == ENOTCONN) || (rc == ESOCKTNOSUPPORT)) {
840 return rc;
841 }
842
843 remaining_timeout = timeout_ms - ((time(NULL) - start) * 1000);
844 }
845 return ETIME;
846}
847
/* State carried from connect_socket_retry() to the timer callback that checks
 * whether a non-blocking connect has finished
 */
struct tcp_async_cb_data {
    int sock;           // Socket being connected
    int timeout_ms;     // How long the connection attempt may take
    time_t start;       // When the attempt began (0: connect() already succeeded)
    void *userdata;     // Passed through to callback unchanged
    void (*callback) (void *userdata, int rc, int sock);    // Result notification
};
855
856// \return TRUE if timer should be rescheduled, FALSE otherwise
857static gboolean
858check_connect_finished(gpointer userdata)
859{
860 struct tcp_async_cb_data *cb_data = userdata;
861 int rc;
862
863 fd_set rset, wset;
864 struct timeval ts = { 0, };
865
866 if (cb_data->start == 0) {
867 // Last connect() returned success immediately
868 rc = pcmk_rc_ok;
869 goto dispatch_done;
870 }
871
872 // If the socket is ready for reading or writing, the connect succeeded
873 FD_ZERO(&rset);
874 FD_SET(cb_data->sock, &rset);
875 wset = rset;
876 rc = select(cb_data->sock + 1, &rset, &wset, NULL, &ts);
877
878 if (rc < 0) { // select() error
879 rc = errno;
880 if ((rc == EINPROGRESS) || (rc == EAGAIN)) {
881 if ((time(NULL) - cb_data->start) < (cb_data->timeout_ms / 1000)) {
882 return TRUE; // There is time left, so reschedule timer
883 } else {
884 rc = ETIMEDOUT;
885 }
886 }
887 crm_trace("Could not check socket %d for connection success: %s (%d)",
888 cb_data->sock, pcmk_rc_str(rc), rc);
889
890 } else if (rc == 0) { // select() timeout
891 if ((time(NULL) - cb_data->start) < (cb_data->timeout_ms / 1000)) {
892 return TRUE; // There is time left, so reschedule timer
893 }
894 crm_debug("Timed out while waiting for socket %d connection success",
895 cb_data->sock);
896 rc = ETIMEDOUT;
897
898 // select() returned number of file descriptors that are ready
899
900 } else if (FD_ISSET(cb_data->sock, &rset)
901 || FD_ISSET(cb_data->sock, &wset)) {
902
903 // The socket is ready; check it for connection errors
904 int error = 0;
905 socklen_t len = sizeof(error);
906
907 if (getsockopt(cb_data->sock, SOL_SOCKET, SO_ERROR, &error, &len) < 0) {
908 rc = errno;
909 crm_trace("Couldn't check socket %d for connection errors: %s (%d)",
910 cb_data->sock, pcmk_rc_str(rc), rc);
911 } else if (error != 0) {
912 rc = error;
913 crm_trace("Socket %d connected with error: %s (%d)",
914 cb_data->sock, pcmk_rc_str(rc), rc);
915 } else {
916 rc = pcmk_rc_ok;
917 }
918
919 } else { // Should not be possible
920 crm_trace("select() succeeded, but socket %d not in resulting "
921 "read/write sets", cb_data->sock);
922 rc = EAGAIN;
923 }
924
925 dispatch_done:
926 if (rc == pcmk_rc_ok) {
927 crm_trace("Socket %d is connected", cb_data->sock);
928 } else {
929 close(cb_data->sock);
930 cb_data->sock = -1;
931 }
932
933 if (cb_data->callback) {
934 cb_data->callback(cb_data->userdata, rc, cb_data->sock);
935 }
936 free(cb_data);
937 return FALSE; // Do not reschedule timer
938}
939
958static int
959connect_socket_retry(int sock, const struct sockaddr *addr, socklen_t addrlen,
960 int timeout_ms, int *timer_id, void *userdata,
961 void (*callback) (void *userdata, int rc, int sock))
962{
963 int rc = 0;
964 int interval = 500;
965 int timer;
966 struct tcp_async_cb_data *cb_data = NULL;
967
968 rc = pcmk__set_nonblocking(sock);
969 if (rc != pcmk_rc_ok) {
970 crm_warn("Could not set socket non-blocking: %s " CRM_XS " rc=%d",
971 pcmk_rc_str(rc), rc);
972 return rc;
973 }
974
975 rc = connect(sock, addr, addrlen);
976 if (rc < 0 && (errno != EINPROGRESS) && (errno != EAGAIN)) {
977 rc = errno;
978 crm_warn("Could not connect socket: %s " CRM_XS " rc=%d",
979 pcmk_rc_str(rc), rc);
980 return rc;
981 }
982
983 cb_data = pcmk__assert_alloc(1, sizeof(struct tcp_async_cb_data));
984 cb_data->userdata = userdata;
985 cb_data->callback = callback;
986 cb_data->sock = sock;
987 cb_data->timeout_ms = timeout_ms;
988
989 if (rc == 0) {
990 /* The connect was successful immediately, we still return to mainloop
991 * and let this callback get called later. This avoids the user of this api
992 * to have to account for the fact the callback could be invoked within this
993 * function before returning. */
994 cb_data->start = 0;
995 interval = 1;
996 } else {
997 cb_data->start = time(NULL);
998 }
999
1000 /* This timer function does a non-blocking poll on the socket to see if we
1001 * can use it. Once we can, the connect has completed. This method allows us
1002 * to connect without blocking the mainloop.
1003 *
1004 * @TODO Use a mainloop fd callback for this instead of polling. Something
1005 * about the way mainloop is currently polling prevents this from
1006 * working at the moment though. (See connect(2) regarding EINPROGRESS
1007 * for possible new handling needed.)
1008 */
1009 crm_trace("Scheduling check in %dms for whether connect to fd %d finished",
1010 interval, sock);
1011 timer = g_timeout_add(interval, check_connect_finished, cb_data);
1012 if (timer_id) {
1013 *timer_id = timer;
1014 }
1015
1016 // timer callback should be taking care of cb_data
1017 // cppcheck-suppress memleak
1018 return pcmk_rc_ok;
1019}
1020
1031static int
1032connect_socket_once(int sock, const struct sockaddr *addr, socklen_t addrlen)
1033{
1034 int rc = connect(sock, addr, addrlen);
1035
1036 if (rc < 0) {
1037 rc = errno;
1038 crm_warn("Could not connect socket: %s " CRM_XS " rc=%d",
1039 pcmk_rc_str(rc), rc);
1040 return rc;
1041 }
1042
1043 rc = pcmk__set_nonblocking(sock);
1044 if (rc != pcmk_rc_ok) {
1045 crm_warn("Could not set socket non-blocking: %s " CRM_XS " rc=%d",
1046 pcmk_rc_str(rc), rc);
1047 return rc;
1048 }
1049
1050 return pcmk_ok;
1051}
1052
/*!
 * \internal
 * \brief Connect to a remote host by TCP, trying each of its addresses in turn
 *
 * With a callback, the connect is started without blocking and its outcome is
 * reported later through the callback. Without one, the connect is attempted
 * synchronously.
 *
 * \param[in]  host      Name or address of the host to connect to
 * \param[in]  port      TCP port to connect to
 * \param[in]  timeout   Passed to the asynchronous connect as its time limit,
 *                       in milliseconds (unused for a synchronous connect)
 * \param[out] timer_id  If not NULL, where an asynchronous connect stores the
 *                       ID of its timer
 * \param[out] sock_fd   Where to store the socket, or -1 if none was connected
 * \param[in]  userdata  Passed through to \p callback
 * \param[in]  callback  If not NULL, connect asynchronously and report the
 *                       outcome by calling this
 *
 * \return Standard Pacemaker return code (for an asynchronous connect,
 *         pcmk_rc_ok means only that the attempt was started)
 */
int
pcmk__connect_remote(const char *host, int port, int timeout, int *timer_id,
                     int *sock_fd, void *userdata,
                     void (*callback) (void *userdata, int rc, int sock))
{
    char buffer[INET6_ADDRSTRLEN];
    struct addrinfo *res = NULL;
    struct addrinfo *rp = NULL;
    struct addrinfo hints;
    const char *server = host;
    int rc;
    int sock = -1;

    CRM_CHECK((host != NULL) && (sock_fd != NULL), return EINVAL);

    // Get host's IP address(es)
    memset(&hints, 0, sizeof(struct addrinfo));
    hints.ai_family = AF_UNSPEC;        /* Allow IPv4 or IPv6 */
    hints.ai_socktype = SOCK_STREAM;
    hints.ai_flags = AI_CANONNAME;

    rc = getaddrinfo(server, NULL, &hints, &res);
    rc = pcmk__gaierror2rc(rc);

    if (rc != pcmk_rc_ok) {
        crm_err("Unable to get IP address info for %s: %s",
                server, pcmk_rc_str(rc));
        goto async_cleanup;
    }

    if (!res || !res->ai_addr) {
        crm_err("Unable to get IP address info for %s: no result", server);
        rc = ENOTCONN;
        goto async_cleanup;
    }

    // getaddrinfo() returns a list of host's addresses, try them in order
    for (rp = res; rp != NULL; rp = rp->ai_next) {
        struct sockaddr *addr = rp->ai_addr;

        if (!addr) {
            continue;
        }

        if (rp->ai_canonname) {
            server = res->ai_canonname;
        }
        crm_debug("Got canonical name %s for %s", server, host);

        sock = socket(rp->ai_family, SOCK_STREAM, IPPROTO_TCP);
        if (sock == -1) {
            rc = errno;
            crm_warn("Could not create socket for remote connection to %s:%d: "
                     "%s " CRM_XS " rc=%d", server, port, pcmk_rc_str(rc), rc);
            continue;
        }

        /* Set port appropriately for address family */
        /* (void*) casts avoid false-positive compiler alignment warnings */
        if (addr->sa_family == AF_INET6) {
            ((struct sockaddr_in6 *)(void*)addr)->sin6_port = htons(port);
        } else {
            ((struct sockaddr_in *)(void*)addr)->sin_port = htons(port);
        }

        memset(buffer, 0, PCMK__NELEM(buffer));
        pcmk__sockaddr2str(addr, buffer);
        crm_info("Attempting remote connection to %s:%d", buffer, port);

        /* Store each attempt's result in rc, so that success on a later
         * address is not reported with the error left over from an earlier
         * address that failed.
         */
        if (callback) {
            rc = connect_socket_retry(sock, rp->ai_addr, rp->ai_addrlen,
                                      timeout, timer_id, userdata, callback);
            if (rc == pcmk_rc_ok) {
                goto async_cleanup; /* Success for now, we'll hear back later in the callback */
            }

        } else {
            rc = connect_socket_once(sock, rp->ai_addr, rp->ai_addrlen);
            if (rc == pcmk_rc_ok) {
                break;          /* Success */
            }
        }

        // Connect failed
        close(sock);
        sock = -1;
        rc = ENOTCONN;
    }

async_cleanup:

    if (res) {
        freeaddrinfo(res);
    }
    *sock_fd = sock;
    return rc;
}
1163
/*!
 * \internal
 * \brief Write the text form of a socket address's IP address
 *
 * \param[in]  sa  Socket address (a struct sockaddr of some family)
 * \param[out] s   Where to write the result; must have room for at least
 *                 INET6_ADDRSTRLEN bytes
 *
 * \note For any address family other than AF_INET and AF_INET6, the result
 *       is the string "<invalid>".
 */
void
pcmk__sockaddr2str(const void *sa, char *s)
{
    const int family = ((const struct sockaddr *) sa)->sa_family;

    if (family == AF_INET) {
        const struct sockaddr_in *ipv4 = (const struct sockaddr_in *) sa;

        inet_ntop(AF_INET, &(ipv4->sin_addr), s, INET6_ADDRSTRLEN);

    } else if (family == AF_INET6) {
        const struct sockaddr_in6 *ipv6 = (const struct sockaddr_in6 *) sa;

        inet_ntop(AF_INET6, &(ipv6->sin6_addr), s, INET6_ADDRSTRLEN);

    } else {
        strcpy(s, "<invalid>");
    }
}
1194
1204int
1206{
1207 int rc;
1208 struct sockaddr_storage addr;
1209 socklen_t laddr = sizeof(addr);
1210 char addr_str[INET6_ADDRSTRLEN];
1211#ifdef TCP_USER_TIMEOUT
1212 long sbd_timeout = 0;
1213#endif
1214
1215 /* accept the connection */
1216 memset(&addr, 0, sizeof(addr));
1217 *csock = accept(ssock, (struct sockaddr *)&addr, &laddr);
1218 if (*csock == -1) {
1219 rc = errno;
1220 crm_err("Could not accept remote client connection: %s "
1221 CRM_XS " rc=%d", pcmk_rc_str(rc), rc);
1222 return rc;
1223 }
1224 pcmk__sockaddr2str(&addr, addr_str);
1225 crm_info("Accepted new remote client connection from %s", addr_str);
1226
1227 rc = pcmk__set_nonblocking(*csock);
1228 if (rc != pcmk_rc_ok) {
1229 crm_err("Could not set socket non-blocking: %s " CRM_XS " rc=%d",
1230 pcmk_rc_str(rc), rc);
1231 close(*csock);
1232 *csock = -1;
1233 return rc;
1234 }
1235
1236#ifdef TCP_USER_TIMEOUT
1237 sbd_timeout = pcmk__get_sbd_watchdog_timeout();
1238 if (sbd_timeout > 0) {
1239 // Time to fail and retry before watchdog
1240 long half = sbd_timeout / 2;
1241 unsigned int optval = (half <= UINT_MAX)? half : UINT_MAX;
1242
1243 rc = setsockopt(*csock, SOL_TCP, TCP_USER_TIMEOUT,
1244 &optval, sizeof(optval));
1245 if (rc < 0) {
1246 rc = errno;
1247 crm_err("Could not set TCP timeout to %d ms on remote connection: "
1248 "%s " CRM_XS " rc=%d", optval, pcmk_rc_str(rc), rc);
1249 close(*csock);
1250 *csock = -1;
1251 return rc;
1252 }
1253 }
1254#endif
1255
1256 return rc;
1257}
1258
1264int
1266{
1267 static int port = 0;
1268
1269 if (port == 0) {
1270 const char *env = pcmk__env_option(PCMK__ENV_REMOTE_PORT);
1271
1272 if (env) {
1273 errno = 0;
1274 port = strtol(env, NULL, 10);
1275 if (errno || (port < 1) || (port > 65535)) {
1276 crm_warn("Environment variable PCMK_" PCMK__ENV_REMOTE_PORT
1277 " has invalid value '%s', using %d instead",
1278 env, DEFAULT_REMOTE_PORT);
1279 port = DEFAULT_REMOTE_PORT;
1280 }
1281 } else {
1282 port = DEFAULT_REMOTE_PORT;
1283 }
1284 }
1285 return port;
1286}
#define PCMK__NELEM(a)
Definition internal.h:48
#define pcmk__assert_alloc(nmemb, size)
Definition internal.h:297
struct tcp_async_cb_data __attribute__
#define ENDIAN_LOCAL
Definition remote.c:70
#define __swab32(x)
Definition remote.c:52
uint32_t size_total
Definition remote.c:4
uint32_t endian
Definition remote.c:0
int pcmk__remote_ready(const pcmk__remote_t *remote, int timeout_ms)
Definition remote.c:640
uint32_t payload_uncompressed
Definition remote.c:7
#define REMOTE_MSG_VERSION
Definition remote.c:69
uint32_t payload_compressed
Definition remote.c:6
int crm_default_remote_port(void)
Get the default remote connection TCP port on this host.
Definition remote.c:1265
uint64_t id
Definition remote.c:2
int pcmk__remote_send_xml(pcmk__remote_t *remote, const xmlNode *msg)
Definition remote.c:492
uint64_t flags
Definition remote.c:3
int pcmk__accept_remote_connection(int ssock, int *csock)
Definition remote.c:1205
#define __swab64(x)
Definition remote.c:58
uint32_t version
Definition remote.c:1
void pcmk__sockaddr2str(const void *sa, char *s)
Definition remote.c:1176
uint32_t payload_offset
Definition remote.c:5
xmlNode * pcmk__remote_message_xml(pcmk__remote_t *remote)
Definition remote.c:545
int pcmk__connect_remote(const char *host, int port, int timeout, int *timer_id, int *sock_fd, void *userdata, void(*callback)(void *userdata, int rc, int sock))
Definition remote.c:1070
int pcmk__read_remote_message(pcmk__remote_t *remote, int timeout_ms)
Definition remote.c:798
char * crm_strdup_printf(char const *format,...) G_GNUC_PRINTF(1
#define PCMK_GNUTLS_PRIORITIES
Definition config.h:538
pcmk__cpg_host_t host
Definition cpg.c:4
uint32_t id
Definition cpg.c:0
A dumping ground.
int pcmk__set_nonblocking(int fd)
Definition io.c:524
#define crm_info(fmt, args...)
Definition logging.h:397
#define crm_warn(fmt, args...)
Definition logging.h:392
#define CRM_XS
Definition logging.h:56
#define CRM_LOG_ASSERT(expr)
Definition logging.h:228
#define CRM_CHECK(expr, failure_action)
Definition logging.h:245
#define crm_debug(fmt, args...)
Definition logging.h:400
#define crm_err(fmt, args...)
Definition logging.h:389
#define crm_trace(fmt, args...)
Definition logging.h:402
#define DEFAULT_REMOTE_PORT
Definition lrmd.h:67
Wrappers for and extensions to glib mainloop.
#define PCMK__ENV_REMOTE_PORT
#define PCMK__ENV_DH_MAX_BITS
#define PCMK__ENV_TLS_PRIORITIES
long pcmk__get_sbd_watchdog_timeout(void)
Definition watchdog.c:235
const char * pcmk__env_option(const char *option)
Definition options.c:1088
#define PCMK__ENV_DH_MIN_BITS
unsigned int timeout
Definition pcmk_fence.c:32
#define ETIME
const char * pcmk_strerror(int rc)
Definition results.c:149
#define CRM_ASSERT(expr)
Definition results.h:42
const char * pcmk_rc_str(int rc)
Get a user-friendly description of a return code.
Definition results.c:501
@ pcmk_rc_ok
Definition results.h:162
#define pcmk_ok
Definition results.h:69
int pcmk_legacy2rc(int legacy_rc)
Definition results.c:559
int pcmk__gaierror2rc(int gai)
Map a getaddrinfo() return code to the most similar Pacemaker return code.
Definition results.c:865
int pcmk__bzlib2rc(int bz2)
Map a bz2 return code to the most similar Pacemaker return code.
Definition results.c:906
int pcmk__scan_min_int(const char *text, int *result, int minimum)
Definition strings.c:127
struct pcmk__remote_s * remote
Wrappers for and extensions to libxml2.
void pcmk__xml_string(const xmlNode *data, uint32_t options, GString *buffer, int depth)
Definition xml_io.c:488
xmlNode * pcmk__xml_parse(const char *input)
Definition xml_io.c:244