root/trunk/src/io/bmi/bmi_mx/mx.c @ 8304

Revision 8304, 114.6 KB (checked in by pcarns, 3 years ago)

merging bmi-experimental-branch to trunk

Line 
1/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 *  vim:expandtab:shiftwidth=8:tabstop=8:
3 *
4 *   Copyright (C) 2007 Myricom, Inc.
5 *   Author: Myricom, Inc. <help at myri.com>
6 *
7 *   See COPYING in top-level directory.
8 */
9
10#include "mx.h"
11#include "pint-hint.h"
12#include "pint-event.h"
13#include "pvfs2-debug.h"
14
15
16static int      tmp_id  = 0;    /* temporary id until bmi_mx is init'ed */
17struct bmx_data *bmi_mx = NULL; /* global state for bmi_mx */
18
19mx_status_t     BMX_NO_STATUS;
20
21#if BMX_MEM_ACCT
22uint64_t        mem_used = 0;   /* bytes used */
23gen_mutex_t     mem_used_lock;  /* lock */
24#endif
25
26/* statics for event logging */
27static PINT_event_type bmi_mx_send_event_id;
28static PINT_event_type bmi_mx_recv_event_id;
29
30static PINT_event_group bmi_mx_event_group;
31static pid_t bmi_mx_pid;
32
33mx_unexp_handler_action_t
34bmx_unexpected_recv(void *context, mx_endpoint_addr_t source,
35                      uint64_t match_value, uint32_t length, void *data_if_available);
36
37static int
38bmx_peer_connect(struct bmx_peer *peer);
39static void
40bmx_create_peername(void);
41
42/**** Completion function token handling ****************************/
43/* We should not hold any locks when calling mx_test[_any](),
44 * mx_wait_any() or mx_cancel(). We want to avoid races between them,
45 * however. So, before calling any completion function, the caller
46 * must hold this token.  These functions implement a token system (i.e.
47 * semaphore) that will wake up mx_wait_any() to reduce blocking times
48 * for the calling function.
49 */
50
51static void
52bmx_get_completion_token(void)
53{
54        int     done    = 0;
55
56        do {
57                gen_mutex_lock(&bmi_mx->bmx_completion_lock);
58                if (bmi_mx->bmx_refcount == 1) {
59                        bmi_mx->bmx_refcount--;
60                        done = 1;
61                        gen_mutex_unlock(&bmi_mx->bmx_completion_lock);
62                } else {
63                        assert(bmi_mx->bmx_refcount == 0);
64                        /* someone has the lock, wake the MX endpoint in
65                         * case they are blocking in mx_wait_any() */
66                        gen_mutex_unlock(&bmi_mx->bmx_completion_lock);
67                        mx_wakeup(bmi_mx->bmx_ep);
68                }
69        } while (!done);
70
71        return;
72}
73
74static void
75bmx_release_completion_token(void)
76{
77        gen_mutex_lock(&bmi_mx->bmx_completion_lock);
78        bmi_mx->bmx_refcount++;
79        assert(bmi_mx->bmx_refcount == 1);
80        gen_mutex_unlock(&bmi_mx->bmx_completion_lock);
81        return;
82}
83
84/**** TX/RX handling functions **************************************/
85
86static void
87bmx_ctx_free(struct bmx_ctx *ctx)
88{
89        if (ctx == NULL) return;
90
91        if (!qlist_empty(&ctx->mxc_global_list))
92                qlist_del_init(&ctx->mxc_global_list);
93
94        if (!qlist_empty(&ctx->mxc_list))
95                qlist_del_init(&ctx->mxc_list);
96
97        if (ctx->mxc_buffer != NULL) {
98                BMX_FREE(ctx->mxc_buffer, BMX_UNEXPECTED_SIZE);
99        }
100        if (ctx->mxc_seg_list != NULL) {
101                if (ctx->mxc_nseg > 0) {
102                        BMX_FREE(ctx->mxc_seg_list, ctx->mxc_nseg * sizeof(void *));
103                }
104        }
105        BMX_FREE(ctx, sizeof(*ctx));
106        return;
107}
108
109static int
110bmx_ctx_alloc(struct bmx_ctx **ctxp, enum bmx_req_type type)
111{
112        struct bmx_ctx *ctx     = NULL;
113
114        if (ctxp == NULL) return -BMI_EINVAL;
115
116        BMX_MALLOC(ctx, sizeof(*ctx));
117        if (ctx == NULL) return -BMI_ENOMEM;
118
119        ctx->mxc_type = type;
120        ctx->mxc_state = BMX_CTX_INIT;
121        /* ctx->mxc_msg_type */
122
123        INIT_QLIST_HEAD(&ctx->mxc_global_list);
124        INIT_QLIST_HEAD(&ctx->mxc_list);
125
126        /* ctx->mxc_mop */
127        /* ctx->mxc_peer */
128        /* ctx->mxc_tag */
129
130        ctx->mxc_seg.segment_ptr = ctx->mxc_buffer;
131        /* ctx->mxc_seg.segment_length */
132        /* only server recv unexpected messages */
133        if (bmi_mx->bmx_is_server == 1 && type == BMX_REQ_RX) {
134#if BMX_MEM_TWEAK
135                int i = 0;
136                for (i = 0; i < BMX_UNEXPECTED_NUM; i++) {
137                        struct bmx_buffer *buf  = NULL;
138                        BMX_MALLOC(buf, sizeof(*buf));
139                        if (buf) {
140                                INIT_QLIST_HEAD(&buf->mxb_list);
141                                BMX_MALLOC(buf->mxb_buffer, BMX_UNEXPECTED_SIZE);
142                                if (buf->mxb_buffer) {
143                                        if (i == 0) {
144                                                ctx->mxc_buffer = buf->mxb_buffer;
145                                                gen_mutex_lock(&bmi_mx->bmx_used_unex_buffers_lock);
146                                                qlist_add(&buf->mxb_list,
147                                                          &bmi_mx->bmx_used_unex_buffers);
148                                                gen_mutex_unlock(&bmi_mx->bmx_used_unex_buffers_lock);
149                                        } else {
150                                                gen_mutex_lock(&bmi_mx->bmx_idle_unex_buffers_lock);
151                                                qlist_add(&buf->mxb_list,
152                                                          &bmi_mx->bmx_idle_unex_buffers);
153                                                gen_mutex_unlock(&bmi_mx->bmx_idle_unex_buffers_lock);
154                                        }
155                                } else {
156                                        BMX_FREE(buf, sizeof(*buf));
157                                }
158                        }
159                }
160#else
161                BMX_MALLOC(ctx->mxc_buffer, BMX_UNEXPECTED_SIZE);
162                if (ctx->mxc_buffer == NULL) {
163                        bmx_ctx_free(ctx);
164                        return -BMI_ENOMEM;
165                }
166#endif
167        }
168        /* ctx->mxc_seg_list */
169        /* ctx->mxc_nseg */
170        /* ctx->mxc_nob */
171        /* ctx->mxc_mxreq */
172        /* ctx->mxc_mxstat */
173
174        ctx->mxc_get = 1; /* put_idle_ctx() will increment the put */
175        /* ctx->mxc_put */
176
177        if (type == BMX_REQ_TX) {
178                gen_mutex_lock(&bmi_mx->bmx_lock);
179                qlist_add_tail(&ctx->mxc_global_list, &bmi_mx->bmx_txs);
180                gen_mutex_unlock(&bmi_mx->bmx_lock);
181        } else {
182                gen_mutex_lock(&bmi_mx->bmx_lock);
183                qlist_add_tail(&ctx->mxc_global_list, &bmi_mx->bmx_rxs);
184                gen_mutex_unlock(&bmi_mx->bmx_lock);
185        }
186
187        *ctxp = ctx;
188        return 0;
189}
190
191static void
192bmx_ctx_init(struct bmx_ctx *ctx)
193{
194        struct bmx_peer *peer   = NULL;
195
196        BMX_ENTER;
197
198        if (ctx == NULL) return;
199
200        peer = ctx->mxc_peer;
201
202        /* ctx->mxc_type */
203        ctx->mxc_state = BMX_CTX_IDLE;
204        /* ctx->mxc_msg_type */
205       
206        /* ctx->mxc_global_list */
207        if (!qlist_empty(&ctx->mxc_list)) {
208                debug(BMX_DB_ERR, "%s %s still on a list", __func__,
209                      ctx->mxc_type == BMX_REQ_TX ? "tx" : "rx");
210                exit(1);
211        }
212
213        ctx->mxc_mop = NULL;
214        ctx->mxc_peer = NULL;
215        ctx->mxc_tag = 0;
216        ctx->mxc_match = 0ULL;
217
218        if (ctx->mxc_type == BMX_REQ_TX && ctx->mxc_buffer != NULL && ctx->mxc_nseg == 1) {
219                BMX_FREE(ctx->mxc_buffer, ctx->mxc_seg.segment_length);
220                ctx->mxc_buffer = NULL;
221        }
222        ctx->mxc_seg.segment_ptr = ctx->mxc_buffer;
223        ctx->mxc_seg.segment_length = 0;
224        if (ctx->mxc_seg_list != NULL) {
225                if (ctx->mxc_nseg > 0) {
226                        BMX_FREE(ctx->mxc_seg_list, ctx->mxc_nseg * sizeof(void *));
227                }
228                ctx->mxc_seg_list = NULL;
229        }
230        ctx->mxc_nseg = 0;
231        ctx->mxc_nob = 0LL;
232        ctx->mxc_mxreq = NULL;
233        memset(&ctx->mxc_mxstat, 0, sizeof(mx_status_t));
234
235        /* ctx->mxc_get */
236        /* ctx->mxc_put */
237
238        BMX_EXIT;
239        return;
240}
241
242/* add to peer's queued txs/rxs list */
243static void
244bmx_q_ctx(struct bmx_ctx *ctx)
245{
246        struct bmx_peer *peer = ctx->mxc_peer;
247        list_t          *queue = ctx->mxc_type == BMX_REQ_TX ? &peer->mxp_queued_txs :
248                                                              &peer->mxp_queued_rxs;
249
250        BMX_ENTER;
251        ctx->mxc_state = BMX_CTX_QUEUED;
252        gen_mutex_lock(&peer->mxp_lock);
253        qlist_add_tail(&ctx->mxc_list, queue);
254        gen_mutex_unlock(&peer->mxp_lock);
255        BMX_EXIT;
256        return;
257}
258
259/* add to peer's pending rxs list */
260static void
261bmx_q_pending_ctx(struct bmx_ctx *ctx)
262{
263        struct bmx_peer *peer = ctx->mxc_peer;
264
265        BMX_ENTER;
266        ctx->mxc_state = BMX_CTX_PENDING;
267        if (ctx->mxc_type == BMX_REQ_RX) {
268                if (peer) {
269                        gen_mutex_lock(&peer->mxp_lock);
270                        qlist_add_tail(&ctx->mxc_list, &peer->mxp_pending_rxs);
271                        gen_mutex_unlock(&peer->mxp_lock);
272                }
273        }
274        BMX_EXIT;
275        return;
276}
277
278/* remove from peer's pending rxs list */
279static void
280bmx_deq_pending_ctx(struct bmx_ctx *ctx)
281{
282        struct bmx_peer *peer = ctx->mxc_peer;
283
284        BMX_ENTER;
285        if (ctx->mxc_state == BMX_CTX_PENDING) {
286                ctx->mxc_state = BMX_CTX_COMPLETED;
287        }
288        if (ctx->mxc_type == BMX_REQ_RX) {
289                if (peer && !qlist_empty(&ctx->mxc_list)) {
290                        gen_mutex_lock(&peer->mxp_lock);
291                        qlist_del_init(&ctx->mxc_list);
292                        gen_mutex_unlock(&peer->mxp_lock);
293                }
294        }
295        BMX_EXIT;
296        return;
297}
298
299/* dequeue from unexpected rx list */
300static void
301bmx_deq_unex_rx(struct bmx_ctx **rxp)
302{
303        struct bmx_ctx  *rx     = NULL;
304        list_t          *list   = &bmi_mx->bmx_unex_rxs;
305
306        BMX_ENTER;
307        gen_mutex_lock(&bmi_mx->bmx_unex_rxs_lock);
308        if (!qlist_empty(list)) {
309                rx = qlist_entry(list->next, struct bmx_ctx, mxc_list);
310                qlist_del_init(&rx->mxc_list);
311        }
312        gen_mutex_unlock(&bmi_mx->bmx_unex_rxs_lock);
313        *rxp = rx;
314
315        BMX_EXIT;
316        return;
317}
318
319/* add to the completion queue for the appropriate context */
320static void
321bmx_q_completed(struct bmx_ctx *ctx, enum bmx_ctx_state state,
322                mx_status_t status, bmi_error_code_t error)
323{
324        int             id      = 0;
325        gen_mutex_t     *lock   = NULL;
326        list_t          *list   = NULL;
327
328        BMX_ENTER;
329
330        ctx->mxc_state = state;
331        ctx->mxc_mxstat = status;
332        ctx->mxc_error = error < 0 ? error : -error;
333
334        if (ctx->mxc_type == BMX_REQ_RX &&
335            ctx->mxc_msg_type == BMX_MSG_UNEXPECTED) {
336                list = &bmi_mx->bmx_unex_rxs;
337                lock = &bmi_mx->bmx_unex_rxs_lock;
338        } else {
339                id = (int) ctx->mxc_mop->context_id;
340                lock = &bmi_mx->bmx_done_q_lock[id];
341                list = &bmi_mx->bmx_done_q[id];
342        }
343
344
345        gen_mutex_lock(lock);
346        qlist_add_tail(&ctx->mxc_list, list);
347        gen_mutex_unlock(lock);
348        BMX_EXIT;
349        return;
350}
351
352static void
353bmx_deq_completed(struct bmx_ctx **ctxp, bmi_context_id context_id)
354{
355        int             id      = (int) context_id;
356        list_t          *list   = &bmi_mx->bmx_done_q[id];
357        gen_mutex_t     *lock   = &bmi_mx->bmx_done_q_lock[id];
358        struct bmx_ctx  *ctx    = NULL;
359
360        BMX_ENTER;
361
362        gen_mutex_lock(lock);
363        if (!qlist_empty(list)) {
364                ctx = qlist_entry(list->next, struct bmx_ctx, mxc_list);
365                qlist_del_init(&ctx->mxc_list);
366        }
367        gen_mutex_unlock(lock);
368        *ctxp = ctx;
369
370        BMX_EXIT;
371        return;
372}
373
374static struct bmx_ctx *
375bmx_get_idle_rx(void)
376{
377        struct bmx_ctx  *rx     = NULL;
378        list_t          *l      = &bmi_mx->bmx_idle_rxs;
379
380        gen_mutex_lock(&bmi_mx->bmx_idle_rxs_lock);
381        if (!qlist_empty(l)) {
382                rx = qlist_entry(l->next, struct bmx_ctx, mxc_list);
383                qlist_del_init(&rx->mxc_list);
384
385                if (rx->mxc_get != rx->mxc_put) {
386                        debug(BMX_DB_ERR, "get_idle_rx() get (%llu) != put (%llu)",
387                                (unsigned long long) rx->mxc_get,
388                                (unsigned long long) rx->mxc_put);
389                        exit(1);
390                }
391                rx->mxc_get++;
392
393                if (!(rx->mxc_state == BMX_CTX_IDLE || rx->mxc_state == BMX_CTX_INIT)) {
394                        debug(BMX_DB_ERR, "get_idle_rx() rx state is %d",
395                                (int) rx->mxc_state);
396                        exit(1);
397                }
398                rx->mxc_state = BMX_CTX_PREP;
399        }
400        gen_mutex_unlock(&bmi_mx->bmx_idle_rxs_lock);
401
402        return rx;
403}
404
405static void
406bmx_put_idle_ctx(struct bmx_ctx *ctx)
407{
408        list_t          *list   = &bmi_mx->bmx_idle_txs;
409        gen_mutex_t     *lock   = &bmi_mx->bmx_idle_txs_lock;
410
411        if (ctx == NULL) {
412                debug(BMX_DB_WARN, "put_idle_ctx() called with NULL");
413                return;
414        }
415        ctx->mxc_put++;
416        if (ctx->mxc_get != ctx->mxc_put) {
417                debug(BMX_DB_ERR, "put_idle_ctx() get (%llu) != put (%llu)",
418                         (unsigned long long) ctx->mxc_get,
419                         (unsigned long long) ctx->mxc_put);
420                exit(1);
421        }
422        bmx_ctx_init(ctx);
423
424        if (ctx->mxc_type == BMX_REQ_RX) {
425                list   = &bmi_mx->bmx_idle_rxs;
426                lock   = &bmi_mx->bmx_idle_rxs_lock;
427        }
428
429        gen_mutex_lock(lock);
430        qlist_add(&ctx->mxc_list, list);
431        gen_mutex_unlock(lock);
432        return;
433}
434
435static void
436bmx_reduce_idle_rxs(int count)
437{
438        int              i      = 0;
439        struct bmx_ctx  *rx     = NULL;
440
441        for (i = 0; i < count; i++) {
442                rx = bmx_get_idle_rx();
443                if (rx != NULL) {
444                        bmx_ctx_free(rx);
445                }
446        }
447
448        return;
449}
450
451static struct bmx_ctx *
452bmx_get_idle_tx(void)
453{
454        struct bmx_ctx  *tx     = NULL;
455        list_t          *l      = &bmi_mx->bmx_idle_txs;
456
457        gen_mutex_lock(&bmi_mx->bmx_idle_txs_lock);
458        if (!qlist_empty(l)) {
459                tx = qlist_entry(l->next, struct bmx_ctx, mxc_list);
460                qlist_del_init(&tx->mxc_list);
461
462                if (tx->mxc_get != tx->mxc_put) {
463                        debug(BMX_DB_ERR, "get_idle_tx() get (%llu) != put (%llu)",
464                                (unsigned long long) tx->mxc_get,
465                                (unsigned long long) tx->mxc_put);
466                        exit(1);
467                }
468                tx->mxc_get++;
469
470                if (!(tx->mxc_state == BMX_CTX_IDLE || tx->mxc_state == BMX_CTX_INIT)) {
471                        debug(BMX_DB_ERR, "get_idle_tx() tx state is %d",
472                                (int) tx->mxc_state);
473                        exit(1);
474                }
475                tx->mxc_state = BMX_CTX_PREP;
476        }
477        gen_mutex_unlock(&bmi_mx->bmx_idle_txs_lock);
478
479        return tx;
480}
481
482/**** peername parsing functions **************************************/
483
484static int
485bmx_verify_hostname(char *host)
486{
487        int             ret     = 0;
488        int             len     = 0;
489        const char     *legal   = NULL;
490
491        legal = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789-.";
492
493        len = strlen(host);
494        if (len > 64) {
495                debug(BMX_DB_INFO, "%s is not a legal hostname", host);
496        }
497
498        ret = strspn(host, legal);
499
500        if (len != ret || len > 63 || len == 0) {
501                return -1;
502        }
503
504        return 0;
505}
506
507static int
508bmx_verify_num_str(char *num_str)
509{
510        int             ret     = 0;
511        int             len     = 0;
512        const char     *legal   = "0123456789";
513
514        len = strlen(num_str);
515
516        ret = strspn(num_str, legal);
517
518        if (len != ret) {
519                return -1;
520        }
521
522        return 0;
523}
524
525
526/* parse mx://hostname:board:ep_id/filesystem/
527 * or    mx://hostname:ep_id/filesystem/
528 * this is pretty robust but if strtol() fails for board or ep_id, it
529 * returns 0 and we do not know that it failed.
530 * This handles legal hostnames (1-63 chars) include a-zA-Z0-9 as well as . and -
531 * It will accept IPv4 addresses but not IPv6 (too many semicolons) */
532static int
533bmx_parse_peername(const char *peername, char **hostname, uint32_t *board, uint32_t *ep_id)
534{
535        int             ret             = 0;
536        int             colon1_found    = 0;
537        int             colon2_found    = 0;
538        char           *s               = NULL;
539        char           *colon1          = NULL;
540        char           *colon2          = NULL;
541        char           *fs              = NULL;
542        char           *host            = NULL;
543        uint32_t        bd              = -1;
544        uint32_t        ep              = 0;
545
546        if (peername == NULL || hostname == NULL || board == NULL || ep_id == NULL) {
547                debug(BMX_DB_INFO, "parse_peername() called with invalid parameter");
548                return -BMI_EINVAL;
549        }
550
551        if (peername[0] != 'm' ||
552            peername[1] != 'x' ||
553            peername[2] != ':' ||
554            peername[3] != '/' ||
555            peername[4] != '/') {
556                debug(BMX_DB_INFO, "parse_peername() peername does not start with mx://");
557                return -1;
558        }
559
560        s = strdup(&peername[5]);
561        fs = strchr(s, '/');
562        if (fs) {
563                *fs = '\0';
564        }
565        colon1 = strchr(s, ':');
566        if (!colon1) {
567                debug(BMX_DB_INFO, "parse_peername() strchr() failed");
568        } else {
569                colon2 = strrchr(s, ':');
570                if (colon1 == colon2) {
571                        /* colon2_found == 0 */
572                        debug(BMX_DB_INFO, "parse_peername() MX hostname does not "
573                                           "include a board number");
574                } else {
575                        colon2_found = 1;
576                        *colon2 = '\0';
577                }
578                colon1_found = 1;
579                *colon1 = '\0';
580        }
581        /* if MX hostname includes board number...
582         * s      = hostname\0board\0ep_id\0filesystem
583         * colon1 =         \0board\0ep_id\0filesystem
584         * colon2 =                \0ep_id\0filesystem
585         * fs     =                       \0filesystem
586         *
587         * else if MX hostname does _not_ include a board number...
588         * s      = hostname\0ep_id\0filesystem
589         * colon1 =         \0ep_id\0filesystem
590         * colon2 =         \0ep_id\0filesystem
591         * fs     =                \0filesystem
592         */
593
594        colon1++;
595        colon2++;
596
597        /* make sure there are no more ':' in the strings */
598        if (colon1_found && colon2_found) {
599                if (NULL != strchr(colon1, ':') ||
600                    NULL != strchr(colon2, ':')) {
601                        debug(BMX_DB_INFO, "parse_peername() too many ':' (%s %s)",
602                                           colon1, colon2);
603                        free(s);
604                        return -1;
605                }
606        }
607
608        host = strdup(s);
609        if (!host) {
610                debug(BMX_DB_MEM, "parse_peername() malloc() failed");
611                free(s);
612                return -1;
613        }
614
615        if (colon1_found && colon2_found) {
616                bd = (uint32_t) strtol(colon1, NULL, 0);
617                ep = (uint32_t) strtol(colon2, NULL, 0);
618        } else if (colon1_found && !colon2_found) {
619                ep = (uint32_t) strtol(colon2, NULL, 0);
620        } else {
621                debug(BMX_DB_WARN, "%s is not a valid hostname", host);
622                free(host);
623                free(s);
624                return -1;
625        }
626
627        ret = bmx_verify_hostname(host);
628        if (ret != 0) {
629                debug(BMX_DB_INFO, "%s is not a valid hostname", host);
630                free(host);
631                free(s);
632                return -1;
633        }
634        ret = bmx_verify_num_str(colon1);
635        if (ret != 0) {
636                debug(BMX_DB_INFO, "%s is not a valid board ID", host);
637                free(host);
638                free(s);
639                return -1;
640        }
641        ret = bmx_verify_num_str(colon2);
642        if (ret != 0) {
643                debug(BMX_DB_INFO, "%s is not a valid endpoint ID", host);
644                free(host);
645                free(s);
646                return -1;
647        }
648
649        *hostname = host;
650        *board = bd;
651        *ep_id = ep;
652
653        free(s);
654
655        return 0;
656}
657
658/**** peer handling functions **************************************/
659
660static void
661bmx_peer_free(struct bmx_peer *peer)
662{
663        struct bmx_method_addr *mxmap = peer->mxp_mxmap;
664
665        if (mxmap != NULL) {
666                mxmap->mxm_peer = NULL;
667        }
668
669        if (!qlist_empty(&peer->mxp_queued_txs) ||
670            !qlist_empty(&peer->mxp_queued_rxs) ||
671            !qlist_empty(&peer->mxp_pending_rxs)) {
672                debug(BMX_DB_ERR, "freeing peer with non-empty lists");
673                exit(1);
674        }
675        gen_mutex_lock(&bmi_mx->bmx_peers_lock);
676        if (!qlist_empty(&peer->mxp_list)) qlist_del_init(&peer->mxp_list);
677        gen_mutex_unlock(&bmi_mx->bmx_peers_lock);
678        BMX_FREE(peer, sizeof(*peer));
679        return;
680}
681
682static void
683bmx_peer_addref(struct bmx_peer *peer)
684{
685        gen_mutex_lock(&peer->mxp_lock);
686        debug(BMX_DB_PEER, "%s refcount was %d", __func__, peer->mxp_refcount);
687        peer->mxp_refcount++;
688        gen_mutex_unlock(&peer->mxp_lock);
689        return;
690}
691
692static void
693bmx_peer_decref(struct bmx_peer *peer)
694{
695        BMX_ENTER;
696        gen_mutex_lock(&peer->mxp_lock);
697        if (peer->mxp_refcount == 0) {
698                debug(BMX_DB_WARN, "peer_decref() called for %s when refcount == 0",
699                                peer->mxp_mxmap->mxm_peername);
700        }
701        peer->mxp_refcount--;
702        if (peer->mxp_refcount == 1 && peer->mxp_state == BMX_PEER_DISCONNECT) {
703                /* all txs and rxs are completed or canceled, reset state */
704                debug(BMX_DB_PEER, "Setting peer %s to BMX_PEER_INIT",
705                                   peer->mxp_mxmap->mxm_peername);
706                peer->mxp_state = BMX_PEER_INIT;
707        }
708        gen_mutex_unlock(&peer->mxp_lock);
709
710        if (peer->mxp_refcount == 0) {
711                debug(BMX_DB_PEER, "%s freeing peer %s", __func__,
712                                peer->mxp_mxmap->mxm_peername);
713                struct bmx_method_addr *mxmap = peer->mxp_mxmap;
714
715                mx_set_endpoint_addr_context(peer->mxp_epa, NULL);
716                gen_mutex_lock(&bmi_mx->bmx_lock);
717                mxmap->mxm_peer = NULL;
718                gen_mutex_unlock(&bmi_mx->bmx_lock);
719                bmx_peer_free(peer);
720        }
721        BMX_EXIT;
722        return;
723}
724
725static int
726bmx_peer_alloc(struct bmx_peer **peerp, struct bmx_method_addr *mxmap)
727{
728        int              i              = 0;
729        int              ret            = 0;
730        char             name[MX_MAX_HOSTNAME_LEN + 1];
731        uint64_t         nic_id         = 0ULL;
732        mx_return_t      mxret          = MX_SUCCESS;
733        struct bmx_peer *peer           = NULL;
734
735        if (peerp == NULL) {
736                debug(BMX_DB_PEER, "peer_alloc() peerp = NULL");
737                return -1;
738        }
739        BMX_MALLOC(peer, sizeof(*peer));
740        if (!peer) {
741                debug(BMX_DB_MEM, "peer_alloc() unable to malloc peer");
742                return -BMI_ENOMEM;
743        }
744        peer->mxp_map   = mxmap->mxm_map;
745        peer->mxp_mxmap = mxmap;
746
747        /* init lists before calling in case we call peer_free() */
748        INIT_QLIST_HEAD(&peer->mxp_queued_txs);
749        INIT_QLIST_HEAD(&peer->mxp_queued_rxs);
750        INIT_QLIST_HEAD(&peer->mxp_pending_rxs);
751        INIT_QLIST_HEAD(&peer->mxp_list);
752       
753        memset(name, 0, sizeof(*name));
754        if (mxmap->mxm_board != -1) {
755                sprintf(name, "%s:%d", mxmap->mxm_hostname, mxmap->mxm_board);
756        } else {
757                sprintf(name, "%s", mxmap->mxm_hostname);
758        }
759        mxret = mx_hostname_to_nic_id(name, &nic_id);
760        if (mxret == MX_SUCCESS) {
761                peer->mxp_nic_id = nic_id;
762        } else {
763                debug((BMX_DB_MX|BMX_DB_WARN), "peer_alloc() unable to lookup nic_id "
764                      "for %s (mx_hostname_to_nic_id() returned %s)", name,
765                      mx_strerror(mxret));
766                bmx_peer_free(peer);
767                return -BMI_EHOSTUNREACH;
768        }
769        /* peer->mxp_epa will come from mx_iconnect() */
770
771        peer->mxp_state = BMX_PEER_INIT;
772        /* peer->mxp_tx_id assigned to me by peer */
773        gen_mutex_lock(&bmi_mx->bmx_lock);
774        peer->mxp_rx_id = bmi_mx->bmx_next_id++;
775        gen_mutex_unlock(&bmi_mx->bmx_lock);
776        if (bmi_mx->bmx_next_id > BMX_MAX_PEER_ID) {
777                /* FIXME we should reset to 1
778                 *      check if a new ID is already used
779                 *      but we have no idea when one is no longer used
780                 */
781                debug(BMX_DB_ERR, "peer id is rolling over. FATAL ERROR");
782                exit(1);
783        }
784
785        /* peer->mxp_refcount */
786
787        gen_mutex_init(&peer->mxp_lock);
788
789        for (i = 0; i < BMX_PEER_RX_NUM; i++) {
790                struct bmx_ctx  *rx     = NULL;
791
792                ret = bmx_ctx_alloc(&rx, BMX_REQ_RX);
793                if (ret != 0) {
794                        bmx_reduce_idle_rxs(i);
795                        bmx_peer_free(peer);
796                        return ret;
797                }
798                bmx_put_idle_ctx(rx);
799        }
800
801        /* on servers with server-to-server comms, we are racing
802         * between method_addr_lookup() and handle_conn_req() */
803
804        bmx_peer_addref(peer); /* for the peers list */
805        gen_mutex_lock(&bmi_mx->bmx_peers_lock);
806        qlist_add_tail(&peer->mxp_list, &bmi_mx->bmx_peers);
807        gen_mutex_unlock(&bmi_mx->bmx_peers_lock);
808
809        mxmap->mxm_peer = peer;
810        *peerp = peer;
811
812        return 0;
813}
814
815static int
816bmx_peer_init_state(struct bmx_peer *peer)
817{
818        int             ret     = 0;
819
820        BMX_ENTER;
821
822        gen_mutex_lock(&peer->mxp_lock);
823
824        /* we have a ref for each pending tx and rx, don't init
825         * if the refcount > 0 or pending_rxs is not empty */
826        if (!qlist_empty(&peer->mxp_pending_rxs) ||
827            peer->mxp_refcount != 0) {
828                ret = -1;
829        } else {
830                /* ok to init */
831                debug(BMX_DB_PEER, "Setting peer %s to BMX_PEER_INIT",
832                                   peer->mxp_mxmap->mxm_peername);
833                peer->mxp_state = BMX_PEER_INIT;
834        }
835
836        gen_mutex_unlock(&peer->mxp_lock);
837
838        BMX_EXIT;
839
840        return 0;
841}
842
843/**** startup/shutdown functions **************************************/
844
845/* init bmi_mx */
846static int
847bmx_globals_init(int method_id)
848{
849        int     i       = 0;
850
851#if BMX_MEM_ACCT
852        mem_used = 0;
853        gen_mutex_init(&mem_used_lock);
854#endif
855        BMX_MALLOC(bmi_mx, sizeof(*bmi_mx));
856        if (bmi_mx == NULL) {
857                return -1;
858        }
859
860        bmi_mx->bmx_method_id = method_id;
861
862        /* bmi_mx->bmx_peername */
863        /* bmi_mx->bmx_hostname */
864        /* bmi_mx->bmx_board */
865        /* bmi_mx->bmx_ep_id */
866        /* bmi_mx->bmx_ep */
867        /* bmi_mx->bmx_sid */
868        /* bmi_mx->bmx_is_server */
869
870        INIT_QLIST_HEAD(&bmi_mx->bmx_peers);
871        gen_mutex_init(&bmi_mx->bmx_peers_lock);
872
873        INIT_QLIST_HEAD(&bmi_mx->bmx_txs);
874        INIT_QLIST_HEAD(&bmi_mx->bmx_idle_txs);
875        gen_mutex_init(&bmi_mx->bmx_idle_txs_lock);
876
877        INIT_QLIST_HEAD(&bmi_mx->bmx_rxs);
878        INIT_QLIST_HEAD(&bmi_mx->bmx_idle_rxs);
879        gen_mutex_init(&bmi_mx->bmx_idle_rxs_lock);
880
881        gen_mutex_init(&bmi_mx->bmx_completion_lock);
882        /* set to 1 to allow testing to start */
883        bmi_mx->bmx_refcount = 1;
884
885        for (i = 0; i < BMI_MAX_CONTEXTS; i++) {
886                INIT_QLIST_HEAD(&bmi_mx->bmx_done_q[i]);
887                gen_mutex_init(&bmi_mx->bmx_done_q_lock[i]);
888        }
889
890        INIT_QLIST_HEAD(&bmi_mx->bmx_unex_rxs);
891        gen_mutex_init(&bmi_mx->bmx_unex_rxs_lock);
892
893        bmi_mx->bmx_next_id = 1;
894        gen_mutex_init(&bmi_mx->bmx_lock);      /* global lock, use for global txs,
895                                                   global rxs, next_id, etc. */
896
897#if BMX_MEM_TWEAK
898        INIT_QLIST_HEAD(&bmi_mx->bmx_idle_buffers);
899        gen_mutex_init(&bmi_mx->bmx_idle_buffers_lock);
900        INIT_QLIST_HEAD(&bmi_mx->bmx_used_buffers);
901        gen_mutex_init(&bmi_mx->bmx_used_buffers_lock);
902
903        INIT_QLIST_HEAD(&bmi_mx->bmx_idle_unex_buffers);
904        gen_mutex_init(&bmi_mx->bmx_idle_unex_buffers_lock);
905        INIT_QLIST_HEAD(&bmi_mx->bmx_used_unex_buffers);
906        gen_mutex_init(&bmi_mx->bmx_used_unex_buffers_lock);
907#endif
908        return 0;
909}
910
911
912static int
913bmx_open_endpoint(mx_endpoint_t *ep, uint32_t board, uint32_t ep_id)
914{
915        mx_return_t     mxret   = MX_SUCCESS;
916        mx_param_t      param;
917
918        /* This will tell MX to use context IDs. Normally, MX has one
919         * set of queues for posted recvs, unexpected, etc. This will
920         * create seaparate sets of queues for each msg type.
921         * The benefit is that we can call mx_test_any() for each
922         * message type and not have to scan a long list of non-
923         * matching recvs. */
924        param.key = MX_PARAM_CONTEXT_ID;
925        param.val.context_id.bits = 4;
926        param.val.context_id.shift = BMX_MSG_SHIFT;
927
928        mxret = mx_open_endpoint(board, ep_id, BMX_MAGIC,
929                                 &param, 1, ep);
930        if (mxret != MX_SUCCESS) {
931                return -1;
932        }
933
934        mxret = mx_register_unexp_handler(*ep, (mx_unexp_handler_t)
935                                          bmx_unexpected_recv, NULL);
936        if (mxret != MX_SUCCESS) {
937                debug(BMX_DB_WARN, "mx_register_unexp_callback() failed "
938                                "with %s", mx_strerror(mxret));
939                mx_close_endpoint(*ep);
940                mx_finalize(); 
941                return -1;
942        }
943
944        mxret = mx_set_request_timeout(*ep, NULL, BMX_TIMEOUT);
945        if (mxret != MX_SUCCESS) {
946                debug(BMX_DB_WARN, "mx_set_request_timeout() failed with %s",
947                                 mx_strerror(mxret));
948                mx_close_endpoint(*ep);
949                mx_finalize();
950                return -1;
951        }
952
953        return 0;
954}
955
956/* The listen_addr is our method if we are a server. It is NULL for a
957 * client. The other params are NULL/0 for the client as well. */
958static int
959BMI_mx_initialize(bmi_method_addr_p listen_addr, int method_id, int init_flags)
960{
961        int             i       = 0;
962        int             ret     = 0;
963        mx_return_t     mxret   = MX_SUCCESS;
964
965        BMX_ENTER;
966
967         /* check params */
968        if (!!listen_addr ^ (init_flags & BMI_INIT_SERVER)) {
969                debug(BMX_DB_ERR, "mx_initialize() with illegal parameters. "
970                        "BMI_INIT_SERVER requires non-null listen_addr");
971                exit(1);
972        }
973
974        ret = bmx_globals_init(method_id);
975        if (ret != 0) {
976                debug(BMX_DB_WARN, "bmx_globals_init() failed with no memory");
977                return -BMI_ENOMEM;
978        }
979
980        /* disable shmem to allow clients and servers on the same machine */
981        setenv("MX_DISABLE_SHMEM", "1", 1);
982
983        /* return errors, do not abort */
984        mx_set_error_handler(MX_ERRORS_RETURN);
985
986        /* only complete sends after they are delivered */
987        setenv("MX_ZOMBIE", "0", 1);
988
989        mxret = mx_init();
990        if (!(mxret == MX_SUCCESS || mxret == MX_ALREADY_INITIALIZED)) {
991                debug(BMX_DB_WARN, "mx_init() failed with %s", mx_strerror(mxret));
992                BMX_FREE(bmi_mx, sizeof(*bmi_mx));
993                return -BMI_ENODEV;
994        }
995
996        /* if we are a server, open an endpoint now. If a client, wait until first
997         * sendunexpected or first recv. */
998        if (init_flags & BMI_INIT_SERVER) {
999                struct bmx_ctx         *rx     = NULL;
1000                struct bmx_method_addr *mxmap  = listen_addr->method_data;
1001                mx_endpoint_addr_t      epa;
1002                uint32_t                ep_id   = 0;
1003                uint32_t                sid     = 0;
1004                uint64_t                nic_id  = 0ULL;
1005                struct bmx_peer         *peer   = NULL;
1006
1007                bmi_mx->bmx_hostname = strdup(mxmap->mxm_hostname);
1008                bmi_mx->bmx_board = mxmap->mxm_board;
1009                bmi_mx->bmx_ep_id = mxmap->mxm_ep_id;
1010                bmi_mx->bmx_is_server = 1;
1011                bmx_create_peername();
1012
1013                ret = bmx_open_endpoint(&bmi_mx->bmx_ep, mxmap->mxm_board, mxmap->mxm_ep_id);
1014                if (ret != 0) {
1015                        debug(BMX_DB_ERR, "open_endpoint() failed");
1016                        BMX_FREE(bmi_mx, sizeof(*bmi_mx));
1017                        exit(1);
1018                }
1019
1020                /* get our MX session id */
1021                mx_get_endpoint_addr(bmi_mx->bmx_ep, &epa);
1022                mx_decompose_endpoint_addr2(epa, &nic_id, &ep_id, &sid);
1023                bmi_mx->bmx_sid = sid;
1024
1025                bmx_peer_alloc(&peer, mxmap);
1026
1027                /* We allocate BMX_PEER_RX_NUM when we peer_alloc()
1028                 * Allocate some here to catch the peer CONN_REQ */
1029                for (i = 0; i < BMX_SERVER_RXS; i++) {
1030                        ret = bmx_ctx_alloc(&rx, BMX_REQ_RX);
1031                        if (ret == 0) {
1032                                bmx_put_idle_ctx(rx);
1033                        }
1034                }
1035        }
1036
1037#if BMX_MEM_TWEAK
1038        for (i = 0; i < BMX_BUFF_NUM; i++) {
1039                struct bmx_buffer *buf  = NULL;
1040                BMX_MALLOC(buf, sizeof(*buf));
1041                if (buf) {
1042                        INIT_QLIST_HEAD(&buf->mxb_list);
1043                        BMX_MALLOC(buf->mxb_buffer, BMX_BUFF_SIZE);
1044                        if (buf->mxb_buffer) {
1045                                qlist_add(&buf->mxb_list, &bmi_mx->bmx_idle_buffers);
1046                        } else {
1047                                BMX_FREE(buf, sizeof(*buf));
1048                        }
1049                }
1050        }
1051#endif
1052
1053#if BMX_MEM_ACCT
1054        debug(BMX_DB_MEM, "memory used at end of initialization %lld", llu(mem_used));
1055#endif
1056        BMX_EXIT;
1057
1058        return 0;
1059}
1060
1061static int
1062BMI_mx_finalize(void)
1063{
1064        struct bmx_data *tmp = bmi_mx;
1065
1066        BMX_ENTER;
1067
1068        gen_mutex_lock(&tmp->bmx_lock);
1069
1070        /* shutdown MX */
1071        mx_wakeup(bmi_mx->bmx_ep);
1072        mx_close_endpoint(bmi_mx->bmx_ep);
1073        mx_finalize();
1074
1075        /* free rxs */
1076        {
1077                struct bmx_ctx *rx   = NULL;
1078                struct bmx_ctx *next = NULL;
1079                qlist_for_each_entry_safe(rx, next, &bmi_mx->bmx_rxs, mxc_global_list) {
1080                        bmx_ctx_free(rx);
1081                }
1082        }
1083
1084        /* free txs */
1085        {
1086                struct bmx_ctx *tx   = NULL;
1087                struct bmx_ctx *next = NULL;
1088                qlist_for_each_entry_safe(tx, next, &bmi_mx->bmx_txs, mxc_global_list) {
1089                        bmx_ctx_free(tx);
1090                }
1091        }
1092
1093        /* free peers */
1094        {
1095                struct bmx_peer *peer   = NULL;
1096                struct bmx_peer *next   = NULL;
1097                qlist_for_each_entry_safe(peer, next, &bmi_mx->bmx_peers, mxp_list) {
1098                        bmx_peer_free(peer);
1099                }
1100        }
1101
1102#if BMX_MEM_TWEAK
1103        {
1104                list_t *idle = &bmi_mx->bmx_idle_buffers;
1105                list_t *used = &bmi_mx->bmx_used_buffers;
1106                struct bmx_buffer *mem  = NULL;
1107                struct bmx_buffer *next = NULL;
1108
1109                qlist_for_each_entry_safe(mem, next, idle, mxb_list) {
1110                        if (mem->mxb_used != 0)
1111                                debug(BMX_DB_MEM, "idle buffer used %d times",
1112                                                  mem->mxb_used);
1113                        BMX_FREE(mem->mxb_buffer, BMX_BUFF_SIZE);
1114                        BMX_FREE(mem, sizeof(*mem));
1115                }
1116                qlist_for_each_entry_safe(mem, next, used, mxb_list) {
1117                        if (mem->mxb_used != 0)
1118                                debug(BMX_DB_MEM, "used buffer used %d times",
1119                                                  mem->mxb_used);
1120                        BMX_FREE(mem->mxb_buffer, BMX_BUFF_SIZE);
1121                        BMX_FREE(mem, sizeof(*mem));
1122                }
1123                debug(BMX_DB_MEM, "%d misses", bmi_mx->bmx_misses);
1124        }
1125#endif
1126
1127        if (bmi_mx->bmx_hostname) {
1128                free(bmi_mx->bmx_hostname);
1129                bmi_mx->bmx_hostname = NULL;
1130        }
1131        if (bmi_mx->bmx_peername) {
1132                free(bmi_mx->bmx_peername);
1133                bmi_mx->bmx_peername = NULL;
1134        }
1135
1136        bmi_mx = NULL;
1137
1138        gen_mutex_unlock(&tmp->bmx_lock);
1139
1140        BMX_FREE(tmp, sizeof(*tmp));
1141
1142#if BMX_MEM_ACCT
1143        debug(BMX_DB_MEM, "memory leaked at shutdown %lld", llu(mem_used));
1144#endif
1145        BMX_EXIT;
1146        return 0;
1147}
1148
1149
1150/**** BMI_mx_* and support functions **************************************/
1151
1152/* bmx_peer_disconnect - close the connection to this peer
1153 * @peer - a bmx_peer pointer
1154 * @mx_dis - an integer, 0 or 1, where 1 means call mx_disconnect()
1155 *
1156 * This function sets peer state to DISCONNECT, sets all queued rxs and txs to
1157 * CANCELED and places them on the canceled list, cancels pending rxs and
1158 * optionally calls mx_disconnect() to cancel pending txs and matched rxs.
1159 */
1160static void
1161bmx_peer_disconnect(struct bmx_peer *peer, int mx_dis, bmi_error_code_t err)
1162{
1163        struct bmx_ctx  *tx     = NULL;
1164        struct bmx_ctx  *rx     = NULL;
1165        struct bmx_ctx  *next   = NULL;
1166
1167        debug(BMX_DB_CONN, "%s for %s in state %d (%d)", __func__,
1168                        peer->mxp_mxmap->mxm_peername, peer->mxp_state, mx_dis);
1169        gen_mutex_lock(&peer->mxp_lock);
1170        if (peer->mxp_state == BMX_PEER_DISCONNECT) {
1171                gen_mutex_unlock(&peer->mxp_lock);
1172                return;
1173        }
1174        debug(BMX_DB_PEER, "Setting peer %s to BMX_PEER_DISCONNECT",
1175                           peer->mxp_mxmap->mxm_peername);
1176        peer->mxp_state = BMX_PEER_DISCONNECT;
1177
1178        /* cancel queued txs */
1179        while (!qlist_empty(&peer->mxp_queued_txs)) {
1180                list_t          *queued_txs     = &peer->mxp_queued_txs;
1181                tx = qlist_entry(queued_txs->next, struct bmx_ctx, mxc_list);
1182                qlist_del_init(&tx->mxc_list);
1183                bmx_q_completed(tx, BMX_CTX_CANCELED, BMX_NO_STATUS, err);
1184        }
1185        /* cancel queued rxs */
1186        while (!qlist_empty(&peer->mxp_queued_rxs)) {
1187                list_t          *queued_rxs     = &peer->mxp_queued_rxs;
1188                rx = qlist_entry(queued_rxs->next, struct bmx_ctx, mxc_list);
1189                qlist_del_init(&rx->mxc_list);
1190                bmx_q_completed(rx, BMX_CTX_CANCELED, BMX_NO_STATUS, err);
1191        }
1192        /* try to cancel pending rxs */
1193        qlist_for_each_entry_safe(rx, next, &peer->mxp_pending_rxs, mxc_list) {
1194                uint32_t        result = 0;
1195                mx_cancel(bmi_mx->bmx_ep, &rx->mxc_mxreq, &result);
1196                if (result) {
1197                        qlist_del_init(&rx->mxc_list);
1198                        bmx_q_completed(rx, BMX_CTX_CANCELED, BMX_NO_STATUS, err);
1199                }
1200        }
1201        gen_mutex_unlock(&peer->mxp_lock);
1202        if (mx_dis) {
1203                /* cancel all posted txs and matched rxs */
1204                mx_disconnect(bmi_mx->bmx_ep, peer->mxp_epa);
1205        }
1206        return;
1207}
1208
1209static int
1210BMI_mx_set_info(int option, void *inout_parameter)
1211{
1212        struct bmi_method_addr      *map    = NULL;
1213        struct bmx_method_addr  *mxmap  = NULL;
1214        struct bmx_peer         *peer   = NULL;
1215
1216        BMX_ENTER;
1217
1218        switch(option) {
1219                case BMI_DROP_ADDR:
1220                        if (inout_parameter != NULL) {
1221                                map = (struct bmi_method_addr *) inout_parameter;
1222                                mxmap = map->method_data;
1223                                debug(BMX_DB_PEER, "%s drop %s map 0x%p mxmap 0x%p)",
1224                                                __func__, mxmap->mxm_peername != NULL ?
1225                                                mxmap->mxm_peername : "NULL", map,
1226                                                mxmap);
1227                                if (bmi_mx != NULL) {
1228                                        peer = mxmap->mxm_peer;
1229                                        bmx_peer_disconnect(peer, 1, BMI_ENETRESET);
1230                                }
1231                                if (mxmap->mxm_peername) {
1232                                        debug(BMX_DB_MEM, "freeing mxm_peername");
1233                                        free((void *) mxmap->mxm_peername);
1234                                        mxmap->mxm_peername = NULL;
1235                                }
1236                                if (mxmap->mxm_hostname) {
1237                                        debug(BMX_DB_MEM, "freeing mxm_hostname");
1238                                        free((void *) mxmap->mxm_hostname);
1239                                        mxmap->mxm_hostname = NULL;
1240                                }
1241                                debug(BMX_DB_PEER, "freeing map 0x%p", map);
1242                                free(map);
1243                        }
1244                break;
1245                default:
1246                        /* XXX: should return -ENOSYS, but 0 now until callers
1247                         * handle that correctly. */
1248                break;
1249        }
1250        BMX_EXIT;
1251
1252        return 0;
1253}
1254
1255static int
1256BMI_mx_get_info(int option, void *inout_parameter)
1257{
1258        int     ret     = 0;
1259
1260        BMX_ENTER;
1261
1262        switch(option) {
1263                case BMI_CHECK_MAXSIZE:
1264                        /* reality is 2^31, but shrink to avoid negative int */
1265                        *(int *)inout_parameter = (1U << 31) - 1;
1266                        break;
1267                case BMI_GET_UNEXP_SIZE:
1268                        *(int *)inout_parameter = BMX_UNEXPECTED_SIZE;
1269                        break;
1270                default:
1271                        ret = -BMI_ENOSYS;
1272        }
1273        BMX_EXIT;
1274
1275        return ret;
1276}
1277
1278#define BMX_IO_BUF      1
1279#define BMX_UNEX_BUF    2
1280
1281static void *
1282bmx_memalloc(bmi_size_t size, int type)
1283{
1284        void                    *buf    = NULL;
1285#if BMX_MEM_TWEAK
1286        int                     *misses = NULL;
1287        struct bmx_buffer       *mem    = NULL;
1288        list_t                  *idle   = NULL;
1289        list_t                  *used   = NULL;
1290        gen_mutex_t             *idle_lock  = NULL;
1291        gen_mutex_t             *used_lock  = NULL;
1292
1293        if (type == BMX_IO_BUF) {
1294                idle = &bmi_mx->bmx_idle_buffers;
1295                used = &bmi_mx->bmx_used_buffers;
1296                idle_lock = &bmi_mx->bmx_idle_buffers_lock;
1297                used_lock = &bmi_mx->bmx_used_buffers_lock;
1298                misses = &bmi_mx->bmx_misses;
1299        } else if (type == BMX_UNEX_BUF) {
1300                idle = &bmi_mx->bmx_idle_unex_buffers;
1301                used = &bmi_mx->bmx_used_unex_buffers;
1302                idle_lock = &bmi_mx->bmx_idle_unex_buffers_lock;
1303                used_lock = &bmi_mx->bmx_used_unex_buffers_lock;
1304                misses = &bmi_mx->bmx_unex_misses;
1305        } else {
1306                return NULL;
1307        }
1308
1309        gen_mutex_lock(idle_lock);
1310        if (size <= (BMX_BUFF_SIZE) && !qlist_empty(idle)) {
1311                mem = qlist_entry(idle->next, struct bmx_buffer, mxb_list);
1312                qlist_del_init(&mem->mxb_list);
1313                gen_mutex_unlock(idle_lock);
1314                buf = mem->mxb_buffer;
1315                mem->mxb_used++;
1316                gen_mutex_lock(used_lock);
1317                qlist_add(&mem->mxb_list, used);
1318                gen_mutex_unlock(used_lock);
1319                gen_mutex_lock(idle_lock);
1320        } else {
1321                (*misses)++;
1322                gen_mutex_unlock(idle_lock);
1323                buf = malloc((size_t) size);
1324                gen_mutex_lock(idle_lock);
1325        }
1326        gen_mutex_unlock(idle_lock);
1327#else
1328        buf = malloc((size_t) size);
1329#endif
1330        return buf;
1331}
1332
1333void *
1334BMI_mx_memalloc(bmi_size_t size, enum bmi_op_type send_recv)
1335{
1336        void                    *buf    = NULL;
1337        buf = bmx_memalloc(size, BMX_IO_BUF);
1338        return buf;
1339}
1340
1341static int
1342bmx_memfree(void *buffer, bmi_size_t size, int type)
1343{
1344#if BMX_MEM_TWEAK
1345        int                     found   = 0;
1346        struct bmx_buffer       *mem    = NULL;
1347        list_t                  *idle   = NULL;
1348        list_t                  *used   = NULL;
1349        gen_mutex_t             *idle_lock  = NULL;
1350        gen_mutex_t             *used_lock  = NULL;
1351
1352        if (type == BMX_IO_BUF) {
1353                idle = &bmi_mx->bmx_idle_buffers;
1354                used = &bmi_mx->bmx_used_buffers;
1355                idle_lock = &bmi_mx->bmx_idle_buffers_lock;
1356                used_lock = &bmi_mx->bmx_used_buffers_lock;
1357        } else if (type == BMX_UNEX_BUF) {
1358                idle = &bmi_mx->bmx_idle_unex_buffers;
1359                used = &bmi_mx->bmx_used_unex_buffers;
1360                idle_lock = &bmi_mx->bmx_idle_unex_buffers_lock;
1361                used_lock = &bmi_mx->bmx_used_unex_buffers_lock;
1362        } else {
1363                return -1;
1364        }
1365
1366        gen_mutex_lock(used_lock);
1367        qlist_for_each_entry(mem, used, mxb_list) {
1368                if (mem->mxb_buffer == buffer) {
1369                        found = 1;
1370                        qlist_del_init(&mem->mxb_list);
1371                        gen_mutex_unlock(used_lock);
1372                        gen_mutex_lock(idle_lock);
1373                        qlist_add(&mem->mxb_list, idle);
1374                        gen_mutex_unlock(idle_lock);
1375                        gen_mutex_lock(used_lock);
1376                        break;
1377                }
1378        }
1379        gen_mutex_unlock(used_lock);
1380
1381        if (found == 0) {
1382                free(buffer);
1383        }
1384#else
1385        free(buffer);
1386#endif
1387        return 0;
1388}
1389
1390static int
1391BMI_mx_memfree(void *buffer, bmi_size_t size, enum bmi_op_type send_recv)
1392{
1393        int     ret     = 0;
1394        ret = bmx_memfree(buffer, size, BMX_IO_BUF);
1395        return ret;
1396}
1397
1398static int
1399BMI_mx_unexpected_free(void *buf)
1400{
1401        int     ret     = 0;
1402
1403        BMX_ENTER;
1404
1405        ret = bmx_memfree(buf, BMX_UNEXPECTED_SIZE, BMX_UNEX_BUF);
1406
1407        BMX_EXIT;
1408
1409        return 0;
1410}
1411
1412static void
1413bmx_parse_match(uint64_t match, uint8_t *type, uint32_t *id, uint32_t *tag,
1414    uint8_t* class)
1415{
1416        *type   = (uint8_t)  (match >> BMX_MSG_SHIFT);
1417        *class  = (uint8_t)  (match >> BMX_CLASS_SHIFT);
1418        *id     = (uint32_t) ((match >> BMX_ID_SHIFT) & BMX_MAX_PEER_ID); /* 20 bits */
1419        *tag    = (uint32_t) (match & BMX_MAX_TAG); /* 32 bits */
1420        return;
1421}
1422
1423static void
1424bmx_create_match(struct bmx_ctx *ctx)
1425{
1426        int             connect = 0;
1427        uint64_t        type    = (uint64_t) ctx->mxc_msg_type;
1428        uint64_t        id      = 0ULL;
1429        uint64_t        class   = 0ULL;
1430        uint64_t        tag     = (uint64_t) ((uint32_t) ctx->mxc_tag);
1431
1432        if (ctx->mxc_msg_type == BMX_MSG_CONN_REQ ||
1433            ctx->mxc_msg_type == BMX_MSG_CONN_ACK) {
1434                connect = 1;
1435        }
1436
1437        if ((ctx->mxc_type == BMX_REQ_TX && connect == 0) ||
1438            (ctx->mxc_type == BMX_REQ_RX && connect == 1)) {
1439                id = (uint64_t) ctx->mxc_peer->mxp_tx_id;
1440        } else if ((ctx->mxc_type == BMX_REQ_TX && connect == 1) ||
1441                   (ctx->mxc_type == BMX_REQ_RX && connect == 0)) {
1442                id = (uint64_t) ctx->mxc_peer->mxp_rx_id;
1443        } else {
1444                debug(BMX_DB_INFO, "create_match() for %s called with "
1445                                "connect = %d", ctx->mxc_type == BMX_REQ_TX ?
1446                                "TX" : "RX", connect);
1447        }
1448       
1449        if ((id >> 20) != 0) {
1450                debug(BMX_DB_ERR, "invalid %s of %llu\n", ctx->mxc_type == BMX_REQ_TX ?
1451                        "mxp_tx_id" : "mxp_rx_id", (unsigned long long) id);
1452                exit(1);
1453        }
1454
1455        class += ctx->mxc_class;
1456        ctx->mxc_match = (type << BMX_MSG_SHIFT) | (id << BMX_ID_SHIFT) |
1457            tag | (class << BMX_CLASS_SHIFT);
1458
1459        return;
1460}
1461
1462static bmi_error_code_t
1463bmx_mx_to_bmi_errno(enum mx_status_code code)
1464{
1465        int     err     = 0;
1466
1467        switch (code) {
1468        case MX_STATUS_SUCCESS:
1469                err = 0;
1470                break;
1471        case MX_STATUS_TIMEOUT:
1472                err = BMI_ETIMEDOUT;
1473                break;
1474        case MX_STATUS_ENDPOINT_CLOSED:
1475        case MX_STATUS_BAD_SESSION:
1476        case MX_STATUS_BAD_KEY:
1477        case MX_STATUS_BAD_ENDPOINT:
1478                err = BMI_ECONNREFUSED;
1479                break;
1480        case MX_STATUS_ENDPOINT_UNREACHABLE:
1481                err = BMI_ENETRESET;
1482                break;
1483        case MX_STATUS_NO_RESOURCES:
1484        case MX_STATUS_EVENTQ_FULL:
1485                err = BMI_ENOMEM;
1486                break;
1487        default:
1488                debug(BMX_DB_WARN, "request status is %s", mx_strstatus(code));
1489                err = BMI_EIO;
1490                break;
1491        }
1492        return -err;
1493}
1494
1495/* if (peer->mxp_state == BMX_PEER_READY)
1496 *     add to pending list
1497 *     add refcount on peer
1498 *     mx_isend()
1499 * else
1500 *     add to peer's queued txs
1501 */
1502static int
1503bmx_post_tx(struct bmx_ctx *tx)
1504{
1505        int             ret     = 0;
1506        struct bmx_peer *peer   = tx->mxc_peer;
1507        mx_return_t     mxret   = MX_SUCCESS;
1508        mx_segment_t    *segs   = NULL;
1509
1510        debug((BMX_DB_FUNC|BMX_DB_CTX), "entering %s match= 0x%llx length= %lld "
1511                        "peer state= %d op_id= %llu", __func__, llu(tx->mxc_match),
1512                        lld(tx->mxc_nob), peer->mxp_state, llu(tx->mxc_mop->op_id));
1513        if (peer->mxp_state == BMX_PEER_READY) {
1514                bmx_q_pending_ctx(tx);  /* uses peer lock */
1515                if (tx->mxc_nseg == 1) {
1516                        segs = &tx->mxc_seg;
1517                } else {
1518                        segs = tx->mxc_seg_list;
1519                }
1520                mxret = mx_isend(bmi_mx->bmx_ep, segs, tx->mxc_nseg, peer->mxp_epa,
1521                                 tx->mxc_match, (void *) tx, &tx->mxc_mxreq);
1522                if (mxret != MX_SUCCESS) {
1523                        ret = -BMI_ENOMEM;
1524                        bmx_deq_pending_ctx(tx);        /* uses peer lock */
1525                        bmx_q_completed(tx, BMX_CTX_CANCELED, BMX_NO_STATUS, BMI_ENOMEM);
1526                }
1527        } else { /* peer is not ready */
1528                debug(BMX_DB_PEER, "%s peer is not ready (%d) q_ctx(tx) "
1529                                "match= 0x%llx length=%lld", __func__, peer->mxp_state,
1530                                llu(tx->mxc_match), lld(tx->mxc_nob));
1531                bmx_q_ctx(tx);  /* uses peer lock */
1532        }
1533        BMX_EXIT;
1534        return ret;
1535}
1536
1537static int
1538bmx_ensure_connected(struct bmx_method_addr *mxmap)
1539{
1540        int             ret     = 0;
1541        struct bmx_peer *peer   = mxmap->mxm_peer;
1542
1543        /* NOTE: can this happen? we call peer_alloc() when using
1544         * method_addr_lookup() */
1545        if (peer == NULL) {
1546                ret = bmx_peer_alloc(&peer, mxmap);
1547                if (ret != 0) {
1548                        debug((BMX_DB_CONN|BMX_DB_MEM), "%s could not allocate peer for %s",
1549                                        __func__, mxmap->mxm_peername);
1550                        goto out;
1551                }
1552        }
1553        if (peer->mxp_state == BMX_PEER_INIT) {
1554                debug(BMX_DB_CONN, "%s calling bmx_peer_connect() for %s",
1555                                        __func__, mxmap->mxm_peername);
1556                ret = bmx_peer_connect(peer);
1557        } else if (peer->mxp_state == BMX_PEER_DISCONNECT) {
1558                debug(BMX_DB_CONN, "%s %s is not connected", __func__, mxmap->mxm_peername);
1559                ret = -BMI_EHOSTDOWN;
1560        }
1561out:
1562        return ret;
1563}
1564
1565static int
1566bmx_post_send_common(bmi_op_id_t *id, struct bmi_method_addr *remote_map,
1567                     int numbufs, const void *const *buffers,
1568                     const bmi_size_t *sizes, bmi_size_t total_size,
1569                     bmi_msg_tag_t tag, void *user_ptr,
1570                     bmi_context_id context_id, uint8_t class,
1571                     int is_unexpected, PVFS_hint hints)
1572{
1573        struct bmx_ctx          *tx     = NULL;
1574        struct method_op        *mop    = NULL;
1575        struct bmx_method_addr  *mxmap  = NULL;
1576        struct bmx_peer         *peer   = NULL;
1577        int                      ret    = 0;
1578        PINT_event_id            eid    = 0;
1579
1580        PINT_EVENT_START(
1581            bmi_mx_send_event_id, bmi_mx_pid, NULL, &eid,
1582            PINT_HINT_GET_CLIENT_ID(hints),
1583            PINT_HINT_GET_REQUEST_ID(hints),
1584            PINT_HINT_GET_RANK(hints),
1585            PINT_HINT_GET_HANDLE(hints),
1586            PINT_HINT_GET_OP_ID(hints),
1587            total_size);
1588
1589        mxmap = remote_map->method_data;
1590
1591        ret = bmx_ensure_connected(mxmap);
1592        if (ret != 0) {
1593                goto out;
1594        }
1595        peer = mxmap->mxm_peer;
1596        bmx_peer_addref(peer); /* add ref and hold until test or testcontext */
1597
1598        /* get idle tx, if available, otherwise alloc one */
1599        tx = bmx_get_idle_tx();
1600        if (tx == NULL) {
1601                ret = bmx_ctx_alloc(&tx, BMX_REQ_TX);
1602                if (ret != 0) {
1603                        ret = -BMI_ENOMEM;
1604                        bmx_peer_decref(peer);
1605                        goto out;
1606                }
1607                tx->mxc_state = BMX_CTX_PREP;
1608        }
1609
1610        /* map buffer(s) */
1611        if (numbufs == 1) {
1612                tx->mxc_seg.segment_ptr = (void *) *buffers;
1613                tx->mxc_seg.segment_length = *sizes;
1614                tx->mxc_nob = *sizes;
1615        } else {
1616                int             i       = 0;
1617                mx_segment_t    *segs   = NULL;
1618
1619                BMX_MALLOC(segs, (numbufs * sizeof(*segs)));
1620                if (segs == NULL) {
1621                        bmx_put_idle_ctx(tx);
1622                        bmx_peer_decref(peer);
1623                        ret = -BMI_ENOMEM;
1624                        goto out;
1625                }
1626                tx->mxc_seg_list = segs;
1627                for (i = 0; i < numbufs; i++) {
1628                        segs[i].segment_ptr = (void *) buffers[i];
1629                        segs[i].segment_length = sizes[i];
1630                        tx->mxc_nob += sizes[i];
1631                }
1632        }
1633        tx->mxc_nseg = numbufs;
1634
1635        if (tx->mxc_nob != total_size) {
1636                debug(BMX_DB_INFO, "user provided total length %lld does not match "
1637                                "the buffer list total length %lld", lld(total_size),
1638                                lld(tx->mxc_nob));
1639        }
1640
1641        if (is_unexpected && tx->mxc_nob > (long long) BMX_UNEXPECTED_SIZE) {
1642                bmx_put_idle_ctx(tx);
1643                bmx_peer_decref(peer);
1644                ret = -BMI_EINVAL;
1645                goto out;
1646        }
1647
1648        tx->mxc_tag = tag;
1649        tx->mxc_peer = peer;
1650        tx->mxc_class = class;
1651        if (!is_unexpected) {
1652                tx->mxc_msg_type = BMX_MSG_EXPECTED;
1653        } else {
1654                tx->mxc_msg_type = BMX_MSG_UNEXPECTED;
1655        }
1656
1657        BMX_MALLOC(mop, sizeof(*mop));
1658        if (mop == NULL) {
1659                bmx_put_idle_ctx(tx);
1660                bmx_peer_decref(peer);
1661                ret = -BMI_ENOMEM;
1662                goto out;
1663        }
1664        id_gen_fast_register(&mop->op_id, mop);
1665        debug(BMX_DB_CTX, "TX id_gen_fast_register(%llu)", llu(mop->op_id));
1666        mop->addr = remote_map;  /* set of function pointers, essentially */
1667        mop->method_data = tx;
1668        mop->user_ptr = user_ptr;
1669        mop->context_id = context_id;
1670        mop->event_id = eid;
1671        *id = mop->op_id;
1672        tx->mxc_mop = mop;
1673
1674        assert(context_id == mop->context_id);
1675        assert(context_id == tx->mxc_mop->context_id);
1676
1677        bmx_create_match(tx);
1678
1679        debug(BMX_DB_CTX, "%s tag= %d length= %d %s op_id= %llu context_id= %lld",
1680                        __func__, tag, (int) total_size,
1681                        is_unexpected ? "UNEXPECTED" : "EXPECTED",
1682                        llu(mop->op_id), lld(context_id));
1683
1684        ret = bmx_post_tx(tx);
1685
1686out:
1687        return ret;
1688}
1689
1690static int
1691BMI_mx_post_send_list(bmi_op_id_t *id, struct bmi_method_addr *remote_map,
1692                      const void *const *buffers, const bmi_size_t *sizes, int list_count,
1693                      bmi_size_t total_size, enum bmi_buffer_type buffer_flag __unused,
1694                      bmi_msg_tag_t tag, void *user_ptr, bmi_context_id context_id,
1695                      PVFS_hint hints)
1696{
1697        int ret = 0;
1698
1699        BMX_ENTER;
1700
1701        ret = bmx_post_send_common(id, remote_map, list_count, buffers, sizes,
1702                                    total_size, tag, user_ptr, context_id,
1703                                    0, 0, hints);
1704
1705        BMX_EXIT;
1706
1707        return ret;
1708}
1709
1710static int
1711BMI_mx_post_sendunexpected_list(bmi_op_id_t *id, struct bmi_method_addr *remote_map,
1712                  const void *const *buffers, const bmi_size_t *sizes, int list_count,
1713                  bmi_size_t total_size, enum bmi_buffer_type buffer_flag __unused,
1714                  bmi_msg_tag_t tag, uint8_t class, void *user_ptr, bmi_context_id context_id,
1715                  PVFS_hint hints)
1716{
1717        int ret = 0;
1718
1719        BMX_ENTER;
1720
1721        return bmx_post_send_common(id, remote_map, list_count, buffers, sizes,
1722                                    total_size, tag, user_ptr, context_id,
1723                                    class, 1, hints);
1724
1725        BMX_EXIT;
1726
1727        return ret;
1728}
1729
1730/* if (peer->mxp_state == BMX_PEER_READY)
1731 *     add to pending list
1732 *     add refcount on peer
1733 *     mx_irecv()
1734 * else
1735 *     add to peer's queued rxs
1736 */
1737static int
1738bmx_post_rx(struct bmx_ctx *rx)
1739{
1740        int             ret     = 0;
1741        struct bmx_peer *peer   = rx->mxc_peer;
1742        mx_return_t     mxret   = MX_SUCCESS;
1743        mx_segment_t    *segs   = NULL;
1744
1745        debug((BMX_DB_FUNC|BMX_DB_CTX), "entering %s match= 0x%llx length= %lld "
1746                        "peer state= %d op_id= %llu", __func__, llu(rx->mxc_match),
1747                        lld(rx->mxc_nob), peer->mxp_state, llu(rx->mxc_mop->op_id));
1748        if (peer->mxp_state == BMX_PEER_READY) {
1749                bmx_q_pending_ctx(rx);  /* uses peer lock */
1750                if (rx->mxc_nseg == 1) {
1751                        segs = &rx->mxc_seg;
1752                } else {
1753                        segs = rx->mxc_seg_list;
1754                }
1755                mxret = mx_irecv(bmi_mx->bmx_ep, segs, rx->mxc_nseg,
1756                                 rx->mxc_match, BMX_MASK_ALL, (void *) rx, &rx->mxc_mxreq);
1757                if (mxret != MX_SUCCESS) {
1758                        ret = -BMI_ENOMEM;
1759                        bmx_deq_pending_ctx(rx);        /* uses peer lock */
1760                        bmx_q_completed(rx, BMX_CTX_CANCELED, BMX_NO_STATUS, BMI_ENOMEM);
1761                }
1762        } else { /* peer is not ready */
1763                debug(BMX_DB_PEER, "%s peer is not ready (%d) q_ctx(rx) match= 0x%llx "
1764                                "length=%lld", __func__, peer->mxp_state,
1765                                llu(rx->mxc_match), (long long) rx->mxc_nob);
1766                bmx_q_ctx(rx);  /* uses peer lock */
1767        }
1768        BMX_EXIT;
1769        return ret;
1770}
1771
1772static int
1773bmx_post_recv_common(bmi_op_id_t *id, struct bmi_method_addr *remote_map,
1774                     int numbufs, void *const *buffers, const bmi_size_t *sizes,
1775                     bmi_size_t tot_expected_len, bmi_msg_tag_t tag,
1776                     void *user_ptr, bmi_context_id context_id,
1777                     PVFS_hint hints)
1778{
1779        int                      ret    = 0;
1780        struct bmx_ctx          *rx     = NULL;
1781        struct method_op        *mop    = NULL;
1782        struct bmx_method_addr  *mxmap  = NULL;
1783        struct bmx_peer         *peer   = NULL;
1784        PINT_event_id            eid    = 0;
1785
1786        PINT_EVENT_START(
1787            bmi_mx_recv_event_id, bmi_mx_pid, NULL, &eid,
1788            PINT_HINT_GET_CLIENT_ID(hints),
1789            PINT_HINT_GET_REQUEST_ID(hints),
1790            PINT_HINT_GET_RANK(hints),
1791            PINT_HINT_GET_HANDLE(hints),
1792            PINT_HINT_GET_OP_ID(hints),
1793            tot_expected_len);
1794
1795        mxmap = remote_map->method_data;
1796
1797        ret = bmx_ensure_connected(mxmap);
1798        if (ret != 0) {
1799                goto out;
1800        }
1801        peer = mxmap->mxm_peer;
1802        bmx_peer_addref(peer); /* add ref and hold until test or testcontext */
1803
1804        /* get idle tx, if available, otherwise alloc one */
1805        rx = bmx_get_idle_rx();
1806        if (rx == NULL) {
1807                ret = bmx_ctx_alloc(&rx, BMX_REQ_RX);
1808                if (ret != 0) {
1809                        bmx_peer_decref(peer);
1810                        goto out;
1811                }
1812                rx->mxc_state = BMX_CTX_PREP;
1813        }
1814        rx->mxc_tag = tag;
1815        rx->mxc_msg_type = BMX_MSG_EXPECTED;
1816        rx->mxc_peer = peer;
1817
1818        /* map buffer(s) */
1819        if (numbufs == 1) {
1820                rx->mxc_seg.segment_ptr = (char *) *buffers;
1821                rx->mxc_seg.segment_length = *sizes;
1822                rx->mxc_nob = *sizes;
1823        } else {
1824                int             i       = 0;
1825                mx_segment_t    *segs   = NULL;
1826
1827                BMX_MALLOC(segs, (numbufs * sizeof(*segs)));
1828                if (segs == NULL) {
1829                        bmx_put_idle_ctx(rx);
1830                        bmx_peer_decref(peer);
1831                        ret = -BMI_ENOMEM;
1832                        goto out;
1833                }
1834                rx->mxc_seg_list = segs;
1835                for (i = 0; i < numbufs; i++) {
1836                        segs[i].segment_ptr = (void *) buffers[i];
1837                        segs[i].segment_length = sizes[i];
1838                        rx->mxc_nob += sizes[i];
1839                }
1840        }
1841        rx->mxc_nseg = numbufs;
1842
1843        if (rx->mxc_nob != tot_expected_len) {
1844                debug(BMX_DB_INFO, "user provided total length %d does not match "
1845                                "the buffer list total length %lld",
1846                                (uint32_t) tot_expected_len, (long long) rx->mxc_nob);
1847        }
1848
1849        BMX_MALLOC(mop, sizeof(*mop));
1850        if (mop == NULL) {
1851                bmx_put_idle_ctx(rx);
1852                bmx_peer_decref(peer);
1853                ret = -BMI_ENOMEM;
1854                goto out;
1855        }
1856        id_gen_fast_register(&mop->op_id, mop);
1857        debug(BMX_DB_CTX, "RX id_gen_fast_register(%llu)", llu(mop->op_id));
1858        mop->addr = remote_map;  /* set of function pointers, essentially */
1859        mop->method_data = rx;
1860        mop->user_ptr = user_ptr;
1861        mop->context_id = context_id;
1862        mop->event_id = eid;
1863        *id = mop->op_id;
1864        rx->mxc_mop = mop;
1865
1866        bmx_create_match(rx);
1867
1868        debug(BMX_DB_CTX, "%s tag= %d length= %d op_id= %llu", __func__,
1869                          tag, (int) tot_expected_len, llu(mop->op_id));
1870
1871        ret = bmx_post_rx(rx);
1872out:
1873        return ret;
1874}
1875
1876static int
1877BMI_mx_post_recv_list(bmi_op_id_t *id, struct bmi_method_addr *remote_map,
1878               void *const *buffers, const bmi_size_t *sizes, int list_count,
1879               bmi_size_t tot_expected_len, bmi_size_t *tot_actual_len __unused,
1880               enum bmi_buffer_type buffer_flag __unused, bmi_msg_tag_t tag, void *user_ptr,
1881               bmi_context_id context_id,
1882               PVFS_hint hints)
1883{
1884        int ret = 0;
1885
1886        BMX_ENTER;
1887
1888        ret = bmx_post_recv_common(id, remote_map, list_count, buffers, sizes,
1889                                    tot_expected_len, tag, user_ptr, context_id,
1890                                    hints);
1891
1892        BMX_EXIT;
1893
1894        return ret;
1895}
1896
1897static void
1898bmx_peer_post_queued_rxs(struct bmx_peer *peer)
1899{
1900        struct bmx_ctx  *rx             = NULL;
1901        list_t          *queued_rxs     = &peer->mxp_queued_rxs;
1902
1903        BMX_ENTER;
1904
1905        gen_mutex_lock(&peer->mxp_lock);
1906        while (!qlist_empty(queued_rxs)) {
1907                if (peer->mxp_state != BMX_PEER_READY) {
1908                        gen_mutex_unlock(&peer->mxp_lock);
1909                        return;
1910                }
1911                rx = qlist_entry(queued_rxs->next, struct bmx_ctx, mxc_list);
1912                qlist_del_init(&rx->mxc_list);
1913                gen_mutex_unlock(&peer->mxp_lock);
1914                bmx_post_rx(rx);
1915                gen_mutex_lock(&peer->mxp_lock);
1916        }
1917        gen_mutex_unlock(&peer->mxp_lock);
1918
1919        BMX_EXIT;
1920
1921        return;
1922}
1923
1924static void
1925bmx_peer_post_queued_txs(struct bmx_peer *peer)
1926{
1927        struct bmx_ctx  *tx             = NULL;
1928        list_t          *queued_txs     = &peer->mxp_queued_txs;
1929
1930        BMX_ENTER;
1931
1932        gen_mutex_lock(&peer->mxp_lock);
1933        while (!qlist_empty(queued_txs)) {
1934                if (peer->mxp_state != BMX_PEER_READY) {
1935                        gen_mutex_unlock(&peer->mxp_lock);
1936                        return;
1937                }
1938                tx = qlist_entry(queued_txs->next, struct bmx_ctx, mxc_list);
1939                qlist_del_init(&tx->mxc_list);
1940                gen_mutex_unlock(&peer->mxp_lock);
1941                /* we may have posted this before we got the peer's id */
1942                bmx_create_match(tx);
1943                bmx_post_tx(tx);
1944                gen_mutex_lock(&peer->mxp_lock);
1945        }
1946        gen_mutex_unlock(&peer->mxp_lock);
1947
1948        BMX_EXIT;
1949
1950        return;
1951}
1952
1953
1954static int
1955bmx_post_unexpected_recv(mx_endpoint_addr_t source, uint8_t type, uint32_t id,
1956                         uint32_t tag, uint64_t match, uint32_t length)
1957{
1958        int             ret     = 0;
1959        struct bmx_ctx  *rx     = NULL;
1960        struct bmx_peer *peer   = NULL;
1961        void            *peerp  = (void *) &peer;
1962        mx_return_t     mxret   = MX_SUCCESS;
1963        uint8_t         class = 0;
1964
1965        BMX_ENTER;
1966
1967        if (id == 0 && tag == 0 && type == 0) {
1968                bmx_parse_match(match, &type, &id, &tag, &class);
1969        }
1970
1971        rx = bmx_get_idle_rx();
1972        if (rx != NULL) {
1973                mx_get_endpoint_addr_context(source, &peerp);
1974                peer = (struct bmx_peer *) peerp;
1975                if (peer == NULL) {
1976                        debug(BMX_DB_PEER, "unknown peer sent message 0x%llx "
1977                                        "length %u", llu(match), length);
1978                }
1979                bmx_peer_addref(peer); /* can peer be NULL? */
1980                rx->mxc_peer = peer;
1981                rx->mxc_msg_type = type;
1982                rx->mxc_tag = tag;
1983                rx->mxc_match = match;
1984                rx->mxc_seg.segment_ptr = rx->mxc_buffer;
1985                rx->mxc_seg.segment_length = length;
1986                rx->mxc_nseg = 1;
1987                rx->mxc_nob = (long long) length;
1988
1989                if (length > BMX_UNEXPECTED_SIZE) {
1990                        debug(BMX_DB_WARN, "receiving unexpected msg with "
1991                                        "%d bytes. Will receive with length 0.",
1992                                        length);
1993                        rx->mxc_seg.segment_length = 0;
1994                }
1995                debug(BMX_DB_CTX, "%s rx match= 0x%llx length= %lld", __func__,
1996                                llu(rx->mxc_match), lld(rx->mxc_nob));
1997                mxret = mx_irecv(bmi_mx->bmx_ep, &rx->mxc_seg, rx->mxc_nseg,
1998                                 rx->mxc_match, BMX_MASK_ALL, (void *) rx, &rx->mxc_mxreq);
1999                if (mxret != MX_SUCCESS) {
2000                        debug((BMX_DB_MX|BMX_DB_CTX), "mx_irecv() failed with %s for an "
2001                                        "unexpected recv with tag %d length %d",
2002                                        mx_strerror(mxret), tag, length);
2003                        bmx_put_idle_ctx(rx);
2004                        ret = -1;
2005                }
2006        } else {
2007                ret = -1;
2008        }
2009
2010        BMX_EXIT;
2011
2012        return ret;
2013}
2014
2015/* MX calls this function if an incoming msg does not match a posted recv.
2016 * MX blocks while in this function. Make it as fast as possible -
2017 * do not allocate memory, etc.
2018 *
2019 * This function is also a nice way of finding early expected receives
2020 * before they are posted by PVFS/BMI.
2021 */
2022mx_unexp_handler_action_t
2023bmx_unexpected_recv(void *context, mx_endpoint_addr_t source,
2024                      uint64_t match_value, uint32_t length, void *data_if_available)
2025{
2026        int                     ret     = MX_RECV_CONTINUE;
2027        struct bmx_ctx          *rx     = NULL;
2028        uint8_t                 type    = 0;
2029        uint8_t                 class   = 0;
2030        uint32_t                id      = 0;
2031        uint32_t                tag     = 0;
2032        struct bmx_peer         *peer   = NULL;
2033        void                    *peerp  = &peer;
2034        mx_return_t             mxret   = MX_SUCCESS;
2035
2036        bmx_parse_match(match_value, &type, &id, &tag, &class);
2037
2038        switch (type) {
2039        case BMX_MSG_CONN_REQ:
2040                debug(BMX_DB_CONN, "CONN_REQ from %s", (char *) data_if_available);
2041                if (!bmi_mx->bmx_is_server) {
2042                        debug(BMX_DB_ERR, "client receiving CONN_REQ");
2043                        exit(1);
2044                }
2045                /* a client is trying to contact us */
2046                /* do not alloc peer which can block, post rx only */
2047                rx = bmx_get_idle_rx();
2048                if (rx != NULL) {
2049                        rx->mxc_msg_type = type;
2050                        rx->mxc_tag = tag; /* this is the bmi_mx version number */
2051                        rx->mxc_match = match_value;
2052                        rx->mxc_seg.segment_length = length;
2053                        rx->mxc_nseg = 1;
2054                        rx->mxc_nob = (long long) length;
2055                        debug(BMX_DB_CONN, "%s rx match= 0x%llx length= %lld",
2056                                        __func__, llu(rx->mxc_match), lld(rx->mxc_nob));
2057                        mxret = mx_irecv(bmi_mx->bmx_ep, &rx->mxc_seg, rx->mxc_nseg,
2058                                         rx->mxc_match, BMX_MASK_ALL, (void *) rx, &rx->mxc_mxreq);
2059                        if (mxret != MX_SUCCESS) {
2060                                debug(BMX_DB_CONN, "mx_irecv() failed for an "
2061                                                "unexpected recv with %s",
2062                                                mx_strerror(mxret));
2063                                bmx_put_idle_ctx(rx);
2064                                ret = MX_RECV_FINISHED;
2065                        }
2066                } else {
2067                        ret = MX_RECV_FINISHED;
2068                }
2069                break;
2070        case BMX_MSG_CONN_ACK:
2071                /* the server is replying to our CONN_REQ */
2072                mx_get_endpoint_addr_context(source, &peerp);
2073                peer = (struct bmx_peer *) peerp;
2074                if (peer == NULL) {
2075                        debug((BMX_DB_CONN|BMX_DB_PEER), "receiving CONN_ACK but "
2076                                        "the endpoint context does not have a peer");
2077                } else {
2078                        debug(BMX_DB_CONN, "CONN_ACK from %s id= %d",
2079                                        peer->mxp_mxmap->mxm_peername, id);
2080                        if (tag == BMX_VERSION) {
2081                                debug(BMX_DB_CONN, "setting %s's state to READY",
2082                                                 peer->mxp_mxmap->mxm_peername);
2083                                gen_mutex_lock(&peer->mxp_lock);
2084                                peer->mxp_tx_id = id;
2085                                peer->mxp_state = BMX_PEER_READY;
2086                                gen_mutex_unlock(&peer->mxp_lock);
2087                                bmx_peer_post_queued_rxs(peer);
2088                                bmx_peer_post_queued_txs(peer);
2089                        } else {
2090                                bmx_peer_disconnect(peer, 1, BMI_EPROTO);
2091                        }
2092                }
2093                /* we are done with the recv, drop it */
2094                ret = MX_RECV_FINISHED;
2095                break;
2096        case BMX_MSG_UNEXPECTED:
2097                if (!bmi_mx->bmx_is_server) {
2098                        void *peerp = &peer;
2099                        mx_get_endpoint_addr_context(source, &peerp);
2100                        peer = (struct bmx_peer *) peerp;
2101                        debug(BMX_DB_ERR, "client receiving unexpected message "
2102                                "from %s with mask 0x%llx length %u",
2103                                peer == NULL ? "unknown" : peer->mxp_mxmap->mxm_peername,
2104                                llu(match_value), length);
2105                        exit(1);
2106                }
2107                ret = bmx_post_unexpected_recv(source, type, id, tag, match_value, length);
2108                if (ret != 0) {
2109                        /* we will catch this later in testunexpected() */
2110                        debug(BMX_DB_CTX, "Missed unexpected receive");
2111                }
2112                ret = MX_RECV_CONTINUE;
2113                break;
2114        case BMX_MSG_EXPECTED:
2115                /* do nothing, BMI will post a recv for it */
2116                debug(BMX_DB_CTX, "Early expected message  length %u  tag %u  match "
2117                                "0x%llx", length, tag, llu(match_value));
2118                break;
2119        default:
2120                debug(BMX_DB_ERR, "received unexpected msg with type %d", type);
2121                exit(1);
2122                break;
2123        }
2124
2125        return ret;
2126}
2127
2128/* This is called before BMI_mx_initialize() on servers, do not use anything from bmx_data */
2129static struct bmi_method_addr *
2130bmx_alloc_method_addr(const char *peername, const char *hostname, uint32_t board, uint32_t ep_id)
2131{
2132        struct bmi_method_addr      *map            = NULL;
2133        struct bmx_method_addr  *mxmap          = NULL;
2134
2135        BMX_ENTER;
2136
2137        if (bmi_mx == NULL) {
2138                map = bmi_alloc_method_addr(
2139                    tmp_id, (bmi_size_t) sizeof(*mxmap));
2140        } else {
2141                map = bmi_alloc_method_addr(bmi_mx->bmx_method_id, (bmi_size_t) sizeof(*mxmap));
2142        }
2143        if (map == NULL) return NULL;
2144
2145        mxmap = map->method_data;
2146        mxmap->mxm_map = map;
2147        mxmap->mxm_peername = strdup(peername);
2148        mxmap->mxm_hostname = strdup(hostname);
2149        mxmap->mxm_board = board;
2150        mxmap->mxm_ep_id = ep_id;
2151        /* mxmap->mxm_peer */
2152
2153        BMX_EXIT;
2154
2155        return map;
2156}
2157
2158
2159/* test for ICON_REQ messages (on the client)
2160 * if found
2161 *    get idle tx
2162 *    marshall CONN_REQ
2163 *    set peer state to WAIT
2164 *    send CONN_REQ
2165 */
2166static void
2167bmx_handle_icon_req(void)
2168{
2169        uint32_t        result  = 0;
2170
2171        do {
2172                uint64_t        match   = (uint64_t) BMX_MSG_ICON_REQ << BMX_MSG_SHIFT;
2173                uint64_t        mask    = BMX_MASK_MSG;
2174                mx_status_t     status;
2175
2176                mx_test_any(bmi_mx->bmx_ep, match, mask, &status, &result);
2177                if (result) {
2178                        int                     length  = 0;
2179                        struct bmx_ctx          *tx     = NULL;
2180                        struct bmx_peer         *peer   = NULL;
2181                        struct bmx_method_addr  *mxmap  = NULL;
2182
2183                        peer = (struct bmx_peer *) status.context;
2184                        mxmap = peer->mxp_mxmap;
2185                        debug(BMX_DB_CONN, "%s returned for %s with %s", __func__,
2186                                        mxmap->mxm_peername, mx_strstatus(status.code));
2187
2188                        if (status.code != MX_STATUS_SUCCESS) {
2189                                debug((BMX_DB_CONN|BMX_DB_PEER),
2190                                      "%s: connect to %s failed with %s", __func__,
2191                                      mxmap->mxm_peername, mx_strstatus(status.code));
2192                                bmx_peer_disconnect(peer, 0, bmx_mx_to_bmi_errno(status.code));
2193                                /* drop ref taken before calling mx_iconnect */
2194                                bmx_peer_decref(peer);
2195                                continue;
2196                        }
2197
2198                        gen_mutex_lock(&peer->mxp_lock);
2199                        peer->mxp_epa = status.source;
2200                        gen_mutex_unlock(&peer->mxp_lock);
2201                        mx_set_endpoint_addr_context(peer->mxp_epa, (void *) peer);
2202
2203                        tx = bmx_get_idle_tx();
2204                        if (tx == NULL) {
2205                                int     ret     = 0;
2206                                ret = bmx_ctx_alloc(&tx, BMX_REQ_TX);
2207                                if (ret != 0) {
2208                                        bmx_peer_disconnect(peer, 1, BMI_ENOMEM);
2209                                        /* drop ref taken before calling mx_iconnect */
2210                                        bmx_peer_decref(peer);
2211                                        continue;
2212                                }
2213                                tx->mxc_state = BMX_CTX_PREP;
2214                        }
2215                        tx->mxc_msg_type = BMX_MSG_CONN_REQ;
2216                        /* tx->mxc_mop unused */
2217                        tx->mxc_peer = peer;
2218                        tx->mxc_tag = BMX_VERSION;
2219                        bmx_create_match(tx);
2220                        length = strlen(bmi_mx->bmx_peername) + 1; /* pad for '\0' */
2221                        BMX_MALLOC(tx->mxc_buffer, length);
2222                        if (tx->mxc_buffer == NULL) {
2223                                bmx_peer_disconnect(peer, 1, BMI_ENOMEM);
2224                                /* drop ref taken before calling mx_iconnect */
2225                                bmx_peer_decref(peer);
2226                                continue;
2227                        }
2228                        snprintf(tx->mxc_buffer, length, "%s", bmi_mx->bmx_peername);
2229                        tx->mxc_seg.segment_ptr = tx->mxc_buffer;
2230                        tx->mxc_seg.segment_length = length;
2231                        tx->mxc_nseg = 1;
2232                        tx->mxc_state = BMX_CTX_PENDING;
2233                        debug(BMX_DB_CONN, "%s tx match= 0x%llx length= %lld", __func__,
2234                                        llu(tx->mxc_match), lld(tx->mxc_nob));
2235                        mx_isend(bmi_mx->bmx_ep, &tx->mxc_seg, tx->mxc_nseg, peer->mxp_epa,
2236                                 tx->mxc_match, (void *) tx, &tx->mxc_mxreq);
2237                }
2238        } while (result);
2239
2240        return;
2241}
2242
2243/* test for received CONN_REQ messages (on the server)
2244 * if found
2245 *    create peer
2246 *    create mxmap
2247 *    mx_iconnect() w/BMX_MSG_ICON_ACK
2248 */
2249static void
2250bmx_handle_conn_req(void)
2251{
2252        uint32_t        result  = 0;
2253        uint64_t        match   = (uint64_t) BMX_MSG_CONN_REQ << BMX_MSG_SHIFT;
2254        uint64_t        mask    = BMX_MASK_MSG;
2255        uint64_t        ack     = (uint64_t) BMX_MSG_ICON_ACK << BMX_MSG_SHIFT;
2256        mx_status_t     status;
2257
2258        do {
2259                mx_test_any(bmi_mx->bmx_ep, match, mask, &status, &result);
2260                if (result) {
2261                        uint8_t                 type    = 0;
2262                        uint8_t                 class   = 0;
2263                        uint32_t                id      = 0;
2264                        uint32_t                sid     = 0;
2265                        uint32_t                version = 0;
2266                        uint64_t                nic_id  = 0ULL;
2267                        uint32_t                ep_id   = 0;
2268                        mx_request_t            request;
2269                        struct bmx_ctx          *rx     = NULL;
2270                        struct bmx_peer         *peer   = NULL;
2271                        struct bmx_method_addr  *mxmap  = NULL;
2272
2273                        rx = (struct bmx_ctx *) status.context;
2274                        debug(BMX_DB_CONN, "%s returned %s match 0x%llx with %s", __func__,
2275                                        rx->mxc_type == BMX_REQ_TX ? "TX" : "RX",
2276                                        llu(rx->mxc_match), mx_strstatus(status.code));
2277                        if (rx->mxc_type == BMX_REQ_TX) {
2278                                /* ignore the client's completion of the CONN_REQ send */
2279                                struct bmx_ctx *tx = rx;
2280                                debug(BMX_DB_CONN, "CONN_REQ sent to %s",
2281                                                tx->mxc_peer->mxp_mxmap->mxm_peername);
2282                                /* drop ref taken before mx_iconnect() */
2283                                bmx_peer_decref(tx->mxc_peer);
2284                                bmx_put_idle_ctx(tx);
2285                                continue;
2286                        } else if (status.code != MX_STATUS_SUCCESS) {
2287                                bmx_peer_decref(rx->mxc_peer);
2288                                bmx_put_idle_ctx(rx);
2289                                continue;
2290                        }
2291                        bmx_parse_match(rx->mxc_match, &type, &id, &version,
2292                            &class);
2293                        if (version != BMX_VERSION) {
2294                                /* TODO send error conn_ack */
2295                                debug(BMX_DB_WARN, "version mismatch with peer "
2296                                                "%s (our version 0x%x, peer's version "
2297                                                "0x%x)", (char *) rx->mxc_buffer,
2298                                                BMX_VERSION, version);
2299                                bmx_peer_decref(rx->mxc_peer);
2300                                bmx_put_idle_ctx(rx);
2301                                continue;
2302                        }
2303                        if (bmi_mx->bmx_is_server == 0) {
2304                                debug(BMX_DB_WARN, "received CONN_REQ on a client.");
2305                                bmx_peer_decref(rx->mxc_peer);
2306                                bmx_put_idle_ctx(rx);
2307                                continue;
2308                        }
2309                        mx_decompose_endpoint_addr2(status.source, &nic_id,
2310                                                    &ep_id, &sid);
2311                        {
2312                                void *peerp = &peer;
2313                                mx_get_endpoint_addr_context(status.source, &peerp);
2314                                peer = (struct bmx_peer *) peerp;
2315                        }
2316                        if (peer == NULL) { /* new peer */
2317                                int             ret             = 0;
2318                                char           *host            = NULL;
2319                                uint32_t        board           = 0;
2320                                uint32_t        ep_id           = 0;
2321                                const char     *peername        = rx->mxc_buffer;
2322                                struct bmi_method_addr *map         = NULL;
2323
2324                                debug((BMX_DB_CONN|BMX_DB_PEER), "%s peer %s connecting",
2325                                                __func__, peername);
2326
2327                                ret = bmx_parse_peername(peername, &host,
2328                                                         &board, &ep_id);
2329                                if (ret != 0) {
2330                                        debug(BMX_DB_CONN, "parse_peername() "
2331                                                        "failed on %s",
2332                                                        (char *) rx->mxc_buffer);
2333                                        bmx_peer_decref(rx->mxc_peer);
2334                                        bmx_put_idle_ctx(rx);
2335                                        continue;
2336                                }
2337                                map = bmx_alloc_method_addr(peername, host,
2338                                                            board, ep_id);
2339                                if (map == NULL) {
2340                                        debug((BMX_DB_CONN|BMX_DB_MEM), "unable to alloc a "
2341                                                        "method addr for %s", peername);
2342                                        bmx_peer_decref(rx->mxc_peer);
2343                                        bmx_put_idle_ctx(rx);
2344                                        continue;
2345                                }
2346                                free(host);
2347                                mxmap = map->method_data;
2348                                ret = bmx_peer_alloc(&peer, mxmap);
2349                                if (ret != 0) {
2350                                        debug((BMX_DB_CONN|BMX_DB_MEM), "unable to alloc a "
2351                                                        "peer for %s", peername);
2352                                        bmx_peer_decref(rx->mxc_peer);
2353                                        bmx_put_idle_ctx(rx);
2354                                        continue;
2355                                }
2356                        } else if (sid != peer->mxp_sid) { /* reconnecting peer */
2357                                /* cancel queued txs and rxs, pending rxs */
2358                                debug((BMX_DB_CONN|BMX_DB_PEER), "%s peer "
2359                                      "%s reconnecting", __func__,
2360                                      peer->mxp_mxmap->mxm_peername);
2361                                if (peer->mxp_state == BMX_PEER_READY)
2362                                        bmx_peer_disconnect(peer, 0, BMI_ENETRESET);
2363                                mxmap = peer->mxp_mxmap;
2364                        } else {
2365                                debug((BMX_DB_CONN|BMX_DB_PEER), "%s peer "
2366                                      "%s reconnecting with same sid", __func__,
2367                                      peer->mxp_mxmap->mxm_peername);
2368                                mxmap = peer->mxp_mxmap;
2369                        }
2370                        gen_mutex_lock(&peer->mxp_lock);
2371                        debug(BMX_DB_PEER, "Setting peer %s to BMX_PEER_WAIT",
2372                                           peer->mxp_mxmap->mxm_peername);
2373                        peer->mxp_state = BMX_PEER_WAIT;
2374                        peer->mxp_tx_id = id;
2375                        peer->mxp_sid = sid;
2376                        gen_mutex_unlock(&peer->mxp_lock);
2377                        bmx_peer_addref(peer); /* add ref until completion of CONN_ACK */
2378                        mx_iconnect(bmi_mx->bmx_ep, peer->mxp_nic_id, mxmap->mxm_ep_id,
2379                                    BMX_MAGIC, ack, peer, &request);
2380                        bmx_put_idle_ctx(rx);
2381                }
2382        } while (result);
2383
2384        return;
2385}
2386
2387/* test for ICON_ACK messages (on the server)
2388 * if found
2389 *    register mxmap
2390 *    get idle tx
2391 *    marshall CONN_ACK
2392 *    set peer state to READY
2393 *    send CONN_ACK
2394 */
2395static void
2396bmx_handle_icon_ack(void)
2397{
2398        uint32_t        result  = 0;
2399        struct bmx_ctx  *tx     = NULL;
2400        struct bmx_peer *peer   = NULL;
2401
2402        if (!bmi_mx->bmx_is_server) return;
2403        do {
2404                uint64_t        match   = (uint64_t) BMX_MSG_ICON_ACK << BMX_MSG_SHIFT;
2405                uint64_t        mask    = BMX_MASK_MSG;
2406                mx_status_t     status;
2407
2408                mx_test_any(bmi_mx->bmx_ep, match, mask, &status, &result);
2409                if (result) {
2410
2411                        peer = (struct bmx_peer *) status.context;
2412                        debug(BMX_DB_CONN, "%s returned for %s with %s", __func__,
2413                                        peer->mxp_mxmap->mxm_peername,
2414                                        mx_strstatus(status.code));
2415                        if (status.code != MX_STATUS_SUCCESS) {
2416                                debug((BMX_DB_CONN|BMX_DB_PEER|BMX_DB_WARN),
2417                                      "%s: connect to %s failed with %s", __func__,
2418                                      peer->mxp_mxmap->mxm_peername,
2419                                      mx_strstatus(status.code));
2420                                bmx_peer_disconnect(peer, 1, bmx_mx_to_bmi_errno(status.code));
2421                                /* drop ref taken before calling mx_iconnect */
2422                                bmx_peer_decref(peer);
2423                                continue;
2424                        }
2425                        gen_mutex_lock(&peer->mxp_lock);
2426                        peer->mxp_epa = status.source;
2427                        debug(BMX_DB_PEER, "Setting peer %s to BMX_PEER_READY",
2428                                           peer->mxp_mxmap->mxm_peername);
2429                        peer->mxp_state = BMX_PEER_READY;
2430                        /* NOTE no need to call bmx_peer_post_queued_[rxs|txs]()
2431                         * since the server should not have any queued msgs */
2432                        gen_mutex_unlock(&peer->mxp_lock);
2433                        mx_set_endpoint_addr_context(peer->mxp_epa, (void *) peer);
2434
2435                        tx = bmx_get_idle_tx();
2436                        if (tx == NULL) {
2437                                int     ret     = 0;
2438                                ret = bmx_ctx_alloc(&tx, BMX_REQ_TX);
2439                                if (ret != 0) {
2440                                        debug((BMX_DB_CONN|BMX_DB_MEM), "unable to alloc a "
2441                                                        "ctx to send a CONN_ACK to %s",
2442                                                        peer->mxp_mxmap->mxm_peername);
2443                                        bmx_peer_disconnect(peer, 1, BMI_ENOMEM);
2444                                        /* drop ref taken before calling mx_iconnect */
2445                                        bmx_peer_decref(peer);
2446                                        continue;
2447                                }
2448                                tx->mxc_state = BMX_CTX_PREP;
2449                        }
2450                        tx->mxc_msg_type = BMX_MSG_CONN_ACK;
2451                        /* tx->mxc_mop unused */
2452                        tx->mxc_peer = peer;
2453                        tx->mxc_tag = BMX_VERSION;
2454                        bmx_create_match(tx);
2455                        tx->mxc_seg.segment_length = 0;
2456                        tx->mxc_nseg = 1;
2457                        debug(BMX_DB_CONN, "%s tx match= 0x%llx length= %lld", __func__,
2458                                        llu(tx->mxc_match), lld(tx->mxc_nob));
2459                        mx_isend(bmi_mx->bmx_ep, &tx->mxc_seg, tx->mxc_nseg, peer->mxp_epa,
2460                                 tx->mxc_match, (void *) tx, &tx->mxc_mxreq);
2461                        if (!peer->mxp_exist) {
2462                                debug(BMX_DB_PEER, "calling bmi_method_addr_reg_callback"
2463                                      "on %s", peer->mxp_mxmap->mxm_peername);
2464                                bmi_method_addr_reg_callback(peer->mxp_map);
2465                                peer->mxp_exist = 1;
2466                        }
2467                }
2468        } while (result);
2469
2470        return;
2471}
2472
2473/* test for CONN_ACK messages (on the server)
2474 * Since the unexpected_recv() function drops the CONN_ACK on the client
2475 * side, we only need this on the server to get the completion and
2476 * put the tx on the idle list. */
2477static void
2478bmx_handle_conn_ack(void)
2479{
2480        uint32_t        result  = 0;
2481        struct bmx_ctx  *tx     = NULL;
2482
2483        if (!bmi_mx->bmx_is_server) goto out;
2484        do {
2485                uint64_t        match   = (uint64_t) BMX_MSG_CONN_ACK << BMX_MSG_SHIFT;
2486                uint64_t        mask    = BMX_MASK_MSG;
2487                mx_status_t     status;
2488
2489                mx_test_any(bmi_mx->bmx_ep, match, mask, &status, &result);
2490                if (result) {
2491                        tx = (struct bmx_ctx *) status.context;
2492                        debug(BMX_DB_CONN, "%s returned tx match 0x%llx with %s", __func__,
2493                                        llu(tx->mxc_match), mx_strstatus(status.code));
2494                        bmx_peer_decref(tx->mxc_peer);
2495                        bmx_put_idle_ctx(tx);
2496                }
2497        } while (result);
2498
2499out:
2500        return;
2501}
2502
2503static void
2504bmx_connection_handlers(void)
2505{
2506        static int      count   = 0;
2507        int             print   = (count++ % 1000 == 0);
2508
2509        if (print)
2510                BMX_ENTER;
2511
2512        /* push connection messages along */
2513        bmx_handle_icon_req();
2514        bmx_handle_conn_req();
2515        bmx_handle_icon_ack();
2516        bmx_handle_conn_ack();
2517        if (print)
2518                BMX_EXIT;
2519        return;
2520}
2521
2522static void
2523bmx_complete_ctx(struct bmx_ctx *ctx, bmi_op_id_t *outid, bmi_error_code_t *err,
2524                 bmi_size_t *size, void **user_ptr)
2525{
2526        struct bmx_peer *peer   = ctx->mxc_peer;
2527
2528        *outid = ctx->mxc_mop->op_id;
2529        *err = ctx->mxc_error;
2530        *size = ctx->mxc_mxstat.xfer_length;
2531        if (user_ptr)
2532                *user_ptr = ctx->mxc_mop->user_ptr;
2533        PINT_EVENT_END(
2534            (ctx->mxc_type == BMX_REQ_TX ?
2535             bmi_mx_send_event_id : bmi_mx_recv_event_id),
2536            bmi_mx_pid, NULL, ctx->mxc_mop->event_id,
2537            *outid, *size);
2538
2539        id_gen_fast_unregister(ctx->mxc_mop->op_id);
2540        BMX_FREE(ctx->mxc_mop, sizeof(*ctx->mxc_mop));
2541        bmx_put_idle_ctx(ctx);
2542        bmx_peer_decref(peer); /* drop the ref taken in [send|recv]_common */
2543
2544        return;
2545}
2546
2547static int
2548BMI_mx_test(bmi_op_id_t id, int *outcount, bmi_error_code_t *err,
2549            bmi_size_t *size, void **user_ptr, int max_idle_time __unused,
2550            bmi_context_id context_id)
2551{
2552        uint32_t         result = 0;
2553        struct method_op *mop   = NULL;
2554        struct bmx_ctx   *ctx   = NULL;
2555        struct bmx_peer  *peer  = NULL;
2556
2557        BMX_ENTER;
2558
2559        bmx_connection_handlers();
2560
2561        bmx_get_completion_token();
2562
2563        mop = id_gen_fast_lookup(id);
2564        ctx = mop->method_data;
2565        peer = ctx->mxc_peer;
2566
2567        assert(context_id == mop->context_id);
2568        if (ctx->mxc_type == BMX_REQ_RX)
2569                assert(ctx->mxc_msg_type != BMX_MSG_UNEXPECTED);
2570        assert(context_id == ctx->mxc_mop->context_id);
2571
2572        switch (ctx->mxc_state) {
2573        case BMX_CTX_COMPLETED:
2574        case BMX_CTX_CANCELED:
2575                gen_mutex_lock(&bmi_mx->bmx_done_q_lock[(int) context_id]);
2576                qlist_del_init(&ctx->mxc_list);
2577                gen_mutex_unlock(&bmi_mx->bmx_done_q_lock[(int) context_id]);
2578                bmx_complete_ctx(ctx, &id, err, size, user_ptr);
2579                *outcount = 1;
2580                break;
2581        case BMX_CTX_PENDING:
2582                mx_test(bmi_mx->bmx_ep, &ctx->mxc_mxreq, &ctx->mxc_mxstat, &result);
2583                if (result) {
2584                        bmx_deq_pending_ctx(ctx);
2585                        bmx_complete_ctx(ctx, &id, err, size, user_ptr);
2586                        *outcount = 1;
2587                }
2588                break;
2589        default:
2590                debug(BMX_DB_CTX, "%s called on %s with state %d", __func__,
2591                        ctx->mxc_type == BMX_REQ_TX ? "TX" : "RX", ctx->mxc_state);
2592        }
2593        bmx_release_completion_token();
2594        BMX_EXIT;
2595
2596        return 0;
2597}
2598
2599
2600static int
2601BMI_mx_testcontext(int incount, bmi_op_id_t *outids, int *outcount,
2602            bmi_error_code_t *errs, bmi_size_t *sizes, void **user_ptrs,
2603            int max_idle_time, bmi_context_id context_id)
2604{
2605        int             i               = 0;
2606        int             completed       = 0;
2607        int             old             = 0;
2608        uint64_t        match           = 0ULL;
2609        uint64_t        mask            = BMX_MASK_MSG;
2610        struct bmx_ctx  *ctx            = NULL;
2611        struct bmx_peer *peer           = NULL;
2612        int             wait            = 0;
2613        static int      count           = 0;
2614        int             print           = 0;
2615
2616        if (count++ % 1000 == 0) {
2617                BMX_ENTER;
2618                print = 1;
2619        }
2620
2621        bmx_connection_handlers();
2622
2623        bmx_get_completion_token();
2624
2625        /* always return queued, completed messages first */
2626        do {
2627                bmx_deq_completed(&ctx, context_id);
2628                if (ctx) {
2629                        bmx_complete_ctx(ctx, &outids[completed], &errs[completed],
2630                                         &sizes[completed], &user_ptrs[completed]);
2631                        completed++;
2632                }
2633        } while (completed < incount && ctx != NULL);
2634
2635        if (completed > 0)
2636                debug(BMX_DB_CTX, "%s found %d completed messages", __func__, completed);
2637
2638        /* try to complete expected messages
2639         * we will always try (incount - completed) times even
2640         *     if some iterations have no result */
2641
2642        match = (uint64_t) BMX_MSG_EXPECTED << BMX_MSG_SHIFT;
2643        for (i = completed; i < incount; i++) {
2644                uint32_t        result  = 0;
2645                mx_status_t     status;
2646
2647                old = completed;
2648
2649                if (wait == 0 || wait == 2) {
2650                        mx_test_any(bmi_mx->bmx_ep, match, mask, &status, &result);
2651                        if (!result && wait == 0 && max_idle_time > 0) wait = 1;
2652                } else { /* wait == 1 */
2653                        mx_wait_any(bmi_mx->bmx_ep, max_idle_time, match, mask,
2654                                    &status, &result);
2655                        wait = 2;
2656                }
2657
2658                if (result) {
2659                        ctx = (struct bmx_ctx *) status.context;
2660                        bmx_deq_pending_ctx(ctx);
2661                        if (ctx->mxc_mop->context_id != context_id) {
2662                                bmx_q_completed(ctx, BMX_CTX_COMPLETED, status,
2663                                                bmx_mx_to_bmi_errno(status.code));
2664                                continue;
2665                        }
2666                        ctx->mxc_mxstat = status;
2667                        peer = ctx->mxc_peer;
2668                        debug(BMX_DB_CTX, "%s completing expected %s with match 0x%llx "
2669                                        "for %s with op_id %llu length %d %s "
2670                                        "context_id= %d mop->context_id= %d",
2671                                        __func__, ctx->mxc_type == BMX_REQ_TX ? "TX" : "RX",
2672                                        llu(ctx->mxc_match), peer->mxp_mxmap->mxm_peername,
2673                                        llu(ctx->mxc_mop->op_id), status.xfer_length,
2674                                        mx_strstatus(status.code), (int) context_id,
2675                                        (int) ctx->mxc_mop->context_id);
2676
2677                        bmx_complete_ctx(ctx, &outids[completed], &errs[completed],
2678                                         &sizes[completed], &user_ptrs[completed]);
2679                        completed++;
2680                }
2681        }
2682
2683        if (completed - old > 0)
2684                debug(BMX_DB_CTX, "%s found %d expected messages", __func__, completed - old);
2685
2686        /* try to complete unexpected sends */
2687
2688        match = (uint64_t) BMX_MSG_UNEXPECTED << BMX_MSG_SHIFT;
2689
2690        old = completed;
2691
2692        for (i = completed; i < incount; i++) {
2693                uint32_t        result          = 0;
2694                mx_status_t     status;
2695                int             again           = 1;
2696
2697                ctx = NULL;
2698
2699                while (!ctx && again) {
2700                        again = 0;
2701                        mx_test_any(bmi_mx->bmx_ep, match, mask, &status, &result);
2702                        if (result) {
2703                                ctx = (struct bmx_ctx *) status.context;
2704                                bmx_deq_pending_ctx(ctx);
2705                                peer = ctx->mxc_peer;
2706                                if (ctx->mxc_type == BMX_REQ_RX ||
2707                                    ctx->mxc_mop->context_id != context_id) {
2708                                        /* queue until testunexpected or queue
2709                                         * until testcontext for the correct context */
2710                                        bmx_q_completed(ctx, BMX_CTX_COMPLETED, status,
2711                                                        bmx_mx_to_bmi_errno(status.code));
2712                                        result = 0;
2713                                        again = 1;
2714                                        ctx = NULL;
2715                                }
2716                        }
2717                }
2718                if (result) {
2719                        debug(BMX_DB_CTX, "%s completing unexpected %s with "
2720                                        "match 0x%llx for %s with op_id %llu",
2721                                        __func__,
2722                                        ctx->mxc_type == BMX_REQ_TX ? "TX" : "RX",
2723                                        llu(ctx->mxc_match),
2724                                        peer->mxp_mxmap->mxm_peername,
2725                                        llu(ctx->mxc_mop->op_id));
2726
2727                        ctx->mxc_mxstat = status;
2728                        bmx_complete_ctx(ctx, &outids[completed], &errs[completed],
2729                                         &sizes[completed], &user_ptrs[completed]);
2730
2731                        if (status.code != MX_SUCCESS) {
2732                                debug(BMX_DB_CTX, "%s unexpected send completed with "
2733                                      "error %s", __func__, mx_strstatus(status.code));
2734                                bmx_peer_disconnect(peer, 0, BMI_ENETRESET);
2735                        }
2736                        completed++;
2737                }
2738        }
2739        bmx_release_completion_token();
2740
2741        if (completed - old > 0) {
2742                debug(BMX_DB_CTX, "%s found %d unexpected tx messages",
2743              __func__, completed - old);
2744        }
2745
2746        if (print)
2747                BMX_EXIT;
2748
2749        *outcount = completed;
2750        return completed;
2751}
2752
2753/* test for unexpected receives only, not unex sends */
2754static int
2755BMI_mx_testunexpected(int incount __unused, int *outcount,
2756            struct bmi_method_unexpected_info *ui, uint8_t class, int max_idle_time __unused)
2757{
2758        uint32_t        result          = 0;
2759        uint64_t        match           = ((uint64_t) BMX_MSG_UNEXPECTED << BMX_MSG_SHIFT);
2760        uint64_t        mask            = BMX_MASK_MSG;
2761        mx_status_t     status;
2762        static int      count           = 0;
2763        int             print           = 0;
2764        struct bmx_ctx  *rx             = NULL;
2765        struct bmx_peer *peer           = NULL;
2766        int             again           = 1;
2767        uint64_t        class_match     = 0;
2768
2769        if (count++ % 1000 == 0) {
2770                BMX_ENTER;
2771                print = 1;
2772        }
2773
2774        bmx_connection_handlers();
2775
2776        bmx_get_completion_token();
2777
2778        /* must match the correct class as well */
2779        class_match += class;
2780        match |= (class_match << BMX_CLASS_SHIFT);
2781
2782        /* if the unexpected handler cannot get a rx, it does not post a receive.
2783         * probe for unexpected and post a rx. */
2784        mx_iprobe(bmi_mx->bmx_ep, match, mask, &status, &result);
2785        if (result) {
2786                int     ret     = 0;
2787                ret = bmx_post_unexpected_recv(status.source, 0, 0, 0,
2788                                               status.match_info,
2789                                               status.xfer_length);
2790                if (ret != 0) {
2791                        debug(BMX_DB_CTX, "%s mx_iprobe() found rx with match 0x%llx "
2792                                        "length %d but could not receive it", __func__,
2793                                        llu(status.match_info), status.xfer_length);
2794                }
2795        }
2796
2797        /* check for unexpected receives */
2798        *outcount = 0;
2799
2800        bmx_deq_unex_rx(&rx);
2801        if (rx) {
2802                result = 1;
2803                status = rx->mxc_mxstat;
2804                peer = rx->mxc_peer;
2805        }
2806
2807        while (!rx && again) {
2808                again = 0;
2809                mx_test_any(bmi_mx->bmx_ep, match, mask, &status, &result);
2810                if (result) {
2811                        rx   = (struct bmx_ctx *) status.context;
2812                        bmx_deq_pending_ctx(rx);
2813                        peer = rx->mxc_peer;
2814                        if (rx->mxc_type == BMX_REQ_TX) {
2815                                bmx_q_completed(rx, BMX_CTX_COMPLETED, status,
2816                                                bmx_mx_to_bmi_errno(status.code));
2817                                result = 0;
2818                                again = 1;
2819                                rx = NULL;
2820                        }
2821                }
2822        }
2823
2824        if (result) {
2825                debug(BMX_DB_CTX, "%s completing RX with match 0x%llx for %s",
2826                      __func__, llu(rx->mxc_match), peer->mxp_mxmap->mxm_peername);
2827
2828                ui->error_code = 0;
2829                ui->addr = peer->mxp_map;
2830                ui->size = rx->mxc_nob;
2831                /* avoid a memcpy by giving the rx buffer to BMI
2832                 * and malloc a new one for the rx */
2833                ui->buffer = rx->mxc_buffer;
2834                rx->mxc_buffer = bmx_memalloc(BMX_UNEXPECTED_SIZE, BMX_UNEX_BUF);
2835                rx->mxc_seg.segment_ptr = rx->mxc_buffer;
2836                ui->tag = rx->mxc_tag;
2837
2838                bmx_put_idle_ctx(rx);
2839                bmx_peer_decref(peer); /* drop the ref taken in unexpected_recv() */
2840                *outcount = 1;
2841        }
2842        bmx_release_completion_token();
2843
2844        if (print)
2845                BMX_EXIT;
2846
2847        return 0;
2848}
2849
2850static void
2851bmx_create_peername(void)
2852{
2853        char    peername[MX_MAX_HOSTNAME_LEN + 28]; /* mx://host:board:ep_id\0 */
2854
2855        if (bmi_mx->bmx_board != -1) {
2856                /* mx://host:board:ep_id\0 */
2857                sprintf(peername, "mx://%s:%u:%u", bmi_mx->bmx_hostname,
2858                        bmi_mx->bmx_board, bmi_mx->bmx_ep_id);
2859        } else {
2860                /* mx://host:ep_id\0 */
2861                sprintf(peername, "mx://%s:%u", bmi_mx->bmx_hostname,
2862                        bmi_mx->bmx_ep_id);
2863        }
2864        bmi_mx->bmx_peername = strdup(peername);
2865        return;
2866}
2867
2868static int
2869bmx_peer_connect(struct bmx_peer *peer)
2870{
2871        int                     ret    = 0;
2872        uint64_t                nic_id = 0ULL;
2873        mx_request_t            request;
2874        uint64_t                match  = (uint64_t) BMX_MSG_ICON_REQ << BMX_MSG_SHIFT;
2875        struct bmx_method_addr *mxmap  = peer->mxp_mxmap;
2876
2877        BMX_ENTER;
2878
2879        gen_mutex_lock(&peer->mxp_lock);
2880        if (peer->mxp_state == BMX_PEER_INIT) {
2881                debug(BMX_DB_PEER, "Setting peer %s to BMX_PEER_WAIT",
2882                                   peer->mxp_mxmap->mxm_peername);
2883                peer->mxp_state = BMX_PEER_WAIT;
2884        } else {
2885                gen_mutex_unlock(&peer->mxp_lock);
2886                return 0;
2887        }
2888        gen_mutex_unlock(&peer->mxp_lock);
2889        bmx_peer_addref(peer); /* add ref until completion of CONN_REQ */
2890
2891        /* if we have not opened our endpoint, do so now */
2892        if (bmi_mx->bmx_ep == NULL) {
2893                char                    host[MX_MAX_HOSTNAME_LEN + 1];
2894                char                    *colon  = NULL;
2895                mx_endpoint_addr_t      epa;
2896
2897                ret = bmx_open_endpoint(&bmi_mx->bmx_ep,
2898                                        MX_ANY_NIC,
2899                                        MX_ANY_ENDPOINT);
2900                if (ret != 0) {
2901                        debug((BMX_DB_MX|BMX_DB_CONN), "failed to open endpoint when "
2902                                        "trying to conenct to %s", mxmap->mxm_peername);
2903                        bmx_peer_decref(peer);
2904                        return ret;
2905                }
2906                mx_get_endpoint_addr(bmi_mx->bmx_ep, &epa);
2907                /* get our nic_id and ep_id */
2908                mx_decompose_endpoint_addr2(epa, &nic_id, &bmi_mx->bmx_ep_id,
2909                                            &bmi_mx->bmx_sid);
2910                /* get our board number */
2911                mx_nic_id_to_board_number(nic_id, &bmi_mx->bmx_board);
2912                /* get our hostname */
2913                mx_nic_id_to_hostname(nic_id, host);
2914                bmi_mx->bmx_hostname = strdup(host);
2915                if (bmi_mx->bmx_hostname == NULL) {
2916                        debug((BMX_DB_MX|BMX_DB_CONN), "%s mx_nic_id_to_hostname() did "
2917                                        "not find hostname %s", __func__, host);
2918                        return -1;
2919                }
2920                /* if hostname has :board, remove it */
2921                colon = strchr(bmi_mx->bmx_hostname, ':');
2922                if (colon != NULL) {
2923                        *colon = '\0';
2924                } else {
2925                        /* no board number in our name */
2926                        bmi_mx->bmx_board = -1;
2927                }
2928                /* create our peername */
2929                bmx_create_peername();
2930        }
2931        /* this is a new peer, start the connect handshake
2932         * by calling mx_iconnect() w/BMX_MSG_ICON_REQ */
2933        mx_iconnect(bmi_mx->bmx_ep, peer->mxp_nic_id, mxmap->mxm_ep_id,
2934                    BMX_MAGIC, match, (void *) peer, &request);
2935
2936        BMX_EXIT;
2937
2938        return ret;
2939}
2940
2941/* if it is a new peer:
2942 *    alloc method_addr
2943 *    alloc peer
2944 *    if client
2945 *       open endpoint
2946 *       mx_iconnect()
2947 */
2948/* This is called on the server before BMI_mx_initialize(). */
2949static struct bmi_method_addr *
2950BMI_mx_method_addr_lookup(const char *id)
2951{
2952        int                     ret     = 0;
2953        int                     len     = 0;
2954        char                   *host    = NULL;
2955        uint32_t                board   = 0;
2956        uint32_t                ep_id   = 0;
2957        struct bmi_method_addr *map     = NULL;
2958        struct bmx_method_addr *mxmap   = NULL;
2959
2960        BMX_ENTER;
2961
2962        debug(BMX_DB_INFO, "%s with id %s", __func__, id);
2963        ret = bmx_parse_peername(id, &host, &board, &ep_id);
2964        if (ret != 0) {
2965                debug(BMX_DB_PEER, "%s method_parse_peername() failed on %s", __func__, id);
2966                return NULL;
2967        }
2968
2969        if (bmi_mx != NULL) {
2970                struct bmx_peer         *peer   = NULL;
2971
2972                gen_mutex_lock(&bmi_mx->bmx_peers_lock);
2973                qlist_for_each_entry(peer, &bmi_mx->bmx_peers, mxp_list) {
2974                        mxmap = peer->mxp_mxmap;
2975                        if (!strcmp(mxmap->mxm_hostname, host) &&
2976                            mxmap->mxm_board == board &&
2977                            mxmap->mxm_ep_id == ep_id) {
2978                                map = peer->mxp_map;
2979                                len = strlen(host);
2980                                BMX_FREE(host, len);
2981                                break;
2982                        }
2983                }
2984                gen_mutex_unlock(&bmi_mx->bmx_peers_lock);
2985        }
2986       
2987        if (map == NULL) {
2988                map = bmx_alloc_method_addr(id, host, board, ep_id);
2989                if (bmi_mx != NULL) {
2990                        struct bmx_peer *peer   = NULL;
2991
2992                        mxmap = map->method_data;
2993                        ret = bmx_peer_alloc(&peer, mxmap);
2994                        if (ret != 0) {
2995                                debug((BMX_DB_MEM|BMX_DB_PEER), "%s unable to alloc peer "
2996                                                "for %s", __func__, mxmap->mxm_peername);
2997                                goto out;
2998                        }
2999                        ret = bmx_peer_connect(peer);
3000                        if (ret != 0) {
3001                                debug((BMX_DB_CONN|BMX_DB_MX|BMX_DB_PEER), "%s peer_connect()"
3002                                       " failed with %d", __func__, ret);
3003                        }
3004                }
3005                if (map != NULL) free(host);
3006        }
3007out:
3008        BMX_EXIT;
3009
3010        return map;
3011}
3012
3013static int
3014BMI_mx_open_context(bmi_context_id context_id __unused)
3015{
3016        return 0;
3017}
3018
3019static void
3020BMI_mx_close_context(bmi_context_id context_id __unused)
3021{
3022        return;
3023}
3024
3025/* NOTE There may be a race between this and BMI_mx_testcontext(). */
3026static int
3027BMI_mx_cancel(bmi_op_id_t id, bmi_context_id context_id)
3028{
3029        struct method_op        *mop;
3030        struct bmx_ctx          *ctx     = NULL;
3031        struct bmx_peer         *peer   = NULL;
3032        uint32_t                result  = 0;
3033
3034        BMX_ENTER;
3035
3036        bmx_get_completion_token();
3037
3038        mop = id_gen_fast_lookup(id);
3039        ctx = mop->method_data;
3040        peer = ctx->mxc_peer;
3041
3042        assert(context_id == ctx->mxc_mop->context_id);
3043
3044        debug(BMX_DB_CTX, "%s %s op_id %llu mxc_state %d peer state %d", __func__,
3045                        ctx->mxc_type == BMX_REQ_TX ? "TX" : "RX",
3046                        llu(ctx->mxc_mop->op_id), ctx->mxc_state, peer->mxp_state);
3047
3048        /* avoid race with connection setup */
3049        gen_mutex_lock(&peer->mxp_lock);
3050
3051        switch (ctx->mxc_state) {
3052        case BMX_CTX_QUEUED:
3053                qlist_del_init(&ctx->mxc_list);
3054                gen_mutex_unlock(&peer->mxp_lock);
3055                bmx_q_completed(ctx, BMX_CTX_CANCELED, BMX_NO_STATUS, BMI_ECANCEL);
3056                break;
3057        case BMX_CTX_PENDING:
3058                gen_mutex_unlock(&peer->mxp_lock);
3059                if (ctx->mxc_type == BMX_REQ_TX) {
3060                        /* see if it completed first */
3061                        mx_test(bmi_mx->bmx_ep, &ctx->mxc_mxreq, &ctx->mxc_mxstat, &result);
3062                        if (result == 1) {
3063                                debug(BMX_DB_CTX, "%s completed TX op_id %llu "
3064                                      "mxc_state %d peer state %d status.code %s",
3065                                      __func__, llu(ctx->mxc_mop->op_id), ctx->mxc_state,
3066                                      peer->mxp_state, mx_strstatus(ctx->mxc_mxstat.code));
3067                                bmx_deq_pending_ctx(ctx);
3068                                bmx_q_completed(ctx, BMX_CTX_CANCELED,
3069                                                ctx->mxc_mxstat, BMI_ECANCEL);
3070                        } else {
3071                                /* and if not, then disconnect() */
3072                                bmx_peer_disconnect(peer, 1, BMI_ENETRESET);
3073                        }
3074                } else { /* BMX_REQ_RX */
3075                        mx_cancel(bmi_mx->bmx_ep, &ctx->mxc_mxreq, &result);
3076                        if (result == 1) {
3077                                bmx_deq_pending_ctx(ctx);
3078                                bmx_q_completed(ctx, BMX_CTX_CANCELED,
3079                                                BMX_NO_STATUS, BMI_ECANCEL);
3080                        }
3081                }
3082                break;
3083        default:
3084                debug(BMX_DB_CTX, "%s called on %s with state %d", __func__,
3085                        ctx->mxc_type == BMX_REQ_TX ? "TX" : "RX", ctx->mxc_state);
3086        }
3087        bmx_release_completion_token();
3088
3089        BMX_EXIT;
3090
3091        return 0;
3092}
3093
3094/* Unlike bmi_ib, we always know our peername, check if the peer exists */
3095static const char *
3096BMI_mx_rev_lookup(struct bmi_method_addr *meth)
3097{
3098        struct bmx_method_addr  *mxmap = meth->method_data;
3099
3100        BMX_ENTER;
3101
3102        if (mxmap->mxm_peer && mxmap->mxm_peer->mxp_state != BMX_PEER_DISCONNECT)
3103                return mxmap->mxm_peername;
3104        else
3105                return "(unconnected)";
3106}
3107
3108
3109const struct bmi_method_ops bmi_mx_ops =
3110{
3111    .method_name               = "bmi_mx",
3112    .flags = 0,
3113    .initialize                = BMI_mx_initialize,
3114    .finalize                  = BMI_mx_finalize,
3115    .set_info                  = BMI_mx_set_info,
3116    .get_info                  = BMI_mx_get_info,
3117    .memalloc                  = BMI_mx_memalloc,
3118    .memfree                   = BMI_mx_memfree,
3119    .unexpected_free           = BMI_mx_unexpected_free,
3120    .test                      = BMI_mx_test,
3121    .testsome                  = 0,
3122    .testcontext               = BMI_mx_testcontext,
3123    .testunexpected            = BMI_mx_testunexpected,
3124    .method_addr_lookup        = BMI_mx_method_addr_lookup,
3125    .post_send_list            = BMI_mx_post_send_list,
3126    .post_recv_list            = BMI_mx_post_recv_list,
3127    .post_sendunexpected_list  = BMI_mx_post_sendunexpected_list,
3128    .open_context              = BMI_mx_open_context,
3129    .close_context             = BMI_mx_close_context,
3130    .cancel                    = BMI_mx_cancel,
3131    .rev_lookup_unexpected     = BMI_mx_rev_lookup,
3132};
Note: See TracBrowser for help on using the browser.