root/branches/cu-security-branch/src/io/bmi/bmi_ib/ib.c @ 7941

Revision 7941, 61.4 KB (checked in by nlmills, 4 years ago)

merged in changes from summer at LANL

Line 
1/*
2 * InfiniBand BMI method.
3 *
4 * Copyright (C) 2003-6 Pete Wyckoff <pw@osc.edu>
5 * Copyright (C) 2006 Kyle Schochenmaier <kschoche@scl.ameslab.gov>
6 *
7 * See COPYING in top-level directory.
8 */
9#include <stdio.h>
10#include <stdlib.h>
11#include <string.h>
12#include <unistd.h>
13#include <errno.h>
14#include <fcntl.h>
15#include <sys/types.h>
16#include <arpa/inet.h>   /* inet_ntoa */
17#include <netdb.h>       /* gethostbyname */
18#include <sys/poll.h>
19#define __PINT_REQPROTO_ENCODE_FUNCS_C  /* include definitions */
20#include <src/common/id-generator/id-generator.h>
21#include <src/io/bmi/bmi-method-support.h>   /* bmi_method_ops ... */
22#include <src/io/bmi/bmi-method-callback.h>  /* bmi_method_addr_reg_callback */
23#include <src/common/gen-locks/gen-locks.h>  /* gen_mutex_t ... */
24#include <src/common/misc/pvfs2-internal.h>
25
26#ifdef HAVE_VALGRIND_H
27#include <memcheck.h>
28#else
29#define VALGRIND_MAKE_MEM_DEFINED(addr,len)
30#endif
31
32#include "ib.h"
33
34static gen_mutex_t interface_mutex = GEN_MUTEX_INITIALIZER;
35
36/*
37 * Handle given by upper layer, which must be handed back to create
38 * method_addrs.
39 */
40static int bmi_ib_method_id;
41
42/* alloc space for the single device structure pointer; one for
43 * vapi and one for openib */
44ib_device_t *ib_device __hidden = NULL;
45
46/* these all vector through the ib_device */
47#define new_connection ib_device->func.new_connection
48#define close_connection ib_device->func.close_connection
49#define drain_qp ib_device->func.drain_qp
50#define ib_initialize ib_device->func.ib_initialize
51#define ib_finalize ib_device->func.ib_finalize
52#define post_sr ib_device->func.post_sr
53#define post_sr_rdmaw ib_device->func.post_sr_rdmaw
54#define check_cq ib_device->func.check_cq
55#define prepare_cq_block ib_device->func.prepare_cq_block
56#define ack_cq_completion_event ib_device->func.ack_cq_completion_event
57#define wc_status_string ib_device->func.wc_status_string
58#define mem_register ib_device->func.mem_register
59#define mem_deregister ib_device->func.mem_deregister
60#define check_async_events ib_device->func.check_async_events
61
62#if MEMCACHE_EARLY_REG
63#if MEMCACHE_BOUNCEBUF
64#error Not sensible to use bouncebuf with early reg.  First use of bouncebuf \
65  will register it, thus no effect whether early reg or not.
66#endif
67#endif
68
69#if MEMCACHE_BOUNCEBUF
70static ib_buflist_t reg_send_buflist = { .num = 0 };
71static ib_buflist_t reg_recv_buflist = { .num = 0 };
72static void *reg_send_buflist_buf;
73static void *reg_recv_buflist_buf;
74static const bmi_size_t reg_send_buflist_len = 256 * 1024;
75static const bmi_size_t reg_recv_buflist_len = 256 * 1024;
76#endif
77
78static void encourage_send_incoming_cts(struct buf_head *bh, u_int32_t byte_len);
79static void encourage_recv_incoming(struct buf_head *bh, msg_type_t type,
80                                    u_int32_t byte_len);
81static void encourage_rts_done_waiting_buffer(struct ib_work *sq);
82static int send_cts(struct ib_work *rq);
83static void ib_close_connection(ib_connection_t *c);
84static int ib_tcp_client_connect(ib_method_addr_t *ibmap,
85                                 struct bmi_method_addr *remote_map);
86static int ib_tcp_server_check_new_connections(void);
87static int ib_block_for_activity(int timeout_ms);
88
89/*
90 * Return string form of work completion opcode field.
91 */
92static const char *wc_opcode_string(int opcode)
93{
94    if (opcode == BMI_IB_OP_SEND)
95        return "SEND";
96    else if (opcode == BMI_IB_OP_RECV)
97        return "RECV";
98    else if (opcode == BMI_IB_OP_RDMA_WRITE)
99        return "RDMA WRITE";
100    else
101        return "(UNKNOWN)";
102}
103
104/*
105 * Wander through single completion queue, pulling off messages and
106 * sticking them on the proper connection queues.  Later you can
107 * walk the incomingq looking for things to do to them.  Returns
108 * number of new things that arrived.
109 */
110static int ib_check_cq(void)
111{
112    int ret = 0;
113
114    for (;;) {
115        struct bmi_ib_wc wc;
116        int vret;
117
118        vret = check_cq(&wc);
119        if (vret == 0)
120            break;  /* empty */
121
122        debug(4, "%s: found something", __func__);
123        ++ret;
124        if (wc.status != 0) {
125            /* opcode is not necessarily valid; only wr_id, status, qp_num,
126             * and vendor_err can be relied upon */
127            if (wc.opcode == BMI_IB_OP_SEND) {
128                debug(0, "%s: entry id 0x%llx SEND error %s", __func__,
129                  llu(wc.id), wc_status_string(wc.status));
130                if (wc.id) {
131                    ib_connection_t *c = ptr_from_int64(wc.id);
132                    if (c->cancelled) {
133                        debug(0,
134                          "%s: ignoring send error on cancelled conn to %s",
135                          __func__, c->peername);
136                    }
137                }
138            } else {
139                error("%s: entry id 0x%llx opcode %s error %s", __func__,
140                  llu(wc.id), wc_opcode_string(wc.opcode),
141                  wc_status_string(wc.status));
142            }
143        }
144
145        if (wc.opcode == BMI_IB_OP_RECV) {
146            /*
147             * Remote side did a send to us.
148             */
149            msg_header_common_t mh_common;
150            struct buf_head *bh = ptr_from_int64(wc.id);
151            char *ptr = bh->buf;
152            u_int32_t byte_len = wc.byte_len;
153
154            VALGRIND_MAKE_MEM_DEFINED(ptr, byte_len);
155            decode_msg_header_common_t(&ptr, &mh_common);
156            bh->c->send_credit += mh_common.credit;
157
158            debug(2, "%s: recv from %s len %d type %s credit %d",
159                  __func__, bh->c->peername, byte_len,
160                  msg_type_name(mh_common.type), mh_common.credit);
161            if (mh_common.type == MSG_CTS) {
162                /* incoming CTS messages go to the send engine */
163                encourage_send_incoming_cts(bh, byte_len);
164            } else {
165                /* something for the recv side, no known rq yet */
166                encourage_recv_incoming(bh, mh_common.type, byte_len);
167            }
168
169        } else if (wc.opcode == BMI_IB_OP_RDMA_WRITE) {
170
171            /* completion event for the rdma write we initiated, used
172             * to signal memory unpin etc. */
173            struct ib_work *sq = ptr_from_int64(wc.id);
174
175            debug(3, "%s: sq %p %s", __func__, sq,
176                  sq_state_name(sq->state.send));
177
178            bmi_ib_assert(sq->state.send == SQ_WAITING_DATA_SEND_COMPLETION,
179                          "%s: wrong send state %s", __func__,
180                          sq_state_name(sq->state.send));
181
182            sq->state.send = SQ_WAITING_RTS_DONE_BUFFER;
183
184#if !MEMCACHE_BOUNCEBUF
185            memcache_deregister(ib_device->memcache, &sq->buflist);
186#endif
187            debug(2, "%s: sq %p RDMA write done, now %s", __func__, sq,
188                  sq_state_name(sq->state.send));
189
190            encourage_rts_done_waiting_buffer(sq);
191
192        } else if (wc.opcode == BMI_IB_OP_SEND) {
193
194            struct buf_head *bh = ptr_from_int64(wc.id);
195            struct ib_work *sq = bh->sq;
196
197            if (sq == NULL) {
198                /* MSG_BYE or MSG_CREDIT */
199                debug(2, "%s: MSG_BYE or MSG_CREDIT completed locally",
200                      __func__);
201            } else if (sq->type == BMI_SEND) {
202                sq_state_t state = sq->state.send;
203
204                if (state == SQ_WAITING_EAGER_SEND_COMPLETION)
205                    sq->state.send = SQ_WAITING_USER_TEST;
206                else if (state == SQ_WAITING_RTS_SEND_COMPLETION)
207                    sq->state.send = SQ_WAITING_CTS;
208                else if (state == SQ_WAITING_RTS_SEND_COMPLETION_GOT_CTS)
209                    sq->state.send = SQ_WAITING_DATA_SEND_COMPLETION;
210                else if (state == SQ_WAITING_RTS_DONE_SEND_COMPLETION)
211                    sq->state.send = SQ_WAITING_USER_TEST;
212                else if (state == SQ_CANCELLED)
213                    ;
214                else
215                    bmi_ib_assert(0, "%s: unknown send state %s (%d) of sq %p",
216                                  __func__, sq_state_name(sq->state.send),
217                                  sq->state.send, sq);
218                debug(2, "%s: send to %s completed locally: sq %p -> %s",
219                      __func__, bh->c->peername, sq,
220                      sq_state_name(sq->state.send));
221
222            } else {
223                struct ib_work *rq = sq;  /* rename */
224                rq_state_t state = rq->state.recv;
225               
226                if (state == RQ_RTS_WAITING_CTS_SEND_COMPLETION)
227                    rq->state.recv = RQ_RTS_WAITING_RTS_DONE;
228                else if (state == RQ_CANCELLED)
229                    ;
230                else
231                    bmi_ib_assert(0, "%s: unknown recv state %s of rq %p",
232                                  __func__, rq_state_name(rq->state.recv), rq);
233                debug(2, "%s: send to %s completed locally: rq %p -> %s",
234                      __func__, bh->c->peername, rq,
235                      rq_state_name(rq->state.recv));
236            }
237
238            qlist_add_tail(&bh->list, &bh->c->eager_send_buf_free);
239
240        } else {
241            error("%s: cq entry id 0x%llx opcode %d unexpected", __func__,
242              llu(wc.id), wc.opcode);
243        }
244    }
245    return ret;
246}
247
248/*
249 * Initialize common header of all messages.
250 */
251static void msg_header_init(msg_header_common_t *mh_common,
252                            ib_connection_t *c, msg_type_t type)
253{
254    mh_common->type = type;
255    mh_common->credit = c->return_credit;
256    c->return_credit = 0;
257}
258
259/*
260 * Grab an empty buf head, if available.
261 */
262static struct buf_head *get_eager_buf(ib_connection_t *c)
263{
264    struct buf_head *bh = NULL;
265
266    if (c->send_credit > 0) {
267        --c->send_credit;
268        bh = qlist_try_del_head(&c->eager_send_buf_free);
269        bmi_ib_assert(bh, "%s: empty eager_send_buf_free list, peer %s",
270                      __func__, c->peername);
271    }
272    return bh;
273}
274
275/*
276 * Re-post a receive buffer, possibly returning credit to the peer.
277 */
278static void post_rr(ib_connection_t *c, struct buf_head *bh)
279{
280    ib_device->func.post_rr(c, bh);
281    ++c->return_credit;
282
283    /* if credits are building up, explicitly send them over */
284    if (c->return_credit > ib_device->eager_buf_num - 4) {
285        msg_header_common_t mh_common;
286        char *ptr;
287
288        /* one credit saved back for just this situation, do not check */
289        --c->send_credit;
290        bh = qlist_try_del_head(&c->eager_send_buf_free);
291        bmi_ib_assert(bh, "%s: empty eager_send_buf_free list", __func__);
292        bh->sq = NULL;
293        debug(2, "%s: return %d credits to %s", __func__, c->return_credit,
294              c->peername);
295        msg_header_init(&mh_common, c, MSG_CREDIT);
296        ptr = bh->buf;
297        encode_msg_header_common_t(&ptr, &mh_common);
298        post_sr(bh, sizeof(mh_common));
299    }
300}
301
302/*
303 * Push a send message along its next step.  Called internally only.
304 */
305static void encourage_send_waiting_buffer(struct ib_work *sq)
306{
307    struct buf_head *bh;
308    ib_connection_t *c = sq->c;
309
310    debug(3, "%s: sq %p", __func__, sq);
311    bmi_ib_assert(sq->state.send == SQ_WAITING_BUFFER,
312                  "%s: wrong send state %s", __func__,
313                  sq_state_name(sq->state.send));
314
315    bh = get_eager_buf(c);
316    if (!bh) {
317        debug(2, "%s: sq %p no free send buffers to %s", __func__,
318              sq, c->peername);
319        return;
320    }
321    sq->bh = bh;
322    bh->sq = sq;  /* uplink for completion */
323
324    if (sq->buflist.tot_len <= ib_device->eager_buf_payload) {
325        /*
326         * Eager send.
327         */
328        msg_header_eager_t mh_eager;
329        char *ptr = bh->buf;
330
331        msg_header_init(&mh_eager.c, c, sq->is_unexpected
332                        ? MSG_EAGER_SENDUNEXPECTED : MSG_EAGER_SEND);
333        mh_eager.bmi_tag = sq->bmi_tag;
334        encode_msg_header_eager_t(&ptr, &mh_eager);
335
336        memcpy_from_buflist(&sq->buflist,
337                            (msg_header_eager_t *) bh->buf + 1);
338
339        /* send the message */
340        post_sr(bh, (u_int32_t) (sizeof(mh_eager) + sq->buflist.tot_len));
341
342        /* wait for ack saying remote has received and recycled his buf */
343        sq->state.send = SQ_WAITING_EAGER_SEND_COMPLETION;
344        debug(2, "%s: sq %p sent EAGER len %lld", __func__, sq,
345              lld(sq->buflist.tot_len));
346
347    } else {
348        /*
349         * Request to send, rendez-vous.  Include the mop id in the message
350         * which will be returned to us in the CTS so we can look it up.
351         */
352        msg_header_rts_t mh_rts;
353        char *ptr = bh->buf;
354
355        msg_header_init(&mh_rts.c, c, MSG_RTS);
356        mh_rts.bmi_tag = sq->bmi_tag;
357        mh_rts.mop_id = sq->mop->op_id;
358        mh_rts.tot_len = sq->buflist.tot_len;
359
360        encode_msg_header_rts_t(&ptr, &mh_rts);
361
362        post_sr(bh, sizeof(mh_rts));
363
364#if MEMCACHE_EARLY_REG
365        /* XXX: need to lock against receiver thread?  Could poll return
366         * the CTS and start the data send before this completes? */
367        memcache_register(ib_device->memcache, &sq->buflist);
368#endif
369
370        sq->state.send = SQ_WAITING_RTS_SEND_COMPLETION;
371        debug(2, "%s: sq %p sent RTS mopid %llx len %lld", __func__, sq,
372              llu(sq->mop->op_id), lld(sq->buflist.tot_len));
373    }
374}
375
376/*
377 * Look at the incoming message which is a response to an earlier RTS
378 * from us, and start the real data send.
379 */
380static void
381encourage_send_incoming_cts(struct buf_head *bh, u_int32_t byte_len)
382{
383    msg_header_cts_t mh_cts;
384    struct ib_work *sq, *sqt;
385    u_int32_t want;
386    char *ptr = bh->buf;
387
388    decode_msg_header_cts_t(&ptr, &mh_cts);
389
390    /*
391     * Look through this CTS message to determine the owning sq.  Works
392     * using the mop_id which was sent during the RTS, now returned to us.
393     */
394    sq = 0;
395    qlist_for_each_entry(sqt, &ib_device->sendq, list) {
396        debug(8, "%s: looking for op_id 0x%llx, consider 0x%llx", __func__,
397          llu(mh_cts.rts_mop_id), llu(sqt->mop->op_id));
398        if (sqt->c == bh->c
399            && sqt->mop->op_id == (bmi_op_id_t) mh_cts.rts_mop_id) {
400            sq = sqt;
401            break;
402        }
403    }
404    if (!sq)
405        error("%s: mop_id %llx in CTS message not found", __func__,
406          llu(mh_cts.rts_mop_id));
407
408    debug(2, "%s: sq %p %s mopid %llx len %u", __func__, sq,
409          sq_state_name(sq->state.send), llu(mh_cts.rts_mop_id), byte_len);
410    bmi_ib_assert(sq->state.send == SQ_WAITING_CTS ||
411                  sq->state.send == SQ_WAITING_RTS_SEND_COMPLETION,
412                  "%s: wrong send state %s", __func__,
413                  sq_state_name(sq->state.send));
414
415    /* message; cts content; list of buffers, lengths, and keys */
416    want = sizeof(mh_cts)
417         + mh_cts.buflist_num * MSG_HEADER_CTS_BUFLIST_ENTRY_SIZE;
418    if (bmi_ib_unlikely(byte_len != want))
419        error("%s: wrong message size for CTS, got %u, want %u", __func__,
420              byte_len, want);
421
422    /* start the big tranfser */
423    post_sr_rdmaw(sq, &mh_cts, (msg_header_cts_t *) bh->buf + 1);
424
425    /* re-post our recv buf now that we have all the information from CTS */
426    post_rr(sq->c, bh);
427
428    if (sq->state.send == SQ_WAITING_CTS)
429        sq->state.send = SQ_WAITING_DATA_SEND_COMPLETION;
430    else
431        sq->state.send = SQ_WAITING_RTS_SEND_COMPLETION_GOT_CTS;
432    debug(3, "%s: sq %p now %s", __func__, sq, sq_state_name(sq->state.send));
433}
434
435
436/*
437 * See if anything was preposted that matches this.
438 */
439static struct ib_work *
440find_matching_recv(rq_state_t statemask, const ib_connection_t *c,
441  bmi_msg_tag_t bmi_tag)
442{
443    struct ib_work *rq;
444
445    qlist_for_each_entry(rq, &ib_device->recvq, list) {
446        if ((rq->state.recv & statemask) && rq->c == c
447            && rq->bmi_tag == bmi_tag)
448            return rq;
449    }
450    return NULL;
451}
452
453/*
454 * Init a new recvq entry from something that arrived on the wire.
455 */
456static struct ib_work *
457alloc_new_recv(ib_connection_t *c, struct buf_head *bh)
458{
459    struct ib_work *rq = bmi_ib_malloc(sizeof(*rq));
460    rq->type = BMI_RECV;
461    rq->c = c;
462    ++rq->c->refcnt;
463    rq->bh = bh;
464    rq->mop = 0;  /* until user posts for it */
465    rq->rts_mop_id = 0;
466    qlist_add_tail(&rq->list, &ib_device->recvq);
467    return rq;
468}
469
470/*
471 * Called from incoming message processing, except for the case
472 * of ack to a CTS, for which we know the rq (see below).
473 *
474 * Unexpected receive, either no post or explicit sendunexpected.
475 */
476static void
477encourage_recv_incoming(struct buf_head *bh, msg_type_t type, u_int32_t byte_len)
478{
479    ib_connection_t *c = bh->c;
480    struct ib_work *rq;
481    char *ptr = bh->buf;
482
483    debug(4, "%s: incoming msg type %s", __func__, msg_type_name(type));
484
485    if (type == MSG_EAGER_SEND) {
486
487        msg_header_eager_t mh_eager;
488
489        ptr = bh->buf;
490        decode_msg_header_eager_t(&ptr, &mh_eager);
491
492        debug(2, "%s: recv eager len %u", __func__, byte_len);
493
494        rq = find_matching_recv(RQ_WAITING_INCOMING, c, mh_eager.bmi_tag);
495        if (rq) {
496            bmi_size_t len = byte_len - sizeof(mh_eager);
497            if (len > rq->buflist.tot_len)
498                error("%s: EAGER received %lld too small for buffer %lld",
499                  __func__, lld(len), lld(rq->buflist.tot_len));
500
501            memcpy_to_buflist(&rq->buflist,
502                              (msg_header_eager_t *) bh->buf + 1,
503                              len);
504
505            /* re-post */
506            post_rr(c, bh);
507
508            rq->state.recv = RQ_EAGER_WAITING_USER_TEST;
509            debug(2, "%s: matched rq %p now %s", __func__, rq,
510              rq_state_name(rq->state.recv));
511#if MEMCACHE_EARLY_REG
512            /* if a big receive was posted but only a small message came
513             * through, unregister it now */
514            if (rq->buflist.tot_len > ib_device->eager_buf_payload) {
515                debug(2, "%s: early registration not needed, dereg after eager",
516                  __func__);
517                memcache_deregister(ib_device->memcache, &rq->buflist);
518            }
519#endif
520
521        } else {
522            rq = alloc_new_recv(c, bh);
523            /* return value for when user does post_recv for this one */
524            rq->bmi_tag = mh_eager.bmi_tag;
525            rq->state.recv = RQ_EAGER_WAITING_USER_POST;
526            /* do not repost or ack, keeping bh until user test */
527            debug(2, "%s: new rq %p now %s", __func__, rq,
528              rq_state_name(rq->state.recv));
529        }
530        rq->actual_len = byte_len - sizeof(mh_eager);
531
532    } else if (type == MSG_EAGER_SENDUNEXPECTED) {
533
534        msg_header_eager_t mh_eager;
535
536        ptr = bh->buf;
537        decode_msg_header_eager_t(&ptr, &mh_eager);
538
539        debug(2, "%s: recv eager unexpected len %u", __func__, byte_len);
540
541        rq = alloc_new_recv(c, bh);
542        /* return values for when user does testunexpected for this one */
543        rq->bmi_tag = mh_eager.bmi_tag;
544        rq->state.recv = RQ_EAGER_WAITING_USER_TESTUNEXPECTED;
545        rq->actual_len = byte_len - sizeof(mh_eager);
546        /* do not repost, keeping bh until user test */
547        debug(2, "%s: new rq %p now %s", __func__, rq,
548          rq_state_name(rq->state.recv));
549
550    } else if (type == MSG_RTS) {
551        /*
552         * Sender wants to send a big message, initiates rts/cts protocol.
553         * Has the user posted a matching receive for it yet?
554         */
555        msg_header_rts_t mh_rts;
556
557        ptr = bh->buf;
558        decode_msg_header_rts_t(&ptr, &mh_rts);
559
560        debug(2, "%s: recv RTS len %lld mopid %llx", __func__,
561              lld(mh_rts.tot_len), llu(mh_rts.mop_id));
562
563        rq = find_matching_recv(RQ_WAITING_INCOMING, c, mh_rts.bmi_tag);
564        if (rq) {
565            if ((int)mh_rts.tot_len > rq->buflist.tot_len) {
566                error("%s: RTS received %llu too small for buffer %llu",
567                  __func__, llu(mh_rts.tot_len), llu(rq->buflist.tot_len));
568            }
569            rq->state.recv = RQ_RTS_WAITING_CTS_BUFFER;
570            debug(2, "%s: matched rq %p MSG_RTS now %s", __func__, rq,
571              rq_state_name(rq->state.recv));
572        } else {
573            rq = alloc_new_recv(c, bh);
574            /* return value for when user does post_recv for this one */
575            rq->bmi_tag = mh_rts.bmi_tag;
576            rq->state.recv = RQ_RTS_WAITING_USER_POST;
577            debug(2, "%s: new rq %p MSG_RTS now %s", __func__, rq,
578              rq_state_name(rq->state.recv));
579        }
580        rq->actual_len = mh_rts.tot_len;
581        rq->rts_mop_id = mh_rts.mop_id;
582
583        post_rr(c, bh);
584
585        if (rq->state.recv == RQ_RTS_WAITING_CTS_BUFFER) {
586            int ret;
587            ret = send_cts(rq);
588            if (ret == 0)
589                rq->state.recv = RQ_RTS_WAITING_CTS_SEND_COMPLETION;
590            /* else keep waiting until we can send that cts */
591        }
592
593    } else if (type == MSG_RTS_DONE) {
594
595        msg_header_rts_done_t mh_rts_done;
596        struct ib_work *rqt;
597
598        ptr = bh->buf;
599        decode_msg_header_rts_done_t(&ptr, &mh_rts_done);
600
601        debug(2, "%s: recv RTS_DONE mop_id %llx", __func__,
602              llu(mh_rts_done.mop_id));
603
604        rq = NULL;
605        qlist_for_each_entry(rqt, &ib_device->recvq, list) {
606            if (rqt->c == c && rqt->rts_mop_id == mh_rts_done.mop_id &&
607                rqt->state.recv == RQ_RTS_WAITING_RTS_DONE) {
608                rq = rqt;
609                break;
610            }
611        }
612
613        bmi_ib_assert(rq, "%s: mop_id %llx in RTS_DONE message not found",
614                      __func__, llu(mh_rts_done.mop_id));
615
616#if MEMCACHE_BOUNCEBUF
617        memcpy_to_buflist(&rq->buflist, reg_recv_buflist_buf,
618                          rq->buflist.tot_len);
619#else
620        memcache_deregister(ib_device->memcache, &rq->buflist);
621#endif
622
623        post_rr(c, bh);
624
625        rq->state.recv = RQ_RTS_WAITING_USER_TEST;
626
627    } else if (type == MSG_BYE) {
628        /*
629         * Other side requests connection close.  Do it.
630         */
631        debug(2, "%s: recv BYE", __func__);
632        post_rr(c, bh);
633        ib_close_connection(c);
634
635    } else if (type == MSG_CREDIT) {
636
637        /* already added the credit in check_cq */
638        debug(2, "%s: recv CREDIT", __func__);
639        post_rr(c, bh);
640
641    } else {
642        error("%s: unknown message header type %d len %u", __func__,
643              type, byte_len);
644    }
645}
646
647/*
648 * We finished the RDMA write.  Send him a done message.
649 */
650static void encourage_rts_done_waiting_buffer(struct ib_work *sq)
651{
652    ib_connection_t *c = sq->c;
653    struct buf_head *bh;
654    char *ptr;
655    msg_header_rts_done_t mh_rts_done;
656
657    bh = get_eager_buf(c);
658    if (!bh) {
659        debug(2, "%s: sq %p no free send buffers to %s",
660              __func__, sq, c->peername);
661        return;
662    }
663    sq->bh = bh;
664    bh->sq = sq;
665    ptr = bh->buf;
666
667    msg_header_init(&mh_rts_done.c, c, MSG_RTS_DONE);
668    mh_rts_done.mop_id = sq->mop->op_id;
669
670    debug(2, "%s: sq %p sent RTS_DONE mopid %llx", __func__,
671          sq, llu(sq->mop->op_id));
672
673    encode_msg_header_rts_done_t(&ptr, &mh_rts_done);
674
675    post_sr(bh, sizeof(mh_rts_done));
676    sq->state.send = SQ_WAITING_RTS_DONE_SEND_COMPLETION;
677}
678
679static void send_bye(ib_connection_t *c)
680{
681    msg_header_common_t mh_common;
682    struct buf_head *bh;
683    char *ptr;
684
685    debug(2, "%s: sending bye", __func__);
686    bh = get_eager_buf(c);
687    if (!bh) {
688        debug(2, "%s: no free send buffers to %s", __func__, c->peername);
689        /* if no messages available, let garbage collection on server deal */
690        return;
691    }
692    bh->sq = NULL;
693    ptr = bh->buf;
694    msg_header_init(&mh_common, c, MSG_BYE);
695    encode_msg_header_common_t(&ptr, &mh_common);
696
697    post_sr(bh, sizeof(mh_common));
698}
699
700/*
701 * Two places need to send a CTS in response to an RTS.  They both
702 * call this.  This handles pinning the memory, too.  Don't forget
703 * to unpin when done.
704 */
705static int
706send_cts(struct ib_work *rq)
707{
708    ib_connection_t *c = rq->c;
709    struct buf_head *bh;
710    msg_header_cts_t mh_cts;
711    u_int64_t *bufp;
712    u_int32_t *lenp;
713    u_int32_t *keyp;
714    u_int32_t post_len;
715    char *ptr;
716    int i;
717
718    debug(2, "%s: rq %p from %s mopid %llx len %lld", __func__,
719          rq, rq->c->peername, llu(rq->rts_mop_id),
720          lld(rq->buflist.tot_len));
721
722    bh = get_eager_buf(c);
723    if (!bh) {
724        debug(2, "%s: rq %p no free send buffers to %s",
725              __func__, rq, c->peername);
726        return 1;
727    }
728    rq->bh = bh;
729    bh->sq = (struct ib_work *) rq;  /* uplink for completion */
730
731#if MEMCACHE_BOUNCEBUF
732    if (reg_recv_buflist.num == 0) {
733        reg_recv_buflist.num = 1;
734        reg_recv_buflist.buf.recv = &reg_recv_buflist_buf;
735        reg_recv_buflist.len = &reg_recv_buflist_len;
736        reg_recv_buflist.tot_len = reg_recv_buflist_len;
737        reg_recv_buflist_buf = bmi_ib_malloc(reg_recv_buflist_len);
738        memcache_register(ib_device->memcache, &reg_recv_buflist);
739    }
740    if (rq->buflist.tot_len > reg_recv_buflist_len)
741        error("%s: recv prereg buflist too small, need %lld", __func__,
742          lld(rq->buflist.tot_len));
743
744    ib_buflist_t save_buflist = rq->buflist;
745    rq->buflist = reg_recv_buflist;
746#else
747#  if !MEMCACHE_EARLY_REG
748    memcache_register(ib_device->memcache, &rq->buflist);
749#  endif
750#endif
751
752    msg_header_init(&mh_cts.c, c, MSG_CTS);
753    mh_cts.rts_mop_id = rq->rts_mop_id;
754    mh_cts.buflist_tot_len = rq->buflist.tot_len;
755    mh_cts.buflist_num = rq->buflist.num;
756
757    ptr = bh->buf;
758    encode_msg_header_cts_t(&ptr, &mh_cts);
759
760    /* encode all the buflist entries */
761    bufp = (u_int64_t *)((msg_header_cts_t *) bh->buf + 1);
762    lenp = (u_int32_t *)(bufp + rq->buflist.num);
763    keyp = (u_int32_t *)(lenp + rq->buflist.num);
764    post_len = (char *)(keyp + rq->buflist.num) - (char *)bh->buf;
765    if (post_len > ib_device->eager_buf_size)
766        error("%s: too many (%d) recv buflist entries for buf", __func__,
767          rq->buflist.num);
768    for (i=0; i<rq->buflist.num; i++) {
769        bufp[i] = htobmi64(int64_from_ptr(rq->buflist.buf.recv[i]));
770        lenp[i] = htobmi32(rq->buflist.len[i]);
771        keyp[i] = htobmi32(rq->buflist.memcache[i]->memkeys.rkey);
772    }
773
774    /* send the cts */
775    post_sr(bh, post_len);
776
777#if MEMCACHE_BOUNCEBUF
778    rq->buflist = save_buflist;
779#endif
780
781    return 0;
782}
783
784/*
785 * Bring up the connection before posting a send or receive on it.
786 */
787static int
788ensure_connected(struct bmi_method_addr *remote_map)
789{
790    int ret = 0;
791    ib_method_addr_t *ibmap = remote_map->method_data;
792
793    if (!ibmap->c && ibmap->reconnect_flag)
794        ret = ib_tcp_client_connect(ibmap, remote_map);
795    else if(!ibmap->c && !ibmap->reconnect_flag)
796        ret = 1; /* cannot actively connect */
797    else
798        ret = 0;
799
800    return ret;
801}
802
803/*
804 * Generic interface for both send and sendunexpected, list and non-list send.
805 */
806static int
807post_send(bmi_op_id_t *id, struct bmi_method_addr *remote_map,
808          int numbufs, const void *const *buffers, const bmi_size_t *sizes,
809          bmi_size_t total_size, bmi_msg_tag_t tag, void *user_ptr,
810          bmi_context_id context_id, int is_unexpected)
811{
812    struct ib_work *sq;
813    struct method_op *mop;
814    ib_method_addr_t *ibmap;
815    int i;
816    int ret = 0;
817
818    gen_mutex_lock(&interface_mutex);
819    ret = ensure_connected(remote_map);
820    if (ret)
821        goto out;
822    ibmap = remote_map->method_data;
823
824    /* alloc and build new sendq structure */
825    sq = bmi_ib_malloc(sizeof(*sq));
826    sq->type = BMI_SEND;
827    sq->state.send = SQ_WAITING_BUFFER;
828
829    debug(2, "%s: sq %p len %lld peer %s", __func__, sq, (long long) total_size,
830          ibmap->c->peername);
831
832    /*
833     * For a single buffer, store it inside the sq directly, else save
834     * the pointer to the list the user built when calling a _list
835     * function.  This case is indicated by the non-_list functions by
836     * a zero in numbufs.
837     */
838    if (numbufs == 0) {
839        sq->buflist_one_buf.send = *buffers;
840        sq->buflist_one_len = *sizes;
841        sq->buflist.num = 1;
842        sq->buflist.buf.send = &sq->buflist_one_buf.send;
843        sq->buflist.len = &sq->buflist_one_len;
844    } else {
845        sq->buflist.num = numbufs;
846        sq->buflist.buf.send = buffers;
847        sq->buflist.len = sizes;
848    }
849    sq->buflist.tot_len = 0;
850    for (i=0; i<sq->buflist.num; i++)
851        sq->buflist.tot_len += sizes[i];
852
853    /*
854     * This passed-in total length field does not make much sense
855     * to me, but I'll at least check it for accuracy.
856     */
857    if (sq->buflist.tot_len != total_size)
858        error("%s: user-provided tot len %lld"
859          " does not match buffer list tot len %lld",
860          __func__, lld(total_size), lld(sq->buflist.tot_len));
861
862    /* unexpected messages must fit inside an eager message */
863    if (is_unexpected && sq->buflist.tot_len > ib_device->eager_buf_payload) {
864        free(sq);
865        ret = -EINVAL;
866        goto out;
867    }
868
869    sq->bmi_tag = tag;
870    sq->c = ibmap->c;
871    ++sq->c->refcnt;
872    sq->is_unexpected = is_unexpected;
873    qlist_add_tail(&sq->list, &ib_device->sendq);
874
875    /* generate identifier used by caller to test for message later */
876    mop = bmi_ib_malloc(sizeof(*mop));
877    id_gen_fast_register(&mop->op_id, mop);
878    mop->addr = remote_map;  /* set of function pointers, essentially */
879    mop->method_data = sq;
880    mop->user_ptr = user_ptr;
881    mop->context_id = context_id;
882    *id = mop->op_id;
883    sq->mop = mop;
884    debug(3, "%s: new sq %p", __func__, sq);
885
886    /* and start sending it if possible */
887    encourage_send_waiting_buffer(sq);
888  out:
889    gen_mutex_unlock(&interface_mutex);
890    return ret;
891}
892
893static int
894BMI_ib_post_send(bmi_op_id_t *id, struct bmi_method_addr *remote_map,
895                 const void *buffer, bmi_size_t total_size,
896                 enum bmi_buffer_type buffer_flag __unused,
897                 bmi_msg_tag_t tag, void *user_ptr, bmi_context_id context_id)
898{
899    return post_send(id, remote_map, 0, &buffer, &total_size,
900                     total_size, tag, user_ptr, context_id, 0);
901}
902
903static int
904BMI_ib_post_send_list(bmi_op_id_t *id, struct bmi_method_addr *remote_map,
905  const void *const *buffers, const bmi_size_t *sizes, int list_count,
906  bmi_size_t total_size, enum bmi_buffer_type buffer_flag __unused,
907  bmi_msg_tag_t tag, void *user_ptr, bmi_context_id context_id)
908{
909    return post_send(id, remote_map, list_count, buffers, sizes,
910                     total_size, tag, user_ptr, context_id, 0);
911}
912
913static int
914BMI_ib_post_sendunexpected(bmi_op_id_t *id, struct bmi_method_addr *remote_map,
915                           const void *buffer, bmi_size_t total_size,
916                           enum bmi_buffer_type buffer_flag __unused,
917                           bmi_msg_tag_t tag, void *user_ptr,
918                           bmi_context_id context_id)
919{
920    return post_send(id, remote_map, 0, &buffer, &total_size,
921                     total_size, tag, user_ptr, context_id, 1);
922}
923
924static int
925BMI_ib_post_sendunexpected_list(bmi_op_id_t *id, struct bmi_method_addr *remote_map,
926                                const void *const *buffers,
927                                const bmi_size_t *sizes, int list_count,
928                                bmi_size_t total_size,
929                                enum bmi_buffer_type buffer_flag __unused,
930                                bmi_msg_tag_t tag, void *user_ptr,
931                                bmi_context_id context_id)
932{
933    return post_send(id, remote_map, list_count, buffers, sizes,
934                     total_size, tag, user_ptr, context_id, 1);
935}
936
937/*
938 * Used by both recv and recv_list.
939 */
940static int
941post_recv(bmi_op_id_t *id, struct bmi_method_addr *remote_map,
942          int numbufs, void *const *buffers, const bmi_size_t *sizes,
943          bmi_size_t tot_expected_len, bmi_msg_tag_t tag,
944          void *user_ptr, bmi_context_id context_id)
945{
946    struct ib_work *rq;
947    struct method_op *mop;
948    ib_method_addr_t *ibmap;
949    ib_connection_t *c;
950    int i;
951    int ret = 0;
952   
953    gen_mutex_lock(&interface_mutex);
954    ret = ensure_connected(remote_map);
955    if (ret)
956        goto out;
957    ibmap = remote_map->method_data;
958    c = ibmap->c;
959
960    /* poll interface first to save a few steps below */
961    ib_check_cq();
962
963    /* check to see if matching recv is in the queue */
964    rq = find_matching_recv(
965      RQ_EAGER_WAITING_USER_POST | RQ_RTS_WAITING_USER_POST, c, tag);
966    if (rq) {
967        debug(2, "%s: rq %p matches %s", __func__, rq,
968          rq_state_name(rq->state.recv));
969    } else {
970        /* alloc and build new recvq structure */
971        rq = alloc_new_recv(c, NULL);
972        rq->state.recv = RQ_WAITING_INCOMING;
973        rq->bmi_tag = tag;
974        debug(2, "%s: new rq %p", __func__, rq);
975    }
976
977    if (numbufs == 0) {
978        rq->buflist_one_buf.recv = *buffers;
979        rq->buflist_one_len = *sizes;
980        rq->buflist.num = 1;
981        rq->buflist.buf.recv = &rq->buflist_one_buf.recv;
982        rq->buflist.len = &rq->buflist_one_len;
983    } else {
984        rq->buflist.num = numbufs;
985        rq->buflist.buf.recv = buffers;
986        rq->buflist.len = sizes;
987    }
988    rq->buflist.tot_len = 0;
989    for (i=0; i<rq->buflist.num; i++)
990        rq->buflist.tot_len += sizes[i];
991
992    /*
993     * This passed-in total length field does not make much sense
994     * to me, but I'll at least check it for accuracy.
995     */
996    if (rq->buflist.tot_len != tot_expected_len)
997        error("%s: user-provided tot len %lld"
998          " does not match buffer list tot len %lld",
999          __func__, lld(tot_expected_len), lld(rq->buflist.tot_len));
1000
1001    /* generate identifier used by caller to test for message later */
1002    mop = bmi_ib_malloc(sizeof(*mop));
1003    id_gen_fast_register(&mop->op_id, mop);
1004    mop->addr = remote_map;  /* set of function pointers, essentially */
1005    mop->method_data = rq;
1006    mop->user_ptr = user_ptr;
1007    mop->context_id = context_id;
1008    *id = mop->op_id;
1009    rq->mop = mop;
1010
1011    /* handle the two "waiting for a local user post" states */
1012    if (rq->state.recv == RQ_EAGER_WAITING_USER_POST) {
1013
1014        msg_header_eager_t mh_eager;
1015        char *ptr = rq->bh->buf;
1016
1017        decode_msg_header_eager_t(&ptr, &mh_eager);
1018
1019        debug(2, "%s: rq %p state %s finish eager directly", __func__,
1020          rq, rq_state_name(rq->state.recv));
1021        if (rq->actual_len > tot_expected_len) {
1022            error("%s: received %lld matches too-small buffer %lld",
1023              __func__, lld(rq->actual_len), lld(rq->buflist.tot_len));
1024        }
1025
1026        memcpy_to_buflist(&rq->buflist,
1027                          (msg_header_eager_t *) rq->bh->buf + 1,
1028                          rq->actual_len);
1029
1030        /* re-post */
1031        post_rr(rq->c, rq->bh);
1032
1033        /* now just wait for user to test, never do "immediate completion" */
1034        rq->state.recv = RQ_EAGER_WAITING_USER_TEST;
1035        goto out;
1036
1037    } else if (rq->state.recv == RQ_RTS_WAITING_USER_POST) {
1038        int sret;
1039        debug(2, "%s: rq %p %s send cts", __func__, rq,
1040          rq_state_name(rq->state.recv));
1041        /* try to send, or wait for send buffer space */
1042        rq->state.recv = RQ_RTS_WAITING_CTS_BUFFER;
1043#if MEMCACHE_EARLY_REG
1044        memcache_register(ib_device->memcache, &rq->buflist);
1045#endif
1046        sret = send_cts(rq);
1047        if (sret == 0)
1048            rq->state.recv = RQ_RTS_WAITING_CTS_SEND_COMPLETION;
1049        goto out;
1050    }
1051
1052#if MEMCACHE_EARLY_REG
1053    /* but remember that this might not be used if the other side sends
1054     * less than we posted for receive; that's legal */
1055    if (rq->buflist.tot_len > ib_device->eager_buf_payload)
1056        memcache_register(ib_device->memcache, &rq->buflist);
1057#endif
1058
1059  out:
1060    gen_mutex_unlock(&interface_mutex);
1061    return ret;
1062}
1063
1064static int
1065BMI_ib_post_recv(bmi_op_id_t *id, struct bmi_method_addr *remote_map,
1066  void *buffer, bmi_size_t expected_len, bmi_size_t *actual_len __unused,
1067  enum bmi_buffer_type buffer_flag __unused, bmi_msg_tag_t tag, void *user_ptr,
1068  bmi_context_id context_id)
1069{
1070    return post_recv(id, remote_map, 0, &buffer, &expected_len,
1071                     expected_len, tag, user_ptr, context_id);
1072}
1073
1074static int
1075BMI_ib_post_recv_list(bmi_op_id_t *id, struct bmi_method_addr *remote_map,
1076  void *const *buffers, const bmi_size_t *sizes, int list_count,
1077  bmi_size_t tot_expected_len, bmi_size_t *tot_actual_len __unused,
1078  enum bmi_buffer_type buffer_flag __unused, bmi_msg_tag_t tag, void *user_ptr,
1079  bmi_context_id context_id)
1080{
1081    return post_recv(id, remote_map, list_count, buffers, sizes,
1082                     tot_expected_len, tag, user_ptr, context_id);
1083}
1084
1085/*
1086 * Internal shared helper function.  Return 1 if found something
1087 * completed.
1088 */
1089static int
1090test_sq(struct ib_work *sq, bmi_op_id_t *outid, bmi_error_code_t *err,
1091  bmi_size_t *size, void **user_ptr, int complete)
1092{
1093    ib_connection_t *c;
1094
1095    debug(9, "%s: sq %p outid %p err %p size %p user_ptr %p complete %d",
1096      __func__, sq, outid, err, size, user_ptr, complete);
1097
1098    if (sq->state.send == SQ_WAITING_USER_TEST) {
1099        if (complete) {
1100            debug(2, "%s: sq %p completed %lld to %s", __func__,
1101              sq, lld(sq->buflist.tot_len), sq->c->peername);
1102            *outid = sq->mop->op_id;
1103            *err = 0;
1104            *size = sq->buflist.tot_len;
1105            if (user_ptr)
1106                *user_ptr = sq->mop->user_ptr;
1107            qlist_del(&sq->list);
1108            id_gen_fast_unregister(sq->mop->op_id);
1109            c = sq->c;
1110            free(sq->mop);
1111            free(sq);
1112            --c->refcnt;
1113            if (c->closed || c->cancelled)
1114                ib_close_connection(c);
1115            return 1;
1116        }
1117    /* this state needs help, push it (ideally would be triggered
1118     * when the resource is freed... XXX */
1119    } else if (sq->state.send == SQ_WAITING_BUFFER) {
1120        debug(2, "%s: sq %p %s, encouraging", __func__, sq,
1121          sq_state_name(sq->state.send));
1122        encourage_send_waiting_buffer(sq);
1123    } else if (sq->state.send == SQ_WAITING_RTS_DONE_BUFFER) {
1124        debug(2, "%s: sq %p %s, encouraging", __func__, sq,
1125          sq_state_name(sq->state.send));
1126        encourage_rts_done_waiting_buffer(sq);
1127    } else if (sq->state.send == SQ_CANCELLED && complete) {
1128        debug(2, "%s: sq %p cancelled", __func__, sq);
1129        *outid = sq->mop->op_id;
1130        *err = -PVFS_ETIMEDOUT;
1131        if (user_ptr)
1132            *user_ptr = sq->mop->user_ptr;
1133        qlist_del(&sq->list);
1134        id_gen_fast_unregister(sq->mop->op_id);
1135        c = sq->c;
1136        free(sq->mop);
1137        free(sq);
1138        --c->refcnt;
1139        if (c->closed || c->cancelled)
1140            ib_close_connection(c);
1141        return 1;
1142    } else {
1143        debug(9, "%s: sq %p found, not done, state %s", __func__,
1144          sq, sq_state_name(sq->state.send));
1145    }
1146    return 0;
1147}
1148
1149/*
1150 * Internal shared helper function.  Return 1 if found something
1151 * completed.  Note that rq->mop can be null for unexpected
1152 * messages.
1153 */
1154static int
1155test_rq(struct ib_work *rq, bmi_op_id_t *outid, bmi_error_code_t *err,
1156  bmi_size_t *size, void **user_ptr, int complete)
1157{
1158    ib_connection_t *c;
1159
1160    debug(9, "%s: rq %p outid %p err %p size %p user_ptr %p complete %d",
1161      __func__, rq, outid, err, size, user_ptr, complete);
1162
1163    if (rq->state.recv == RQ_EAGER_WAITING_USER_TEST
1164      || rq->state.recv == RQ_RTS_WAITING_USER_TEST) {
1165        if (complete) {
1166            debug(2, "%s: rq %p completed %lld from %s", __func__,
1167              rq, lld(rq->actual_len), rq->c->peername);
1168            *err = 0;
1169            *size = rq->actual_len;
1170            if (rq->mop) {
1171                *outid = rq->mop->op_id;
1172                if (user_ptr)
1173                    *user_ptr = rq->mop->user_ptr;
1174                id_gen_fast_unregister(rq->mop->op_id);
1175                free(rq->mop);
1176            }
1177            qlist_del(&rq->list);
1178            c = rq->c;
1179            free(rq);
1180            --c->refcnt;
1181            if (c->closed || c->cancelled)
1182                ib_close_connection(c);
1183            return 1;
1184        }
1185    /* this state needs help, push it (ideally would be triggered
1186     * when the resource is freed...) XXX */
1187    } else if (rq->state.recv == RQ_RTS_WAITING_CTS_BUFFER) {
1188        int ret;
1189        debug(2, "%s: rq %p %s, encouraging", __func__, rq,
1190          rq_state_name(rq->state.recv));
1191        ret = send_cts(rq);
1192        if (ret == 0)
1193            rq->state.recv = RQ_RTS_WAITING_CTS_SEND_COMPLETION;
1194        /* else keep waiting until we can send that cts */
1195        debug(2, "%s: rq %p now %s", __func__, rq, rq_state_name(rq->state.recv));
1196    } else if (rq->state.recv == RQ_CANCELLED && complete) {
1197        debug(2, "%s: rq %p cancelled", __func__, rq);
1198        *err = -PVFS_ETIMEDOUT;
1199        if (rq->mop) {
1200            *outid = rq->mop->op_id;
1201            if (user_ptr)
1202                *user_ptr = rq->mop->user_ptr;
1203            id_gen_fast_unregister(rq->mop->op_id);
1204            free(rq->mop);
1205        }
1206        qlist_del(&rq->list);
1207        c = rq->c;
1208        free(rq);
1209        --c->refcnt;
1210        if (c->closed || c->cancelled)
1211            ib_close_connection(c);
1212        return 1;
1213    } else {
1214        debug(9, "%s: rq %p found, not done, state %s", __func__,
1215          rq, rq_state_name(rq->state.recv));
1216    }
1217    return 0;
1218}
1219
1220/*
1221 * Test one message, send or receive.  Also used to test the send side of
1222 * messages sent using sendunexpected.
1223 */
1224static int
1225BMI_ib_test(bmi_op_id_t id, int *outcount, bmi_error_code_t *err,
1226  bmi_size_t *size, void **user_ptr, int max_idle_time __unused,
1227  bmi_context_id context_id __unused)
1228{
1229    struct method_op *mop;
1230    struct ib_work *sq;
1231    int n;
1232
1233    gen_mutex_lock(&interface_mutex);
1234    ib_check_cq();
1235
1236    mop = id_gen_fast_lookup(id);
1237    sq = mop->method_data;
1238    n = 0;
1239    if (sq->type == BMI_SEND) {
1240        if (test_sq(sq, &id, err, size, user_ptr, 1))
1241            n = 1;
1242    } else {
1243        /* actually a recv */
1244        struct ib_work *rq = mop->method_data;
1245        if (test_rq(rq, &id, err, size, user_ptr, 1))
1246            n = 1;
1247    }
1248    *outcount = n;
1249    gen_mutex_unlock(&interface_mutex);
1250    return 0;
1251}
1252
1253/*
1254 * Test just the particular list of op ids, returning the list of indices
1255 * that completed.
1256 */
1257static int BMI_ib_testsome(int incount, bmi_op_id_t *ids, int *outcount,
1258  int *index_array, bmi_error_code_t *errs, bmi_size_t *sizes, void **user_ptrs,
1259  int max_idle_time __unused, bmi_context_id context_id __unused)
1260{
1261    struct method_op *mop;
1262    struct ib_work *sq;
1263    bmi_op_id_t tid;
1264    int i, n;
1265
1266    gen_mutex_lock(&interface_mutex);
1267    ib_check_cq();
1268
1269    n = 0;
1270    for (i=0; i<incount; i++) {
1271        if (!ids[i])
1272            continue;
1273        mop = id_gen_fast_lookup(ids[i]);
1274        sq = mop->method_data;
1275
1276        if (sq->type == BMI_SEND) {
1277            if (test_sq(sq, &tid, &errs[n], &sizes[n], &user_ptrs[n], 1)) {
1278                index_array[n] = i;
1279                ++n;
1280            }
1281        } else {
1282            /* actually a recv */
1283            struct ib_work *rq = mop->method_data;
1284            if (test_rq(rq, &tid, &errs[n], &sizes[n], &user_ptrs[n], 1)) {
1285                index_array[n] = i;
1286                ++n;
1287            }
1288        }
1289    }
1290
1291    gen_mutex_unlock(&interface_mutex);
1292
1293    *outcount = n;
1294    return 0;
1295}
1296
1297/*
1298 * Test for multiple completions matching a particular user context.
1299 * Return 0 if okay, >0 if want another poll soon, negative for error.
1300 */
1301static int
1302BMI_ib_testcontext(int incount, bmi_op_id_t *outids, int *outcount,
1303  bmi_error_code_t *errs, bmi_size_t *sizes, void **user_ptrs,
1304  int max_idle_time, bmi_context_id context_id)
1305{
1306    struct qlist_head *l, *lnext;
1307    int n = 0, complete, activity = 0;
1308    void **up = NULL;
1309
1310    gen_mutex_lock(&interface_mutex);
1311
1312restart:
1313    activity += ib_check_cq();
1314
1315    /*
1316     * Walk _all_ entries on sq, rq, marking them completed or
1317     * encouraging them as needed due to resource limitations.
1318     */
1319    for (l=ib_device->sendq.next; l != &ib_device->sendq; l=lnext) {
1320        struct ib_work *sq = qlist_upcast(l);
1321        lnext = l->next;
1322        /* test them all, even if can't reap them, just to encourage */
1323        complete = (sq->mop->context_id == context_id) && (n < incount);
1324        if (user_ptrs)
1325            up = &user_ptrs[n];
1326        n += test_sq(sq, &outids[n], &errs[n], &sizes[n], up, complete);
1327    }
1328
1329    for (l=ib_device->recvq.next; l != &ib_device->recvq; l=lnext) {
1330        struct ib_work *rq = qlist_upcast(l);
1331        lnext = l->next;
1332
1333        /* some receives have no mops:  unexpected */
1334        complete = rq->mop &&
1335          (rq->mop->context_id == context_id) && (n < incount);
1336        if (user_ptrs)
1337            up = &user_ptrs[n];
1338        n += test_rq(rq, &outids[n], &errs[n], &sizes[n], up, complete);
1339    }
1340
1341    /* drop lock before blocking on new connections below */
1342    gen_mutex_unlock(&interface_mutex);
1343
1344    if (activity == 0 && n == 0 && max_idle_time > 0) {
1345        /*
1346         * Block if told to from above.
1347         */
1348        debug(8, "%s: last activity too long ago, blocking", __func__);
1349        activity = ib_block_for_activity(max_idle_time);
1350        if (activity == 1) {   /* IB action, go do it immediately */
1351            gen_mutex_lock(&interface_mutex);
1352            goto restart;
1353        }
1354    }
1355
1356    *outcount = n;
1357    return activity + n;
1358}
1359
1360/*
1361 * Non-blocking test to look for any incoming unexpected messages.
1362 * This is also where we check for new connections on the TCP socket, since
1363 * those would show up as unexpected the first time anything is sent.
1364 * Return 0 for success, or -1 for failure; number of things in *outcount.
1365 * Return >0 if want another poll soon.
1366 */
1367static int
1368BMI_ib_testunexpected(int incount __unused, int *outcount,
1369  struct bmi_method_unexpected_info *ui, int max_idle_time)
1370{
1371    struct qlist_head *l;
1372    int activity = 0, n;
1373
1374    gen_mutex_lock(&interface_mutex);
1375
1376    /* Check CQ, then look for the first unexpected message.  */
1377restart:
1378    activity += ib_check_cq();
1379
1380    n = 0;
1381    qlist_for_each(l, &ib_device->recvq) {
1382        struct ib_work *rq = qlist_upcast(l);
1383        if (rq->state.recv == RQ_EAGER_WAITING_USER_TESTUNEXPECTED) {
1384            msg_header_eager_t mh_eager;
1385            char *ptr = rq->bh->buf;
1386            ib_connection_t *c = rq->c;
1387
1388            decode_msg_header_eager_t(&ptr, &mh_eager);
1389
1390            debug(2, "%s: found waiting testunexpected", __func__);
1391            ui->error_code = 0;
1392            ui->addr = c->remote_map;  /* hand back permanent method_addr */
1393            ui->buffer = bmi_ib_malloc((unsigned long) rq->actual_len);
1394            ui->size = rq->actual_len;
1395            memcpy(ui->buffer,
1396                   (msg_header_eager_t *) rq->bh->buf + 1,
1397                   (size_t) ui->size);
1398            ui->tag = rq->bmi_tag;
1399            /* re-post the buffer in which it was sitting, just unexpecteds */
1400            post_rr(c, rq->bh);
1401            n = 1;
1402            qlist_del(&rq->list);
1403            free(rq);
1404            --c->refcnt;
1405            if (c->closed || c->cancelled)
1406                ib_close_connection(c);
1407            goto out;
1408        }
1409    }
1410
1411  out:
1412    gen_mutex_unlock(&interface_mutex);
1413
1414    if (activity == 0 && n == 0 && max_idle_time > 0) {
1415        /*
1416         * Block if told to from above, also polls TCP listening socket.
1417         */
1418        debug(8, "%s: last activity too long ago, blocking", __func__);
1419        activity = ib_block_for_activity(max_idle_time);
1420        if (activity == 1) {   /* IB action, go do it immediately */
1421            gen_mutex_lock(&interface_mutex);
1422            goto restart;
1423        }
1424    }
1425
1426    *outcount = n;
1427    return activity + n;
1428}
1429
1430/*
1431 * No need to track these internally.  Just search the entire queue.
1432 */
1433static int
1434BMI_ib_open_context(bmi_context_id context_id __unused)
1435{
1436    return 0;
1437}
1438
1439static void
1440BMI_ib_close_context(bmi_context_id context_id __unused)
1441{
1442}
1443
1444/*
1445 * Asynchronous call to destroy an in-progress operation.
1446 * Can't just call test since we don't want to reap the operation,
1447 * just make sure it's done or not.
1448 */
1449static int
1450BMI_ib_cancel(bmi_op_id_t id, bmi_context_id context_id __unused)
1451{
1452    struct method_op *mop;
1453    struct ib_work *tsq;
1454    ib_connection_t *c = 0;
1455
1456    gen_mutex_lock(&interface_mutex);
1457    ib_check_cq();
1458    mop = id_gen_fast_lookup(id);
1459    tsq = mop->method_data;
1460    if (tsq->type == BMI_SEND) {
1461        /*
1462         * Cancelling completed operations is fine, they will be
1463         * tested later.  Any others trigger full shutdown of the
1464         * connection.
1465         */
1466        if (tsq->state.send != SQ_WAITING_USER_TEST)
1467            c = tsq->c;
1468    } else {
1469        /* actually a recv */
1470        struct ib_work *rq = mop->method_data;
1471        if (!(rq->state.recv == RQ_EAGER_WAITING_USER_TEST
1472           || rq->state.recv == RQ_RTS_WAITING_USER_TEST))
1473            c = rq->c;
1474    }
1475
1476    if (c && !c->cancelled) {
1477        /*
1478         * In response to a cancel, forcibly close the connection.  Don't send
1479         * a bye message first since it may be the case that the peer is dead
1480         * anyway.  Do not close the connection until all the sq/rq on it have
1481         * gone away.
1482         */
1483        struct qlist_head *l;
1484
1485        c->cancelled = 1;
1486        drain_qp(c);
1487        qlist_for_each(l, &ib_device->sendq) {
1488            struct ib_work *sq = qlist_upcast(l);
1489            if (sq->c != c) continue;
1490#if !MEMCACHE_BOUNCEBUF
1491            if (sq->state.send == SQ_WAITING_DATA_SEND_COMPLETION)
1492                memcache_deregister(ib_device->memcache, &sq->buflist);
1493#  if MEMCACHE_EARLY_REG
1494            /* pin when sending rts, so also must dereg in this state */
1495            if (sq->state.send == SQ_WAITING_RTS_SEND_COMPLETION ||
1496                sq->state.send == SQ_WAITING_RTS_SEND_COMPLETION_GOT_CTS ||
1497                sq->state.send == SQ_WAITING_CTS)
1498                memcache_deregister(ib_device->memcache, &sq->buflist);
1499#  endif
1500#endif
1501            if (sq->state.send != SQ_WAITING_USER_TEST)
1502                sq->state.send = SQ_CANCELLED;
1503        }
1504        qlist_for_each(l, &ib_device->recvq) {
1505            struct ib_work *rq = qlist_upcast(l);
1506            if (rq->c != c) continue;
1507#if !MEMCACHE_BOUNCEBUF
1508            if (rq->state.recv == RQ_RTS_WAITING_RTS_DONE)
1509                memcache_deregister(ib_device->memcache, &rq->buflist);
1510#  if MEMCACHE_EARLY_REG
1511            /* pin on post, dereg all these */
1512            if (rq->state.recv == RQ_RTS_WAITING_CTS_SEND_COMPLETION)
1513                memcache_deregister(ib_device->memcache, &rq->buflist);
1514            if (rq->state.recv == RQ_WAITING_INCOMING
1515              && rq->buflist.tot_len > ib_device->eager_buf_payload)
1516                memcache_deregister(ib_device->memcache, &rq->buflist);
1517#  endif
1518#endif
1519            if (!(rq->state.recv == RQ_EAGER_WAITING_USER_TEST
1520               || rq->state.recv == RQ_RTS_WAITING_USER_TEST))
1521                rq->state.recv = RQ_CANCELLED;
1522        }
1523    }
1524
1525    gen_mutex_unlock(&interface_mutex);
1526    return 0;
1527}
1528
1529static const char *
1530BMI_ib_rev_lookup(struct bmi_method_addr *meth)
1531{
1532    ib_method_addr_t *ibmap = meth->method_data;
1533    if (!ibmap->c)
1534        return "(unconnected)";
1535    else
1536        return ibmap->c->peername;
1537}
1538
1539/*
1540 * Build and fill an IB-specific method_addr structure.
1541 */
1542static struct bmi_method_addr *ib_alloc_method_addr(ib_connection_t *c,
1543                                                char *hostname, int port,
1544                                                int reconnect_flag)
1545{
1546    struct bmi_method_addr *map;
1547    ib_method_addr_t *ibmap;
1548
1549    map = bmi_alloc_method_addr(bmi_ib_method_id, (bmi_size_t) sizeof(*ibmap));
1550    ibmap = map->method_data;
1551    ibmap->c = c;
1552    ibmap->hostname = hostname;
1553    ibmap->port = port;
1554    ibmap->reconnect_flag = reconnect_flag;
1555
1556    return map;
1557}
1558
1559/*
1560 * Break up a method string like:
1561 *   ib://hostname:port/filesystem
1562 * into its constituent fields, storing them in an opaque
1563 * type, which is then returned.
1564 * XXX: I'm assuming that these actually return a _const_ pointer
1565 * so that I can hand back an existing map.
1566 */
1567static struct bmi_method_addr *BMI_ib_method_addr_lookup(const char *id)
1568{
1569    char *s, *hostname, *cp, *cq;
1570    int port;
1571    struct bmi_method_addr *map = NULL;
1572
1573    /* parse hostname */
1574    s = string_key("ib", id);  /* allocs a string */
1575    if (!s)
1576        return 0;
1577    cp = strchr(s, ':');
1578    if (!cp)
1579        error("%s: no ':' found", __func__);
1580
1581    /* copy to permanent storage */
1582    hostname = bmi_ib_malloc((unsigned long) (cp - s + 1));
1583    strncpy(hostname, s, (size_t) (cp-s));
1584    hostname[cp-s] = '\0';
1585
1586    /* strip /filesystem  */
1587    ++cp;
1588    cq = strchr(cp, '/');
1589    if (cq)
1590        *cq = 0;
1591    port = strtoul(cp, &cq, 10);
1592    if (cq == cp)
1593        error("%s: invalid port number", __func__);
1594    if (*cq != '\0')
1595        error("%s: extra characters after port number", __func__);
1596    free(s);
1597
1598    /* lookup in known connections, if there are any */
1599    gen_mutex_lock(&interface_mutex);
1600    if (ib_device) {
1601        struct qlist_head *l;
1602        qlist_for_each(l, &ib_device->connection) {
1603            ib_connection_t *c = qlist_upcast(l);
1604            ib_method_addr_t *ibmap = c->remote_map->method_data;
1605            if (ibmap->port == port && !strcmp(ibmap->hostname, hostname)) {
1606               map = c->remote_map;
1607               break;
1608            }
1609        }
1610    }
1611    gen_mutex_unlock(&interface_mutex);
1612
1613    if (map)
1614        free(hostname);  /* found it */
1615    else
1616    {
1617        /* set reconnect flag on this addr; we will be acting as a client
1618         * for this connection and will be responsible for making sure that
1619         * the connection is established
1620         */
1621        map = ib_alloc_method_addr(0, hostname, port, 1);  /* alloc new one */
1622        /* but don't call bmi_method_addr_reg_callback! */
1623    }
1624
1625    return map;
1626}
1627
1628static ib_connection_t *ib_new_connection(int sock, const char *peername,
1629                                          int is_server)
1630{
1631    ib_connection_t *c;
1632    int i, ret;
1633
1634    c = bmi_ib_malloc(sizeof(*c));
1635    c->peername = strdup(peername);
1636
1637    /* fill send and recv free lists and buf heads */
1638    c->eager_send_buf_contig = bmi_ib_malloc(ib_device->eager_buf_num
1639      * ib_device->eager_buf_size);
1640    c->eager_recv_buf_contig = bmi_ib_malloc(ib_device->eager_buf_num
1641      * ib_device->eager_buf_size);
1642    INIT_QLIST_HEAD(&c->eager_send_buf_free);
1643    INIT_QLIST_HEAD(&c->eager_recv_buf_free);
1644    c->eager_send_buf_head_contig = bmi_ib_malloc(ib_device->eager_buf_num
1645      * sizeof(*c->eager_send_buf_head_contig));
1646    c->eager_recv_buf_head_contig = bmi_ib_malloc(ib_device->eager_buf_num
1647      * sizeof(*c->eager_recv_buf_head_contig));
1648    for (i=0; i<ib_device->eager_buf_num; i++) {
1649        struct buf_head *ebs = &c->eager_send_buf_head_contig[i];
1650        struct buf_head *ebr = &c->eager_recv_buf_head_contig[i];
1651        INIT_QLIST_HEAD(&ebs->list);
1652        INIT_QLIST_HEAD(&ebr->list);
1653        ebs->c = c;
1654        ebr->c = c;
1655        ebs->num = i;
1656        ebr->num = i;
1657        ebs->buf = (char *) c->eager_send_buf_contig
1658                 + i * ib_device->eager_buf_size;
1659        ebr->buf = (char *) c->eager_recv_buf_contig
1660                 + i * ib_device->eager_buf_size;
1661        qlist_add_tail(&ebs->list, &c->eager_send_buf_free);
1662        qlist_add_tail(&ebr->list, &c->eager_recv_buf_free);
1663    }
1664
1665    /* put it on the list */
1666    qlist_add(&c->list, &ib_device->connection);
1667
1668    /* other vars */
1669    c->remote_map = 0;
1670    c->cancelled = 0;
1671    c->refcnt = 0;
1672    c->closed = 0;
1673
1674    /* save one credit back for emergency credit refill */
1675    c->send_credit = ib_device->eager_buf_num - 1;
1676    c->return_credit = 0;
1677
1678    ret = new_connection(c, sock, is_server);
1679    if (ret) {
1680        ib_close_connection(c);
1681        c = NULL;
1682    }
1683
1684    return c;
1685}
1686
1687/*
1688 * Try to close and free a connection, but only do it if refcnt has
1689 * gone to zero.
1690 */
1691static void ib_close_connection(ib_connection_t *c)
1692{
1693    debug(2, "%s: closing connection to %s", __func__, c->peername);
1694    c->closed = 1;
1695    if (c->refcnt != 0) {
1696        debug(1, "%s: refcnt non-zero %d, delaying free", __func__, c->refcnt);
1697        return;
1698    }
1699
1700    close_connection(c);
1701
1702    free(c->eager_send_buf_contig);
1703    free(c->eager_recv_buf_contig);
1704    free(c->eager_send_buf_head_contig);
1705    free(c->eager_recv_buf_head_contig);
1706    /* never free the remote map, for the life of the executable, just
1707     * mark it unconnected since BMI will always have this structure. */
1708    if (c->remote_map) {
1709        ib_method_addr_t *ibmap = c->remote_map->method_data;
1710        ibmap->c = NULL;
1711    }
1712    free(c->peername);
1713    qlist_del(&c->list);
1714    free(c);
1715}
1716
1717/*
1718 * Blocking connect initiated by a post_sendunexpected{,_list}, or
1719 * post_recv*
1720 */
1721static int ib_tcp_client_connect(ib_method_addr_t *ibmap,
1722                                 struct bmi_method_addr *remote_map)
1723{
1724    int s;
1725    char peername[2048];
1726    struct hostent *hp;
1727    struct sockaddr_in skin;
1728   
1729    s = socket(AF_INET, SOCK_STREAM, 0);
1730    if (s < 0) {
1731        warning("%s: create tcp socket: %m", __func__);
1732        return bmi_errno_to_pvfs(-errno);
1733    }
1734    hp = gethostbyname(ibmap->hostname);
1735    if (!hp) {
1736        warning("%s: cannot resolve server %s", __func__, ibmap->hostname);
1737        return -1;
1738    }
1739    memset(&skin, 0, sizeof(skin));
1740    skin.sin_family = hp->h_addrtype;
1741    memcpy(&skin.sin_addr, hp->h_addr_list[0], (size_t) hp->h_length);
1742    skin.sin_port = htons(ibmap->port);
1743    sprintf(peername, "%s:%d", ibmap->hostname, ibmap->port);
1744  retry:
1745    if (connect(s, (struct sockaddr *) &skin, sizeof(skin)) < 0) {
1746        if (errno == EINTR)
1747            goto retry;
1748        else {
1749            warning("%s: connect to server %s: %m", __func__, peername);
1750            return bmi_errno_to_pvfs(-errno);
1751        }
1752    }
1753    ibmap->c = ib_new_connection(s, peername, 0);
1754    if (!ibmap->c)
1755        error("%s: ib_new_connection failed", __func__);
1756    ibmap->c->remote_map = remote_map;
1757
1758    if (close(s) < 0) {
1759        warning("%s: close sock: %m", __func__);
1760        return bmi_errno_to_pvfs(-errno);
1761    }
1762    return 0;
1763}
1764
1765/*
1766 * On a server, initialize a socket for listening for new connections.
1767 */
1768static void ib_tcp_server_init_listen_socket(struct bmi_method_addr *addr)
1769{
1770    int flags;
1771    struct sockaddr_in skin;
1772    ib_method_addr_t *ibc = addr->method_data;
1773
1774    ib_device->listen_sock = socket(AF_INET, SOCK_STREAM, 0);
1775    if (ib_device->listen_sock < 0)
1776        error_errno("%s: create tcp socket", __func__);
1777    flags = 1;
1778    if (setsockopt(ib_device->listen_sock, SOL_SOCKET, SO_REUSEADDR, &flags,
1779      sizeof(flags)) < 0)
1780        error_errno("%s: setsockopt REUSEADDR", __func__);
1781    memset(&skin, 0, sizeof(skin));
1782    skin.sin_family = AF_INET;
1783    skin.sin_port = htons(ibc->port);
1784  retry:
1785    if (bind(ib_device->listen_sock, (struct sockaddr *) &skin, sizeof(skin)) < 0) {
1786        if (errno == EINTR)
1787            goto retry;
1788        else
1789            error_errno("%s: bind tcp socket", __func__);
1790    }
1791    if (listen(ib_device->listen_sock, 1024) < 0)
1792        error_errno("%s: listen tcp socket", __func__);
1793    flags = fcntl(ib_device->listen_sock, F_GETFL);
1794    if (flags < 0)
1795        error_errno("%s: fcntl getfl listen sock", __func__);
1796    flags |= O_NONBLOCK;
1797    if (fcntl(ib_device->listen_sock, F_SETFL, flags) < 0)
1798        error_errno("%s: fcntl setfl nonblock listen sock", __func__);
1799}
1800
1801/*
1802 * Check for new connections.  The listening socket is left nonblocking
1803 * so this test can be quick; but accept is not really that quick compared
1804 * to polling an IB interface, for instance.  Returns >0 if an accept worked.
1805 */
1806static int ib_tcp_server_check_new_connections(void)
1807{
1808    struct sockaddr_in ssin;
1809    socklen_t len;
1810    int s, ret = 0;
1811
1812    len = sizeof(ssin);
1813    s = accept(ib_device->listen_sock, (struct sockaddr *) &ssin, &len);
1814    if (s < 0) {
1815        if (!(errno == EAGAIN))
1816            error_errno("%s: accept listen sock", __func__);
1817    } else {
1818        char peername[2048];
1819        ib_connection_t *c;
1820
1821        char *hostname = strdup(inet_ntoa(ssin.sin_addr));
1822        int port = ntohs(ssin.sin_port);
1823        sprintf(peername, "%s:%d", hostname, port);
1824
1825        gen_mutex_lock(&interface_mutex);
1826
1827        c = ib_new_connection(s, peername, 1);
1828        if (!c) {
1829            free(hostname);
1830            goto out_unlock;
1831        }
1832
1833        /* don't set reconnect flag on this addr; we are a server in this
1834         * case and the peer will be responsible for maintaining the
1835         * connection
1836         */
1837        c->remote_map = ib_alloc_method_addr(c, hostname, port, 0);
1838        /* register this address with the method control layer */
1839        c->bmi_addr = bmi_method_addr_reg_callback(c->remote_map);
1840        if (c->bmi_addr == 0)
1841            error_xerrno(ENOMEM, "%s: bmi_method_addr_reg_callback", __func__);
1842
1843        debug(2, "%s: accepted new connection %s at server", __func__,
1844          c->peername);
1845        ret = 1;
1846
1847out_unlock:
1848        gen_mutex_unlock(&interface_mutex);
1849        if (close(s) < 0)
1850            error_errno("%s: close new sock", __func__);
1851    }
1852    return ret;
1853}
1854
1855/*
1856 * Ask the device to write to its FD if a CQ event happens, and poll on it
1857 * as well as the listen_sock for activity, but do not actually respond to
1858 * anything.  A later ib_check_cq will handle CQ events, and a later call to
1859 * testunexpected will pick up new connections.  Returns ==1 if IB device is
1860 * ready, other >0 for some activity, else 0.
1861 */
1862static int ib_block_for_activity(int timeout_ms)
1863{
1864    struct pollfd pfd[3];  /* cq fd, async fd, accept socket */
1865    int numfd;
1866    int ret;
1867
1868    prepare_cq_block(&pfd[0].fd, &pfd[1].fd);
1869    pfd[0].events = POLLIN;
1870    pfd[1].events = POLLIN;
1871    numfd = 2;
1872    if (ib_device->listen_sock >= 0) {
1873        pfd[2].fd = ib_device->listen_sock;
1874        pfd[2].events = POLLIN;
1875        numfd = 3;
1876    }
1877
1878    ret = poll(pfd, numfd, timeout_ms);
1879    debug(4, "%s: ret %d rev0 %x rev1 %x", __func__, ret,
1880          pfd[0].revents, pfd[1].revents);
1881    if (ret > 0) {
1882        if (pfd[0].revents == POLLIN) {
1883            ack_cq_completion_event();
1884            return 1;
1885        }
1886        /* check others only if CQ was empty */
1887        ret = 2;
1888        if (pfd[1].revents == POLLIN)
1889            check_async_events();
1890        if (pfd[2].revents == POLLIN)
1891            ib_tcp_server_check_new_connections();
1892    } else if (ret < 0) {
1893        if (errno == EINTR)  /* normal, ignore but break */
1894            ret = 0;
1895        else
1896            error_errno("%s: poll listen sock", __func__);
1897    }
1898    return ret;
1899}
1900
1901static void *BMI_ib_memalloc(bmi_size_t len,
1902                             enum bmi_op_type send_recv __unused)
1903{
1904    return memcache_memalloc(ib_device->memcache, len,
1905                             ib_device->eager_buf_payload);
1906}
1907
1908static int BMI_ib_memfree(void *buf, bmi_size_t len,
1909                          enum bmi_op_type send_recv __unused)
1910{
1911    return memcache_memfree(ib_device->memcache, buf, len);
1912}
1913
/*
 * Release a buffer with plain free(); always reports success (0).
 */
static int BMI_ib_unexpected_free(void *buf)
{
    const int status = 0;

    free(buf);
    return status;
}
1919
1920/*
1921 * Callers sometimes want to know odd pieces of information.  Satisfy
1922 * them.
1923 */
1924static int BMI_ib_get_info(int option, void *param)
1925{
1926    int ret = 0;
1927
1928    switch (option) {
1929        case BMI_CHECK_MAXSIZE:
1930            /* reality is 2^31, but shrink to avoid negative int */
1931            *(int *)param = (1UL << 31) - 1;
1932            break;
1933        case BMI_GET_UNEXP_SIZE:
1934            *(int *)param = ib_device->eager_buf_payload;
1935            break;
1936        default:
1937            ret = -ENOSYS;
1938    }
1939    return ret;
1940}
1941
1942/*
1943 * Used to set some optional parameters and random functions, like ioctl.
1944 */
1945static int BMI_ib_set_info(int option, void *param __unused)
1946{
1947    switch (option) {
1948    case BMI_DROP_ADDR: {
1949        struct bmi_method_addr *map = param;
1950        ib_method_addr_t *ibmap = map->method_data;
1951        free(ibmap->hostname);
1952        free(map);
1953        break;
1954    }
1955    case BMI_OPTIMISTIC_BUFFER_REG: {
1956        /* not guaranteed to work */
1957        const struct bmi_optimistic_buffer_info *binfo = param;
1958        memcache_preregister(ib_device->memcache, binfo->buffer,
1959                             binfo->len, binfo->rw);
1960        break;
1961    }
1962    default:
1963        /* Should return -ENOSYS, but return 0 for caller ease. */
1964        break;
1965    }
1966    return 0;
1967}
1968
1969#ifdef OPENIB
1970extern int openib_ib_initialize(void);
1971#endif
1972#ifdef VAPI
1973extern int vapi_ib_initialize(void);
1974#endif
1975
1976/*
1977 * Startup, once per application.
1978 */
1979static int BMI_ib_initialize(struct bmi_method_addr *listen_addr, int method_id,
1980                             int init_flags)
1981{
1982    int ret;
1983
1984    debug(0, "%s: init", __func__);
1985
1986    gen_mutex_lock(&interface_mutex);
1987
1988    /* check params */
1989    if (!!listen_addr ^ (init_flags & BMI_INIT_SERVER))
1990        error("%s: error: BMI_INIT_SERVER requires non-null listen_addr"
1991          " and v.v", __func__);
1992
1993    bmi_ib_method_id = method_id;
1994
1995    ib_device = bmi_ib_malloc(sizeof(*ib_device));
1996
1997    /* try, in order, OpenIB then VAPI; set up function pointers */
1998    ret = 1;
1999#ifdef OPENIB
2000    ret = openib_ib_initialize();
2001#endif
2002#ifdef VAPI
2003    if (ret)
2004        ret = vapi_ib_initialize();
2005#endif
2006    if (ret)
2007        return -ENODEV;  /* neither found */
2008
2009    /* initialize memcache */
2010    ib_device->memcache = memcache_init(mem_register, mem_deregister);
2011#if 0
2012    /*
2013     * Need this for correctness.  Could use malloc/free hooks instead, but
2014     * they fight with mpich's use, for example.  Consider switching to dreg
2015     * kernel module.
2016     */
2017    mallopt(M_TRIM_THRESHOLD, -1);
2018    mallopt(M_MMAP_MAX, 0);
2019#endif
2020
2021    /*
2022     * Set up tcp socket to listen for connection requests.
2023     * The hostname is currently ignored; the port number is used to bind
2024     * the listening TCP socket which accepts new connections.
2025     */
2026    if (init_flags & BMI_INIT_SERVER) {
2027        ib_tcp_server_init_listen_socket(listen_addr);
2028        ib_device->listen_addr = listen_addr;
2029    } else {
2030        ib_device->listen_sock = -1;
2031        ib_device->listen_addr = NULL;
2032    }
2033
2034    /*
2035     * Initialize data structures.
2036     */
2037    INIT_QLIST_HEAD(&ib_device->connection);
2038    INIT_QLIST_HEAD(&ib_device->sendq);
2039    INIT_QLIST_HEAD(&ib_device->recvq);
2040
2041    ib_device->eager_buf_num  = DEFAULT_EAGER_BUF_NUM;
2042    ib_device->eager_buf_size = DEFAULT_EAGER_BUF_SIZE;
2043    ib_device->eager_buf_payload = ib_device->eager_buf_size - sizeof(msg_header_eager_t);
2044
2045    gen_mutex_unlock(&interface_mutex);
2046
2047    debug(0, "%s: done", __func__);
2048    return ret;
2049}
2050
2051/*
2052 * Shutdown.
2053 */
2054static int BMI_ib_finalize(void)
2055{
2056    gen_mutex_lock(&interface_mutex);
2057
2058    /* if client, send BYE to each connection and bring down the QP */
2059    if (ib_device->listen_sock < 0) {
2060        struct qlist_head *l;
2061        qlist_for_each(l, &ib_device->connection) {
2062            ib_connection_t *c = qlist_upcast(l);
2063            if (c->cancelled)
2064                continue;  /* already closed */
2065            /* Send BYE message to servers, transition QP to drain state */
2066            send_bye(c);
2067            drain_qp(c);
2068        }
2069    }
2070    /* if server, stop listening */
2071    if (ib_device->listen_sock >= 0) {
2072        ib_method_addr_t *ibmap = ib_device->listen_addr->method_data;
2073        close(ib_device->listen_sock);
2074        free(ibmap->hostname);
2075        free(ib_device->listen_addr);
2076    }
2077
2078    /* destroy QPs and other connection structures */
2079    while (ib_device->connection.next != &ib_device->connection) {
2080        ib_connection_t *c = (ib_connection_t *) ib_device->connection.next;
2081        ib_close_connection(c);
2082    }
2083
2084#if MEMCACHE_BOUNCEBUF
2085    if (reg_send_buflist.num > 0) {
2086        memcache_deregister(ib_device->memcache, &reg_send_buflist);
2087        reg_send_buflist.num = 0;
2088        free(reg_send_buflist_buf);
2089    }
2090    if (reg_recv_buflist.num > 0) {
2091        memcache_deregister(ib_device->memcache, &reg_recv_buflist);
2092        reg_recv_buflist.num = 0;
2093        free(reg_recv_buflist_buf);
2094    }
2095#endif
2096
2097    memcache_shutdown(ib_device->memcache);
2098
2099    ib_finalize();
2100
2101    free(ib_device);
2102    ib_device = NULL;
2103
2104    gen_mutex_unlock(&interface_mutex);
2105    return 0;
2106}
2107
2108const struct bmi_method_ops bmi_ib_ops =
2109{
2110    .method_name = "bmi_ib",
2111    .initialize = BMI_ib_initialize,
2112    .finalize = BMI_ib_finalize,
2113    .set_info = BMI_ib_set_info,
2114    .get_info = BMI_ib_get_info,
2115    .memalloc = BMI_ib_memalloc,
2116    .memfree = BMI_ib_memfree,
2117    .unexpected_free = BMI_ib_unexpected_free,
2118    .post_send = BMI_ib_post_send,
2119    .post_sendunexpected = BMI_ib_post_sendunexpected,
2120    .post_recv = BMI_ib_post_recv,
2121    .test = BMI_ib_test,
2122    .testsome = BMI_ib_testsome,
2123    .testcontext = BMI_ib_testcontext,
2124    .testunexpected = BMI_ib_testunexpected,
2125    .method_addr_lookup = BMI_ib_method_addr_lookup,
2126    .post_send_list = BMI_ib_post_send_list,
2127    .post_recv_list = BMI_ib_post_recv_list,
2128    .post_sendunexpected_list = BMI_ib_post_sendunexpected_list,
2129    .open_context = BMI_ib_open_context,
2130    .close_context = BMI_ib_close_context,
2131    .cancel = BMI_ib_cancel,
2132    .rev_lookup_unexpected = BMI_ib_rev_lookup,
2133    .query_addr_range = NULL,
2134};
2135
Note: See TracBrowser for help on using the browser.