root/branches/Orange-Branch/src/io/bmi/bmi_ib/ib.c @ 8832

Revision 8832, 61.8 KB (checked in by bligon, 2 years ago)

Trac Ticket #29. Admin apps were getting a segfault when trying to deallocate BMI structures in an environment
where the server was configured with two protocols. In addition, the client-core would hang. To
correct this, I added a reference count to the underlying IB interface, so a map is deallocated only when its
reference count reaches zero.

src/io/bmi/bmi_ib/ib.c

src/io/bmi/bmi_ib/ib.h

Line 
1/*
2 * InfiniBand BMI method.
3 *
4 * Copyright (C) 2003-6 Pete Wyckoff <pw@osc.edu>
5 * Copyright (C) 2006 Kyle Schochenmaier <kschoche@scl.ameslab.gov>
6 *
7 * See COPYING in top-level directory.
8 */
9#include <stdio.h>
10#include <stdlib.h>
11#include <string.h>
12#include <unistd.h>
13#include <errno.h>
14#include <fcntl.h>
15#include <sys/types.h>
16#include <arpa/inet.h>   /* inet_ntoa */
17#include <netdb.h>       /* gethostbyname */
18#include <sys/poll.h>
19#define __PINT_REQPROTO_ENCODE_FUNCS_C  /* include definitions */
20#include <src/common/id-generator/id-generator.h>
21#include <src/io/bmi/bmi-method-support.h>   /* bmi_method_ops ... */
22#include <src/io/bmi/bmi-method-callback.h>  /* bmi_method_addr_reg_callback */
23#include <src/common/gen-locks/gen-locks.h>  /* gen_mutex_t ... */
24#include <src/common/misc/pvfs2-internal.h>
25#include "pint-hint.h"
26
27#ifdef HAVE_VALGRIND_H
28#include <memcheck.h>
29#else
30#define VALGRIND_MAKE_MEM_DEFINED(addr,len)
31#endif
32
33#include "ib.h"
34
35static gen_mutex_t interface_mutex = GEN_MUTEX_INITIALIZER;
36
37/*
38 * Handle given by upper layer, which must be handed back to create
39 * method_addrs.
40 */
41static int bmi_ib_method_id;
42
43/* alloc space for the single device structure pointer; one for
44 * vapi and one for openib */
45ib_device_t *ib_device __hidden = NULL;
46
/*
 * Shorthands that vector device operations through ib_device->func, so
 * the code below can call e.g. post_sr(bh, len) directly.  post_rr is
 * intentionally not aliased: the post_rr() wrapper in this file calls
 * ib_device->func.post_rr itself and also does credit accounting.
 */

/* connection management */
#define new_connection          ib_device->func.new_connection
#define close_connection        ib_device->func.close_connection
#define drain_qp                ib_device->func.drain_qp

/* device setup and teardown */
#define ib_initialize           ib_device->func.ib_initialize
#define ib_finalize             ib_device->func.ib_finalize

/* posting send work requests */
#define post_sr                 ib_device->func.post_sr
#define post_sr_rdmaw           ib_device->func.post_sr_rdmaw

/* completion queue and asynchronous events */
#define check_cq                ib_device->func.check_cq
#define prepare_cq_block        ib_device->func.prepare_cq_block
#define ack_cq_completion_event ib_device->func.ack_cq_completion_event
#define wc_status_string        ib_device->func.wc_status_string
#define check_async_events      ib_device->func.check_async_events

/* memory registration */
#define mem_register            ib_device->func.mem_register
#define mem_deregister          ib_device->func.mem_deregister
62
/*
 * Build-option sanity check: with a bounce buffer, the first use registers
 * that buffer anyway, so early registration would have no effect.
 */
#if MEMCACHE_EARLY_REG && MEMCACHE_BOUNCEBUF
#error Not sensible to use bouncebuf with early reg.  First use of bouncebuf \
  will register it, thus no effect whether early reg or not.
#endif

#if MEMCACHE_BOUNCEBUF
/*
 * Bounce-buffer mode: transfers are staged through one pre-registered
 * 256 KiB buffer per direction.  A buflist with num == 0 has not been set
 * up yet; the recv side is allocated and registered lazily by send_cts().
 */
static const bmi_size_t reg_send_buflist_len = 256 * 1024;
static const bmi_size_t reg_recv_buflist_len = 256 * 1024;
static ib_buflist_t reg_send_buflist = { .num = 0 };
static ib_buflist_t reg_recv_buflist = { .num = 0 };
static void *reg_send_buflist_buf;
static void *reg_recv_buflist_buf;
#endif
78
79static void encourage_send_incoming_cts(struct buf_head *bh, u_int32_t byte_len);
80static void encourage_recv_incoming(struct buf_head *bh, msg_type_t type,
81                                    u_int32_t byte_len);
82static void encourage_rts_done_waiting_buffer(struct ib_work *sq);
83static int send_cts(struct ib_work *rq);
84static void ib_close_connection(ib_connection_t *c);
85static int ib_tcp_client_connect(ib_method_addr_t *ibmap,
86                                 struct bmi_method_addr *remote_map);
87static int ib_tcp_server_check_new_connections(void);
88static int ib_block_for_activity(int timeout_ms);
89
90/*
91 * Return string form of work completion opcode field.
92 */
93static const char *wc_opcode_string(int opcode)
94{
95    if (opcode == BMI_IB_OP_SEND)
96        return "SEND";
97    else if (opcode == BMI_IB_OP_RECV)
98        return "RECV";
99    else if (opcode == BMI_IB_OP_RDMA_WRITE)
100        return "RDMA WRITE";
101    else
102        return "(UNKNOWN)";
103}
104
105/*
106 * Wander through single completion queue, pulling off messages and
107 * sticking them on the proper connection queues.  Later you can
108 * walk the incomingq looking for things to do to them.  Returns
109 * number of new things that arrived.
110 */
111static int ib_check_cq(void)
112{
113    int ret = 0;
114
115    for (;;) {
116        struct bmi_ib_wc wc;
117        int vret;
118
119        vret = check_cq(&wc);
120        if (vret == 0)
121            break;  /* empty */
122
123        debug(4, "%s: found something", __func__);
124        ++ret;
125        if (wc.status != 0) {
126            /* opcode is not necessarily valid; only wr_id, status, qp_num,
127             * and vendor_err can be relied upon */
128            if (wc.opcode == BMI_IB_OP_SEND) {
129                debug(0, "%s: entry id 0x%llx SEND error %s", __func__,
130                  llu(wc.id), wc_status_string(wc.status));
131                if (wc.id) {
132                    ib_connection_t *c = ptr_from_int64(wc.id);
133                    if (c->cancelled) {
134                        debug(0,
135                          "%s: ignoring send error on cancelled conn to %s",
136                          __func__, c->peername);
137                    }
138                }
139            } else {
140                error("%s: entry id 0x%llx opcode %s error %s", __func__,
141                  llu(wc.id), wc_opcode_string(wc.opcode),
142                  wc_status_string(wc.status));
143            }
144        }
145
146        if (wc.opcode == BMI_IB_OP_RECV) {
147            /*
148             * Remote side did a send to us.
149             */
150            msg_header_common_t mh_common;
151            struct buf_head *bh = ptr_from_int64(wc.id);
152            char *ptr = bh->buf;
153            u_int32_t byte_len = wc.byte_len;
154
155            VALGRIND_MAKE_MEM_DEFINED(ptr, byte_len);
156            decode_msg_header_common_t(&ptr, &mh_common);
157            bh->c->send_credit += mh_common.credit;
158
159            debug(2, "%s: recv from %s len %d type %s credit %d",
160                  __func__, bh->c->peername, byte_len,
161                  msg_type_name(mh_common.type), mh_common.credit);
162            if (mh_common.type == MSG_CTS) {
163                /* incoming CTS messages go to the send engine */
164                encourage_send_incoming_cts(bh, byte_len);
165            } else {
166                /* something for the recv side, no known rq yet */
167                encourage_recv_incoming(bh, mh_common.type, byte_len);
168            }
169
170        } else if (wc.opcode == BMI_IB_OP_RDMA_WRITE) {
171
172            /* completion event for the rdma write we initiated, used
173             * to signal memory unpin etc. */
174            struct ib_work *sq = ptr_from_int64(wc.id);
175
176            debug(3, "%s: sq %p %s", __func__, sq,
177                  sq_state_name(sq->state.send));
178
179            bmi_ib_assert(sq->state.send == SQ_WAITING_DATA_SEND_COMPLETION,
180                          "%s: wrong send state %s", __func__,
181                          sq_state_name(sq->state.send));
182
183            sq->state.send = SQ_WAITING_RTS_DONE_BUFFER;
184
185#if !MEMCACHE_BOUNCEBUF
186            memcache_deregister(ib_device->memcache, &sq->buflist);
187#endif
188            debug(2, "%s: sq %p RDMA write done, now %s", __func__, sq,
189                  sq_state_name(sq->state.send));
190
191            encourage_rts_done_waiting_buffer(sq);
192
193        } else if (wc.opcode == BMI_IB_OP_SEND) {
194
195            struct buf_head *bh = ptr_from_int64(wc.id);
196            struct ib_work *sq = bh->sq;
197
198            if (sq == NULL) {
199                /* MSG_BYE or MSG_CREDIT */
200                debug(2, "%s: MSG_BYE or MSG_CREDIT completed locally",
201                      __func__);
202            } else if (sq->type == BMI_SEND) {
203                sq_state_t state = sq->state.send;
204
205                if (state == SQ_WAITING_EAGER_SEND_COMPLETION)
206                    sq->state.send = SQ_WAITING_USER_TEST;
207                else if (state == SQ_WAITING_RTS_SEND_COMPLETION)
208                    sq->state.send = SQ_WAITING_CTS;
209                else if (state == SQ_WAITING_RTS_SEND_COMPLETION_GOT_CTS)
210                    sq->state.send = SQ_WAITING_DATA_SEND_COMPLETION;
211                else if (state == SQ_WAITING_RTS_DONE_SEND_COMPLETION)
212                    sq->state.send = SQ_WAITING_USER_TEST;
213                else if (state == SQ_CANCELLED)
214                    ;
215                else
216                    bmi_ib_assert(0, "%s: unknown send state %s (%d) of sq %p",
217                                  __func__, sq_state_name(sq->state.send),
218                                  sq->state.send, sq);
219                debug(2, "%s: send to %s completed locally: sq %p -> %s",
220                      __func__, bh->c->peername, sq,
221                      sq_state_name(sq->state.send));
222
223            } else {
224                struct ib_work *rq = sq;  /* rename */
225                rq_state_t state = rq->state.recv;
226               
227                if (state == RQ_RTS_WAITING_CTS_SEND_COMPLETION)
228                    rq->state.recv = RQ_RTS_WAITING_RTS_DONE;
229                else if (state == RQ_CANCELLED)
230                    ;
231                else
232                    bmi_ib_assert(0, "%s: unknown recv state %s of rq %p",
233                                  __func__, rq_state_name(rq->state.recv), rq);
234                debug(2, "%s: send to %s completed locally: rq %p -> %s",
235                      __func__, bh->c->peername, rq,
236                      rq_state_name(rq->state.recv));
237            }
238
239            qlist_add_tail(&bh->list, &bh->c->eager_send_buf_free);
240
241        } else {
242            error("%s: cq entry id 0x%llx opcode %d unexpected", __func__,
243              llu(wc.id), wc.opcode);
244        }
245    }
246    return ret;
247}
248
249/*
250 * Initialize common header of all messages.
251 */
252static void msg_header_init(msg_header_common_t *mh_common,
253                            ib_connection_t *c, msg_type_t type)
254{
255    mh_common->type = type;
256    mh_common->credit = c->return_credit;
257    c->return_credit = 0;
258}
259
260/*
261 * Grab an empty buf head, if available.
262 */
263static struct buf_head *get_eager_buf(ib_connection_t *c)
264{
265    struct buf_head *bh = NULL;
266
267    if (c->send_credit > 0) {
268        --c->send_credit;
269        bh = qlist_try_del_head(&c->eager_send_buf_free);
270        bmi_ib_assert(bh, "%s: empty eager_send_buf_free list, peer %s",
271                      __func__, c->peername);
272    }
273    return bh;
274}
275
276/*
277 * Re-post a receive buffer, possibly returning credit to the peer.
278 */
279static void post_rr(ib_connection_t *c, struct buf_head *bh)
280{
281    ib_device->func.post_rr(c, bh);
282    ++c->return_credit;
283
284    /* if credits are building up, explicitly send them over */
285    if (c->return_credit > ib_device->eager_buf_num - 4) {
286        msg_header_common_t mh_common;
287        char *ptr;
288
289        /* one credit saved back for just this situation, do not check */
290        --c->send_credit;
291        bh = qlist_try_del_head(&c->eager_send_buf_free);
292        bmi_ib_assert(bh, "%s: empty eager_send_buf_free list", __func__);
293        bh->sq = NULL;
294        debug(2, "%s: return %d credits to %s", __func__, c->return_credit,
295              c->peername);
296        msg_header_init(&mh_common, c, MSG_CREDIT);
297        ptr = bh->buf;
298        encode_msg_header_common_t(&ptr, &mh_common);
299        post_sr(bh, sizeof(mh_common));
300    }
301}
302
303/*
304 * Push a send message along its next step.  Called internally only.
305 */
306static void encourage_send_waiting_buffer(struct ib_work *sq)
307{
308    struct buf_head *bh;
309    ib_connection_t *c = sq->c;
310
311    debug(3, "%s: sq %p", __func__, sq);
312    bmi_ib_assert(sq->state.send == SQ_WAITING_BUFFER,
313                  "%s: wrong send state %s", __func__,
314                  sq_state_name(sq->state.send));
315
316    bh = get_eager_buf(c);
317    if (!bh) {
318        debug(2, "%s: sq %p no free send buffers to %s", __func__,
319              sq, c->peername);
320        return;
321    }
322    sq->bh = bh;
323    bh->sq = sq;  /* uplink for completion */
324
325    if (sq->buflist.tot_len <= ib_device->eager_buf_payload) {
326        /*
327         * Eager send.
328         */
329        msg_header_eager_t mh_eager;
330        char *ptr = bh->buf;
331
332        msg_header_init(&mh_eager.c, c, sq->is_unexpected
333                        ? MSG_EAGER_SENDUNEXPECTED : MSG_EAGER_SEND);
334        mh_eager.bmi_tag = sq->bmi_tag;
335        encode_msg_header_eager_t(&ptr, &mh_eager);
336
337        memcpy_from_buflist(&sq->buflist,
338                            (msg_header_eager_t *) bh->buf + 1);
339
340        /* send the message */
341        post_sr(bh, (u_int32_t) (sizeof(mh_eager) + sq->buflist.tot_len));
342
343        /* wait for ack saying remote has received and recycled his buf */
344        sq->state.send = SQ_WAITING_EAGER_SEND_COMPLETION;
345        debug(2, "%s: sq %p sent EAGER len %lld", __func__, sq,
346              lld(sq->buflist.tot_len));
347
348    } else {
349        /*
350         * Request to send, rendez-vous.  Include the mop id in the message
351         * which will be returned to us in the CTS so we can look it up.
352         */
353        msg_header_rts_t mh_rts;
354        char *ptr = bh->buf;
355
356        msg_header_init(&mh_rts.c, c, MSG_RTS);
357        mh_rts.bmi_tag = sq->bmi_tag;
358        mh_rts.mop_id = sq->mop->op_id;
359        mh_rts.tot_len = sq->buflist.tot_len;
360
361        encode_msg_header_rts_t(&ptr, &mh_rts);
362
363        post_sr(bh, sizeof(mh_rts));
364
365#if MEMCACHE_EARLY_REG
366        /* XXX: need to lock against receiver thread?  Could poll return
367         * the CTS and start the data send before this completes? */
368        memcache_register(ib_device->memcache, &sq->buflist);
369#endif
370
371        sq->state.send = SQ_WAITING_RTS_SEND_COMPLETION;
372        debug(2, "%s: sq %p sent RTS mopid %llx len %lld", __func__, sq,
373              llu(sq->mop->op_id), lld(sq->buflist.tot_len));
374    }
375}
376
377/*
378 * Look at the incoming message which is a response to an earlier RTS
379 * from us, and start the real data send.
380 */
381static void
382encourage_send_incoming_cts(struct buf_head *bh, u_int32_t byte_len)
383{
384    msg_header_cts_t mh_cts;
385    struct ib_work *sq, *sqt;
386    u_int32_t want;
387    char *ptr = bh->buf;
388
389    decode_msg_header_cts_t(&ptr, &mh_cts);
390
391    /*
392     * Look through this CTS message to determine the owning sq.  Works
393     * using the mop_id which was sent during the RTS, now returned to us.
394     */
395    sq = 0;
396    qlist_for_each_entry(sqt, &ib_device->sendq, list) {
397        debug(8, "%s: looking for op_id 0x%llx, consider 0x%llx", __func__,
398          llu(mh_cts.rts_mop_id), llu(sqt->mop->op_id));
399        if (sqt->c == bh->c
400            && sqt->mop->op_id == (bmi_op_id_t) mh_cts.rts_mop_id) {
401            sq = sqt;
402            break;
403        }
404    }
405    if (!sq)
406        error("%s: mop_id %llx in CTS message not found", __func__,
407          llu(mh_cts.rts_mop_id));
408
409    debug(2, "%s: sq %p %s mopid %llx len %u", __func__, sq,
410          sq_state_name(sq->state.send), llu(mh_cts.rts_mop_id), byte_len);
411    bmi_ib_assert(sq->state.send == SQ_WAITING_CTS ||
412                  sq->state.send == SQ_WAITING_RTS_SEND_COMPLETION,
413                  "%s: wrong send state %s", __func__,
414                  sq_state_name(sq->state.send));
415
416    /* message; cts content; list of buffers, lengths, and keys */
417    want = sizeof(mh_cts)
418         + mh_cts.buflist_num * MSG_HEADER_CTS_BUFLIST_ENTRY_SIZE;
419    if (bmi_ib_unlikely(byte_len != want))
420        error("%s: wrong message size for CTS, got %u, want %u", __func__,
421              byte_len, want);
422
423    /* start the big tranfser */
424    post_sr_rdmaw(sq, &mh_cts, (msg_header_cts_t *) bh->buf + 1);
425
426    /* re-post our recv buf now that we have all the information from CTS */
427    post_rr(sq->c, bh);
428
429    if (sq->state.send == SQ_WAITING_CTS)
430        sq->state.send = SQ_WAITING_DATA_SEND_COMPLETION;
431    else
432        sq->state.send = SQ_WAITING_RTS_SEND_COMPLETION_GOT_CTS;
433    debug(3, "%s: sq %p now %s", __func__, sq, sq_state_name(sq->state.send));
434}
435
436
437/*
438 * See if anything was preposted that matches this.
439 */
440static struct ib_work *
441find_matching_recv(rq_state_t statemask, const ib_connection_t *c,
442  bmi_msg_tag_t bmi_tag)
443{
444    struct ib_work *rq;
445
446    qlist_for_each_entry(rq, &ib_device->recvq, list) {
447        if ((rq->state.recv & statemask) && rq->c == c
448            && rq->bmi_tag == bmi_tag)
449            return rq;
450    }
451    return NULL;
452}
453
454/*
455 * Init a new recvq entry from something that arrived on the wire.
456 */
457static struct ib_work *
458alloc_new_recv(ib_connection_t *c, struct buf_head *bh)
459{
460    struct ib_work *rq = bmi_ib_malloc(sizeof(*rq));
461    rq->type = BMI_RECV;
462    rq->c = c;
463    ++rq->c->refcnt;
464    rq->bh = bh;
465    rq->mop = 0;  /* until user posts for it */
466    rq->rts_mop_id = 0;
467    qlist_add_tail(&rq->list, &ib_device->recvq);
468    return rq;
469}
470
471/*
472 * Called from incoming message processing, except for the case
473 * of ack to a CTS, for which we know the rq (see below).
474 *
475 * Unexpected receive, either no post or explicit sendunexpected.
476 */
477static void
478encourage_recv_incoming(struct buf_head *bh, msg_type_t type, u_int32_t byte_len)
479{
480    ib_connection_t *c = bh->c;
481    struct ib_work *rq;
482    char *ptr = bh->buf;
483
484    debug(4, "%s: incoming msg type %s", __func__, msg_type_name(type));
485
486    if (type == MSG_EAGER_SEND) {
487
488        msg_header_eager_t mh_eager;
489
490        ptr = bh->buf;
491        decode_msg_header_eager_t(&ptr, &mh_eager);
492
493        debug(2, "%s: recv eager len %u", __func__, byte_len);
494
495        rq = find_matching_recv(RQ_WAITING_INCOMING, c, mh_eager.bmi_tag);
496        if (rq) {
497            bmi_size_t len = byte_len - sizeof(mh_eager);
498            if (len > rq->buflist.tot_len)
499                error("%s: EAGER received %lld too small for buffer %lld",
500                  __func__, lld(len), lld(rq->buflist.tot_len));
501
502            memcpy_to_buflist(&rq->buflist,
503                              (msg_header_eager_t *) bh->buf + 1,
504                              len);
505
506            /* re-post */
507            post_rr(c, bh);
508
509            rq->state.recv = RQ_EAGER_WAITING_USER_TEST;
510            debug(2, "%s: matched rq %p now %s", __func__, rq,
511              rq_state_name(rq->state.recv));
512#if MEMCACHE_EARLY_REG
513            /* if a big receive was posted but only a small message came
514             * through, unregister it now */
515            if (rq->buflist.tot_len > ib_device->eager_buf_payload) {
516                debug(2, "%s: early registration not needed, dereg after eager",
517                  __func__);
518                memcache_deregister(ib_device->memcache, &rq->buflist);
519            }
520#endif
521
522        } else {
523            rq = alloc_new_recv(c, bh);
524            /* return value for when user does post_recv for this one */
525            rq->bmi_tag = mh_eager.bmi_tag;
526            rq->state.recv = RQ_EAGER_WAITING_USER_POST;
527            /* do not repost or ack, keeping bh until user test */
528            debug(2, "%s: new rq %p now %s", __func__, rq,
529              rq_state_name(rq->state.recv));
530        }
531        rq->actual_len = byte_len - sizeof(mh_eager);
532
533    } else if (type == MSG_EAGER_SENDUNEXPECTED) {
534
535        msg_header_eager_t mh_eager;
536
537        ptr = bh->buf;
538        decode_msg_header_eager_t(&ptr, &mh_eager);
539
540        debug(2, "%s: recv eager unexpected len %u", __func__, byte_len);
541
542        rq = alloc_new_recv(c, bh);
543        /* return values for when user does testunexpected for this one */
544        rq->bmi_tag = mh_eager.bmi_tag;
545        rq->state.recv = RQ_EAGER_WAITING_USER_TESTUNEXPECTED;
546        rq->actual_len = byte_len - sizeof(mh_eager);
547        /* do not repost, keeping bh until user test */
548        debug(2, "%s: new rq %p now %s", __func__, rq,
549          rq_state_name(rq->state.recv));
550
551    } else if (type == MSG_RTS) {
552        /*
553         * Sender wants to send a big message, initiates rts/cts protocol.
554         * Has the user posted a matching receive for it yet?
555         */
556        msg_header_rts_t mh_rts;
557
558        ptr = bh->buf;
559        decode_msg_header_rts_t(&ptr, &mh_rts);
560
561        debug(2, "%s: recv RTS len %lld mopid %llx", __func__,
562              lld(mh_rts.tot_len), llu(mh_rts.mop_id));
563
564        rq = find_matching_recv(RQ_WAITING_INCOMING, c, mh_rts.bmi_tag);
565        if (rq) {
566            if ((int)mh_rts.tot_len > rq->buflist.tot_len) {
567                error("%s: RTS received %llu too small for buffer %llu",
568                  __func__, llu(mh_rts.tot_len), llu(rq->buflist.tot_len));
569            }
570            rq->state.recv = RQ_RTS_WAITING_CTS_BUFFER;
571            debug(2, "%s: matched rq %p MSG_RTS now %s", __func__, rq,
572              rq_state_name(rq->state.recv));
573        } else {
574            rq = alloc_new_recv(c, bh);
575            /* return value for when user does post_recv for this one */
576            rq->bmi_tag = mh_rts.bmi_tag;
577            rq->state.recv = RQ_RTS_WAITING_USER_POST;
578            debug(2, "%s: new rq %p MSG_RTS now %s", __func__, rq,
579              rq_state_name(rq->state.recv));
580        }
581        rq->actual_len = mh_rts.tot_len;
582        rq->rts_mop_id = mh_rts.mop_id;
583
584        post_rr(c, bh);
585
586        if (rq->state.recv == RQ_RTS_WAITING_CTS_BUFFER) {
587            int ret;
588            ret = send_cts(rq);
589            if (ret == 0)
590                rq->state.recv = RQ_RTS_WAITING_CTS_SEND_COMPLETION;
591            /* else keep waiting until we can send that cts */
592        }
593
594    } else if (type == MSG_RTS_DONE) {
595
596        msg_header_rts_done_t mh_rts_done;
597        struct ib_work *rqt;
598
599        ptr = bh->buf;
600        decode_msg_header_rts_done_t(&ptr, &mh_rts_done);
601
602        debug(2, "%s: recv RTS_DONE mop_id %llx", __func__,
603              llu(mh_rts_done.mop_id));
604
605        rq = NULL;
606        qlist_for_each_entry(rqt, &ib_device->recvq, list) {
607            if (rqt->c == c && rqt->rts_mop_id == mh_rts_done.mop_id &&
608                rqt->state.recv == RQ_RTS_WAITING_RTS_DONE) {
609                rq = rqt;
610                break;
611            }
612        }
613
614        bmi_ib_assert(rq, "%s: mop_id %llx in RTS_DONE message not found",
615                      __func__, llu(mh_rts_done.mop_id));
616
617#if MEMCACHE_BOUNCEBUF
618        memcpy_to_buflist(&rq->buflist, reg_recv_buflist_buf,
619                          rq->buflist.tot_len);
620#else
621        memcache_deregister(ib_device->memcache, &rq->buflist);
622#endif
623
624        post_rr(c, bh);
625
626        rq->state.recv = RQ_RTS_WAITING_USER_TEST;
627
628    } else if (type == MSG_BYE) {
629        /*
630         * Other side requests connection close.  Do it.
631         */
632        debug(2, "%s: recv BYE", __func__);
633        post_rr(c, bh);
634        ib_close_connection(c);
635
636    } else if (type == MSG_CREDIT) {
637
638        /* already added the credit in check_cq */
639        debug(2, "%s: recv CREDIT", __func__);
640        post_rr(c, bh);
641
642    } else {
643        error("%s: unknown message header type %d len %u", __func__,
644              type, byte_len);
645    }
646}
647
648/*
649 * We finished the RDMA write.  Send him a done message.
650 */
651static void encourage_rts_done_waiting_buffer(struct ib_work *sq)
652{
653    ib_connection_t *c = sq->c;
654    struct buf_head *bh;
655    char *ptr;
656    msg_header_rts_done_t mh_rts_done;
657
658    bh = get_eager_buf(c);
659    if (!bh) {
660        debug(2, "%s: sq %p no free send buffers to %s",
661              __func__, sq, c->peername);
662        return;
663    }
664    sq->bh = bh;
665    bh->sq = sq;
666    ptr = bh->buf;
667
668    msg_header_init(&mh_rts_done.c, c, MSG_RTS_DONE);
669    mh_rts_done.mop_id = sq->mop->op_id;
670
671    debug(2, "%s: sq %p sent RTS_DONE mopid %llx", __func__,
672          sq, llu(sq->mop->op_id));
673
674    encode_msg_header_rts_done_t(&ptr, &mh_rts_done);
675
676    post_sr(bh, sizeof(mh_rts_done));
677    sq->state.send = SQ_WAITING_RTS_DONE_SEND_COMPLETION;
678}
679
680static void send_bye(ib_connection_t *c)
681{
682    msg_header_common_t mh_common;
683    struct buf_head *bh;
684    char *ptr;
685
686    debug(2, "%s: sending bye", __func__);
687    bh = get_eager_buf(c);
688    if (!bh) {
689        debug(2, "%s: no free send buffers to %s", __func__, c->peername);
690        /* if no messages available, let garbage collection on server deal */
691        return;
692    }
693    bh->sq = NULL;
694    ptr = bh->buf;
695    msg_header_init(&mh_common, c, MSG_BYE);
696    encode_msg_header_common_t(&ptr, &mh_common);
697
698    post_sr(bh, sizeof(mh_common));
699}
700
701/*
702 * Two places need to send a CTS in response to an RTS.  They both
703 * call this.  This handles pinning the memory, too.  Don't forget
704 * to unpin when done.
705 */
706static int
707send_cts(struct ib_work *rq)
708{
709    ib_connection_t *c = rq->c;
710    struct buf_head *bh;
711    msg_header_cts_t mh_cts;
712    u_int64_t *bufp;
713    u_int32_t *lenp;
714    u_int32_t *keyp;
715    u_int32_t post_len;
716    char *ptr;
717    int i;
718
719    debug(2, "%s: rq %p from %s mopid %llx len %lld", __func__,
720          rq, rq->c->peername, llu(rq->rts_mop_id),
721          lld(rq->buflist.tot_len));
722
723    bh = get_eager_buf(c);
724    if (!bh) {
725        debug(2, "%s: rq %p no free send buffers to %s",
726              __func__, rq, c->peername);
727        return 1;
728    }
729    rq->bh = bh;
730    bh->sq = (struct ib_work *) rq;  /* uplink for completion */
731
732#if MEMCACHE_BOUNCEBUF
733    if (reg_recv_buflist.num == 0) {
734        reg_recv_buflist.num = 1;
735        reg_recv_buflist.buf.recv = &reg_recv_buflist_buf;
736        reg_recv_buflist.len = &reg_recv_buflist_len;
737        reg_recv_buflist.tot_len = reg_recv_buflist_len;
738        reg_recv_buflist_buf = bmi_ib_malloc(reg_recv_buflist_len);
739        memcache_register(ib_device->memcache, &reg_recv_buflist);
740    }
741    if (rq->buflist.tot_len > reg_recv_buflist_len)
742        error("%s: recv prereg buflist too small, need %lld", __func__,
743          lld(rq->buflist.tot_len));
744
745    ib_buflist_t save_buflist = rq->buflist;
746    rq->buflist = reg_recv_buflist;
747#else
748#  if !MEMCACHE_EARLY_REG
749    memcache_register(ib_device->memcache, &rq->buflist);
750#  endif
751#endif
752
753    msg_header_init(&mh_cts.c, c, MSG_CTS);
754    mh_cts.rts_mop_id = rq->rts_mop_id;
755    mh_cts.buflist_tot_len = rq->buflist.tot_len;
756    mh_cts.buflist_num = rq->buflist.num;
757
758    ptr = bh->buf;
759    encode_msg_header_cts_t(&ptr, &mh_cts);
760
761    /* encode all the buflist entries */
762    bufp = (u_int64_t *)((msg_header_cts_t *) bh->buf + 1);
763    lenp = (u_int32_t *)(bufp + rq->buflist.num);
764    keyp = (u_int32_t *)(lenp + rq->buflist.num);
765    post_len = (char *)(keyp + rq->buflist.num) - (char *)bh->buf;
766    if (post_len > ib_device->eager_buf_size)
767        error("%s: too many (%d) recv buflist entries for buf", __func__,
768          rq->buflist.num);
769    for (i=0; i<rq->buflist.num; i++) {
770        bufp[i] = htobmi64(int64_from_ptr(rq->buflist.buf.recv[i]));
771        lenp[i] = htobmi32(rq->buflist.len[i]);
772        keyp[i] = htobmi32(rq->buflist.memcache[i]->memkeys.rkey);
773    }
774
775    /* send the cts */
776    post_sr(bh, post_len);
777
778#if MEMCACHE_BOUNCEBUF
779    rq->buflist = save_buflist;
780#endif
781
782    return 0;
783}
784
785/*
786 * Bring up the connection before posting a send or receive on it.
787 */
788static int
789ensure_connected(struct bmi_method_addr *remote_map)
790{
791    int ret = 0;
792    ib_method_addr_t *ibmap = remote_map->method_data;
793
794    if (!ibmap->c && ibmap->reconnect_flag)
795        ret = ib_tcp_client_connect(ibmap, remote_map);
796    else if(!ibmap->c && !ibmap->reconnect_flag)
797        ret = 1; /* cannot actively connect */
798    else
799        ret = 0;
800
801    return ret;
802}
803
804/*
805 * Generic interface for both send and sendunexpected, list and non-list send.
806 */
807static int
808post_send(bmi_op_id_t *id, struct bmi_method_addr *remote_map,
809          int numbufs, const void *const *buffers, const bmi_size_t *sizes,
810          bmi_size_t total_size, bmi_msg_tag_t tag, void *user_ptr,
811          bmi_context_id context_id, int is_unexpected)
812{
813    struct ib_work *sq;
814    struct method_op *mop;
815    ib_method_addr_t *ibmap;
816    int i;
817    int ret = 0;
818
819    gen_mutex_lock(&interface_mutex);
820    ret = ensure_connected(remote_map);
821    if (ret)
822        goto out;
823    ibmap = remote_map->method_data;
824
825    /* alloc and build new sendq structure */
826    sq = bmi_ib_malloc(sizeof(*sq));
827    sq->type = BMI_SEND;
828    sq->state.send = SQ_WAITING_BUFFER;
829
830    debug(2, "%s: sq %p len %lld peer %s", __func__, sq, (long long) total_size,
831          ibmap->c->peername);
832
833    /*
834     * For a single buffer, store it inside the sq directly, else save
835     * the pointer to the list the user built when calling a _list
836     * function.  This case is indicated by the non-_list functions by
837     * a zero in numbufs.
838     */
839    if (numbufs == 0) {
840        sq->buflist_one_buf.send = *buffers;
841        sq->buflist_one_len = *sizes;
842        sq->buflist.num = 1;
843        sq->buflist.buf.send = &sq->buflist_one_buf.send;
844        sq->buflist.len = &sq->buflist_one_len;
845    } else {
846        sq->buflist.num = numbufs;
847        sq->buflist.buf.send = buffers;
848        sq->buflist.len = sizes;
849    }
850    sq->buflist.tot_len = 0;
851    for (i=0; i<sq->buflist.num; i++)
852        sq->buflist.tot_len += sizes[i];
853
854    /*
855     * This passed-in total length field does not make much sense
856     * to me, but I'll at least check it for accuracy.
857     */
858    if (sq->buflist.tot_len != total_size)
859        error("%s: user-provided tot len %lld"
860          " does not match buffer list tot len %lld",
861          __func__, lld(total_size), lld(sq->buflist.tot_len));
862
863    /* unexpected messages must fit inside an eager message */
864    if (is_unexpected && sq->buflist.tot_len > ib_device->eager_buf_payload) {
865        free(sq);
866        ret = -EINVAL;
867        goto out;
868    }
869
870    sq->bmi_tag = tag;
871    sq->c = ibmap->c;
872    ++sq->c->refcnt;
873    sq->is_unexpected = is_unexpected;
874    qlist_add_tail(&sq->list, &ib_device->sendq);
875
876    /* generate identifier used by caller to test for message later */
877    mop = bmi_ib_malloc(sizeof(*mop));
878    id_gen_fast_register(&mop->op_id, mop);
879    mop->addr = remote_map;  /* set of function pointers, essentially */
880    mop->method_data = sq;
881    mop->user_ptr = user_ptr;
882    mop->context_id = context_id;
883    *id = mop->op_id;
884    sq->mop = mop;
885    debug(3, "%s: new sq %p", __func__, sq);
886
887    /* and start sending it if possible */
888    encourage_send_waiting_buffer(sq);
889  out:
890    gen_mutex_unlock(&interface_mutex);
891    return ret;
892}
893
894static int
895BMI_ib_post_send(bmi_op_id_t *id, struct bmi_method_addr *remote_map,
896                 const void *buffer, bmi_size_t total_size,
897                 enum bmi_buffer_type buffer_flag __unused,
898                 bmi_msg_tag_t tag, void *user_ptr, bmi_context_id
899                 context_id, PVFS_hint hints __unused)
900{
901    return post_send(id, remote_map, 0, &buffer, &total_size,
902                     total_size, tag, user_ptr, context_id, 0);
903}
904
905static int
906BMI_ib_post_send_list(bmi_op_id_t *id, struct bmi_method_addr *remote_map,
907  const void *const *buffers, const bmi_size_t *sizes, int list_count,
908  bmi_size_t total_size, enum bmi_buffer_type buffer_flag __unused,
909  bmi_msg_tag_t tag, void *user_ptr, bmi_context_id context_id, PVFS_hint
910  hints __unused)
911{
912    return post_send(id, remote_map, list_count, buffers, sizes,
913                     total_size, tag, user_ptr, context_id, 0);
914}
915
916static int
917BMI_ib_post_sendunexpected(bmi_op_id_t *id, struct bmi_method_addr *remote_map,
918                           const void *buffer, bmi_size_t total_size,
919                           enum bmi_buffer_type buffer_flag __unused,
920                           bmi_msg_tag_t tag, void *user_ptr,
921                           bmi_context_id context_id, PVFS_hint hints
922                           __unused)
923{
924    return post_send(id, remote_map, 0, &buffer, &total_size,
925                     total_size, tag, user_ptr, context_id, 1);
926}
927
928static int
929BMI_ib_post_sendunexpected_list(bmi_op_id_t *id, struct bmi_method_addr *remote_map,
930                                const void *const *buffers,
931                                const bmi_size_t *sizes, int list_count,
932                                bmi_size_t total_size,
933                                enum bmi_buffer_type buffer_flag __unused,
934                                bmi_msg_tag_t tag, void *user_ptr,
935                                bmi_context_id context_id, PVFS_hint hints
936                                __unused)
937{
938    return post_send(id, remote_map, list_count, buffers, sizes,
939                     total_size, tag, user_ptr, context_id, 1);
940}
941
942/*
943 * Used by both recv and recv_list.
944 */
945static int
946post_recv(bmi_op_id_t *id, struct bmi_method_addr *remote_map,
947          int numbufs, void *const *buffers, const bmi_size_t *sizes,
948          bmi_size_t tot_expected_len, bmi_msg_tag_t tag,
949          void *user_ptr, bmi_context_id context_id)
950{
951    struct ib_work *rq;
952    struct method_op *mop;
953    ib_method_addr_t *ibmap;
954    ib_connection_t *c;
955    int i;
956    int ret = 0;
957   
958    gen_mutex_lock(&interface_mutex);
959    ret = ensure_connected(remote_map);
960    if (ret)
961        goto out;
962    ibmap = remote_map->method_data;
963    c = ibmap->c;
964
965    /* poll interface first to save a few steps below */
966    ib_check_cq();
967
968    /* check to see if matching recv is in the queue */
969    rq = find_matching_recv(
970      RQ_EAGER_WAITING_USER_POST | RQ_RTS_WAITING_USER_POST, c, tag);
971    if (rq) {
972        debug(2, "%s: rq %p matches %s", __func__, rq,
973          rq_state_name(rq->state.recv));
974    } else {
975        /* alloc and build new recvq structure */
976        rq = alloc_new_recv(c, NULL);
977        rq->state.recv = RQ_WAITING_INCOMING;
978        rq->bmi_tag = tag;
979        debug(2, "%s: new rq %p", __func__, rq);
980    }
981
982    if (numbufs == 0) {
983        rq->buflist_one_buf.recv = *buffers;
984        rq->buflist_one_len = *sizes;
985        rq->buflist.num = 1;
986        rq->buflist.buf.recv = &rq->buflist_one_buf.recv;
987        rq->buflist.len = &rq->buflist_one_len;
988    } else {
989        rq->buflist.num = numbufs;
990        rq->buflist.buf.recv = buffers;
991        rq->buflist.len = sizes;
992    }
993    rq->buflist.tot_len = 0;
994    for (i=0; i<rq->buflist.num; i++)
995        rq->buflist.tot_len += sizes[i];
996
997    /*
998     * This passed-in total length field does not make much sense
999     * to me, but I'll at least check it for accuracy.
1000     */
1001    if (rq->buflist.tot_len != tot_expected_len)
1002        error("%s: user-provided tot len %lld"
1003          " does not match buffer list tot len %lld",
1004          __func__, lld(tot_expected_len), lld(rq->buflist.tot_len));
1005
1006    /* generate identifier used by caller to test for message later */
1007    mop = bmi_ib_malloc(sizeof(*mop));
1008    id_gen_fast_register(&mop->op_id, mop);
1009    mop->addr = remote_map;  /* set of function pointers, essentially */
1010    mop->method_data = rq;
1011    mop->user_ptr = user_ptr;
1012    mop->context_id = context_id;
1013    *id = mop->op_id;
1014    rq->mop = mop;
1015
1016    /* handle the two "waiting for a local user post" states */
1017    if (rq->state.recv == RQ_EAGER_WAITING_USER_POST) {
1018
1019        msg_header_eager_t mh_eager;
1020        char *ptr = rq->bh->buf;
1021
1022        decode_msg_header_eager_t(&ptr, &mh_eager);
1023
1024        debug(2, "%s: rq %p state %s finish eager directly", __func__,
1025          rq, rq_state_name(rq->state.recv));
1026        if (rq->actual_len > tot_expected_len) {
1027            error("%s: received %lld matches too-small buffer %lld",
1028              __func__, lld(rq->actual_len), lld(rq->buflist.tot_len));
1029        }
1030
1031        memcpy_to_buflist(&rq->buflist,
1032                          (msg_header_eager_t *) rq->bh->buf + 1,
1033                          rq->actual_len);
1034
1035        /* re-post */
1036        post_rr(rq->c, rq->bh);
1037
1038        /* now just wait for user to test, never do "immediate completion" */
1039        rq->state.recv = RQ_EAGER_WAITING_USER_TEST;
1040        goto out;
1041
1042    } else if (rq->state.recv == RQ_RTS_WAITING_USER_POST) {
1043        int sret;
1044        debug(2, "%s: rq %p %s send cts", __func__, rq,
1045          rq_state_name(rq->state.recv));
1046        /* try to send, or wait for send buffer space */
1047        rq->state.recv = RQ_RTS_WAITING_CTS_BUFFER;
1048#if MEMCACHE_EARLY_REG
1049        memcache_register(ib_device->memcache, &rq->buflist);
1050#endif
1051        sret = send_cts(rq);
1052        if (sret == 0)
1053            rq->state.recv = RQ_RTS_WAITING_CTS_SEND_COMPLETION;
1054        goto out;
1055    }
1056
1057#if MEMCACHE_EARLY_REG
1058    /* but remember that this might not be used if the other side sends
1059     * less than we posted for receive; that's legal */
1060    if (rq->buflist.tot_len > ib_device->eager_buf_payload)
1061        memcache_register(ib_device->memcache, &rq->buflist);
1062#endif
1063
1064  out:
1065    gen_mutex_unlock(&interface_mutex);
1066    return ret;
1067}
1068
1069static int
1070BMI_ib_post_recv(bmi_op_id_t *id, struct bmi_method_addr *remote_map,
1071  void *buffer, bmi_size_t expected_len, bmi_size_t *actual_len __unused,
1072  enum bmi_buffer_type buffer_flag __unused, bmi_msg_tag_t tag, void *user_ptr,
1073  bmi_context_id context_id, PVFS_hint hints __unused)
1074{
1075    return post_recv(id, remote_map, 0, &buffer, &expected_len,
1076                     expected_len, tag, user_ptr, context_id);
1077}
1078
1079static int
1080BMI_ib_post_recv_list(bmi_op_id_t *id, struct bmi_method_addr *remote_map,
1081  void *const *buffers, const bmi_size_t *sizes, int list_count,
1082  bmi_size_t tot_expected_len, bmi_size_t *tot_actual_len __unused,
1083  enum bmi_buffer_type buffer_flag __unused, bmi_msg_tag_t tag, void *user_ptr,
1084  bmi_context_id context_id, PVFS_hint hints __unused)
1085{
1086    return post_recv(id, remote_map, list_count, buffers, sizes,
1087                     tot_expected_len, tag, user_ptr, context_id);
1088}
1089
1090/*
1091 * Internal shared helper function.  Return 1 if found something
1092 * completed.
1093 */
1094static int
1095test_sq(struct ib_work *sq, bmi_op_id_t *outid, bmi_error_code_t *err,
1096  bmi_size_t *size, void **user_ptr, int complete)
1097{
1098    ib_connection_t *c;
1099
1100    debug(9, "%s: sq %p outid %p err %p size %p user_ptr %p complete %d",
1101      __func__, sq, outid, err, size, user_ptr, complete);
1102
1103    if (sq->state.send == SQ_WAITING_USER_TEST) {
1104        if (complete) {
1105            debug(2, "%s: sq %p completed %lld to %s", __func__,
1106              sq, lld(sq->buflist.tot_len), sq->c->peername);
1107            *outid = sq->mop->op_id;
1108            *err = 0;
1109            *size = sq->buflist.tot_len;
1110            if (user_ptr)
1111                *user_ptr = sq->mop->user_ptr;
1112            qlist_del(&sq->list);
1113            id_gen_fast_unregister(sq->mop->op_id);
1114            c = sq->c;
1115            free(sq->mop);
1116            free(sq);
1117            --c->refcnt;
1118            if (c->closed || c->cancelled)
1119                ib_close_connection(c);
1120            return 1;
1121        }
1122    /* this state needs help, push it (ideally would be triggered
1123     * when the resource is freed... XXX */
1124    } else if (sq->state.send == SQ_WAITING_BUFFER) {
1125        debug(2, "%s: sq %p %s, encouraging", __func__, sq,
1126          sq_state_name(sq->state.send));
1127        encourage_send_waiting_buffer(sq);
1128    } else if (sq->state.send == SQ_WAITING_RTS_DONE_BUFFER) {
1129        debug(2, "%s: sq %p %s, encouraging", __func__, sq,
1130          sq_state_name(sq->state.send));
1131        encourage_rts_done_waiting_buffer(sq);
1132    } else if (sq->state.send == SQ_CANCELLED && complete) {
1133        debug(2, "%s: sq %p cancelled", __func__, sq);
1134        *outid = sq->mop->op_id;
1135        *err = -PVFS_ETIMEDOUT;
1136        if (user_ptr)
1137            *user_ptr = sq->mop->user_ptr;
1138        qlist_del(&sq->list);
1139        id_gen_fast_unregister(sq->mop->op_id);
1140        c = sq->c;
1141        free(sq->mop);
1142        free(sq);
1143        --c->refcnt;
1144        if (c->closed || c->cancelled)
1145            ib_close_connection(c);
1146        return 1;
1147    } else {
1148        debug(9, "%s: sq %p found, not done, state %s", __func__,
1149          sq, sq_state_name(sq->state.send));
1150    }
1151    return 0;
1152}
1153
1154/*
1155 * Internal shared helper function.  Return 1 if found something
1156 * completed.  Note that rq->mop can be null for unexpected
1157 * messages.
1158 */
1159static int
1160test_rq(struct ib_work *rq, bmi_op_id_t *outid, bmi_error_code_t *err,
1161  bmi_size_t *size, void **user_ptr, int complete)
1162{
1163    ib_connection_t *c;
1164
1165    debug(9, "%s: rq %p outid %p err %p size %p user_ptr %p complete %d",
1166      __func__, rq, outid, err, size, user_ptr, complete);
1167
1168    if (rq->state.recv == RQ_EAGER_WAITING_USER_TEST
1169      || rq->state.recv == RQ_RTS_WAITING_USER_TEST) {
1170        if (complete) {
1171            debug(2, "%s: rq %p completed %lld from %s", __func__,
1172              rq, lld(rq->actual_len), rq->c->peername);
1173            *err = 0;
1174            *size = rq->actual_len;
1175            if (rq->mop) {
1176                *outid = rq->mop->op_id;
1177                if (user_ptr)
1178                    *user_ptr = rq->mop->user_ptr;
1179                id_gen_fast_unregister(rq->mop->op_id);
1180                free(rq->mop);
1181            }
1182            qlist_del(&rq->list);
1183            c = rq->c;
1184            free(rq);
1185            --c->refcnt;
1186            if (c->closed || c->cancelled)
1187                ib_close_connection(c);
1188            return 1;
1189        }
1190    /* this state needs help, push it (ideally would be triggered
1191     * when the resource is freed...) XXX */
1192    } else if (rq->state.recv == RQ_RTS_WAITING_CTS_BUFFER) {
1193        int ret;
1194        debug(2, "%s: rq %p %s, encouraging", __func__, rq,
1195          rq_state_name(rq->state.recv));
1196        ret = send_cts(rq);
1197        if (ret == 0)
1198            rq->state.recv = RQ_RTS_WAITING_CTS_SEND_COMPLETION;
1199        /* else keep waiting until we can send that cts */
1200        debug(2, "%s: rq %p now %s", __func__, rq, rq_state_name(rq->state.recv));
1201    } else if (rq->state.recv == RQ_CANCELLED && complete) {
1202        debug(2, "%s: rq %p cancelled", __func__, rq);
1203        *err = -PVFS_ETIMEDOUT;
1204        if (rq->mop) {
1205            *outid = rq->mop->op_id;
1206            if (user_ptr)
1207                *user_ptr = rq->mop->user_ptr;
1208            id_gen_fast_unregister(rq->mop->op_id);
1209            free(rq->mop);
1210        }
1211        qlist_del(&rq->list);
1212        c = rq->c;
1213        free(rq);
1214        --c->refcnt;
1215        if (c->closed || c->cancelled)
1216            ib_close_connection(c);
1217        return 1;
1218    } else {
1219        debug(9, "%s: rq %p found, not done, state %s", __func__,
1220          rq, rq_state_name(rq->state.recv));
1221    }
1222    return 0;
1223}
1224
1225/*
1226 * Test one message, send or receive.  Also used to test the send side of
1227 * messages sent using sendunexpected.
1228 */
1229static int
1230BMI_ib_test(bmi_op_id_t id, int *outcount, bmi_error_code_t *err,
1231  bmi_size_t *size, void **user_ptr, int max_idle_time __unused,
1232  bmi_context_id context_id __unused)
1233{
1234    struct method_op *mop;
1235    struct ib_work *sq;
1236    int n;
1237
1238    gen_mutex_lock(&interface_mutex);
1239    ib_check_cq();
1240
1241    mop = id_gen_fast_lookup(id);
1242    sq = mop->method_data;
1243    n = 0;
1244    if (sq->type == BMI_SEND) {
1245        if (test_sq(sq, &id, err, size, user_ptr, 1))
1246            n = 1;
1247    } else {
1248        /* actually a recv */
1249        struct ib_work *rq = mop->method_data;
1250        if (test_rq(rq, &id, err, size, user_ptr, 1))
1251            n = 1;
1252    }
1253    *outcount = n;
1254    gen_mutex_unlock(&interface_mutex);
1255    return 0;
1256}
1257
1258/*
1259 * Test just the particular list of op ids, returning the list of indices
1260 * that completed.
1261 */
1262static int BMI_ib_testsome(int incount, bmi_op_id_t *ids, int *outcount,
1263  int *index_array, bmi_error_code_t *errs, bmi_size_t *sizes, void **user_ptrs,
1264  int max_idle_time __unused, bmi_context_id context_id __unused)
1265{
1266    struct method_op *mop;
1267    struct ib_work *sq;
1268    bmi_op_id_t tid;
1269    int i, n;
1270
1271    gen_mutex_lock(&interface_mutex);
1272    ib_check_cq();
1273
1274    n = 0;
1275    for (i=0; i<incount; i++) {
1276        if (!ids[i])
1277            continue;
1278        mop = id_gen_fast_lookup(ids[i]);
1279        sq = mop->method_data;
1280
1281        if (sq->type == BMI_SEND) {
1282            if (test_sq(sq, &tid, &errs[n], &sizes[n], &user_ptrs[n], 1)) {
1283                index_array[n] = i;
1284                ++n;
1285            }
1286        } else {
1287            /* actually a recv */
1288            struct ib_work *rq = mop->method_data;
1289            if (test_rq(rq, &tid, &errs[n], &sizes[n], &user_ptrs[n], 1)) {
1290                index_array[n] = i;
1291                ++n;
1292            }
1293        }
1294    }
1295
1296    gen_mutex_unlock(&interface_mutex);
1297
1298    *outcount = n;
1299    return 0;
1300}
1301
1302/*
1303 * Test for multiple completions matching a particular user context.
1304 * Return 0 if okay, >0 if want another poll soon, negative for error.
1305 */
1306static int
1307BMI_ib_testcontext(int incount, bmi_op_id_t *outids, int *outcount,
1308  bmi_error_code_t *errs, bmi_size_t *sizes, void **user_ptrs,
1309  int max_idle_time, bmi_context_id context_id)
1310{
1311    struct qlist_head *l, *lnext;
1312    int n = 0, complete, activity = 0;
1313    void **up = NULL;
1314
1315    gen_mutex_lock(&interface_mutex);
1316
1317restart:
1318    activity += ib_check_cq();
1319
1320    /*
1321     * Walk _all_ entries on sq, rq, marking them completed or
1322     * encouraging them as needed due to resource limitations.
1323     */
1324    for (l=ib_device->sendq.next; l != &ib_device->sendq; l=lnext) {
1325        struct ib_work *sq = qlist_upcast(l);
1326        lnext = l->next;
1327        /* test them all, even if can't reap them, just to encourage */
1328        complete = (sq->mop->context_id == context_id) && (n < incount);
1329        if (user_ptrs)
1330            up = &user_ptrs[n];
1331        n += test_sq(sq, &outids[n], &errs[n], &sizes[n], up, complete);
1332    }
1333
1334    for (l=ib_device->recvq.next; l != &ib_device->recvq; l=lnext) {
1335        struct ib_work *rq = qlist_upcast(l);
1336        lnext = l->next;
1337
1338        /* some receives have no mops:  unexpected */
1339        complete = rq->mop &&
1340          (rq->mop->context_id == context_id) && (n < incount);
1341        if (user_ptrs)
1342            up = &user_ptrs[n];
1343        n += test_rq(rq, &outids[n], &errs[n], &sizes[n], up, complete);
1344    }
1345
1346    /* drop lock before blocking on new connections below */
1347    gen_mutex_unlock(&interface_mutex);
1348
1349    if (activity == 0 && n == 0 && max_idle_time > 0) {
1350        /*
1351         * Block if told to from above.
1352         */
1353        debug(8, "%s: last activity too long ago, blocking", __func__);
1354        activity = ib_block_for_activity(max_idle_time);
1355        if (activity == 1) {   /* IB action, go do it immediately */
1356            gen_mutex_lock(&interface_mutex);
1357            goto restart;
1358        }
1359    }
1360
1361    *outcount = n;
1362    return activity + n;
1363}
1364
1365/*
1366 * Non-blocking test to look for any incoming unexpected messages.
1367 * This is also where we check for new connections on the TCP socket, since
1368 * those would show up as unexpected the first time anything is sent.
1369 * Return 0 for success, or -1 for failure; number of things in *outcount.
1370 * Return >0 if want another poll soon.
1371 */
1372static int
1373BMI_ib_testunexpected(int incount __unused, int *outcount,
1374  struct bmi_method_unexpected_info *ui, int max_idle_time)
1375{
1376    struct qlist_head *l;
1377    int activity = 0, n;
1378
1379    gen_mutex_lock(&interface_mutex);
1380
1381    /* Check CQ, then look for the first unexpected message.  */
1382restart:
1383    activity += ib_check_cq();
1384
1385    n = 0;
1386    qlist_for_each(l, &ib_device->recvq) {
1387        struct ib_work *rq = qlist_upcast(l);
1388        if (rq->state.recv == RQ_EAGER_WAITING_USER_TESTUNEXPECTED) {
1389            msg_header_eager_t mh_eager;
1390            char *ptr = rq->bh->buf;
1391            ib_connection_t *c = rq->c;
1392
1393            decode_msg_header_eager_t(&ptr, &mh_eager);
1394
1395            debug(2, "%s: found waiting testunexpected", __func__);
1396            ui->error_code = 0;
1397            ui->addr = c->remote_map;  /* hand back permanent method_addr */
1398            ui->buffer = bmi_ib_malloc((unsigned long) rq->actual_len);
1399            ui->size = rq->actual_len;
1400            memcpy(ui->buffer,
1401                   (msg_header_eager_t *) rq->bh->buf + 1,
1402                   (size_t) ui->size);
1403            ui->tag = rq->bmi_tag;
1404            /* re-post the buffer in which it was sitting, just unexpecteds */
1405            post_rr(c, rq->bh);
1406            n = 1;
1407            qlist_del(&rq->list);
1408            free(rq);
1409            --c->refcnt;
1410            if (c->closed || c->cancelled)
1411                ib_close_connection(c);
1412            goto out;
1413        }
1414    }
1415
1416  out:
1417    gen_mutex_unlock(&interface_mutex);
1418
1419    if (activity == 0 && n == 0 && max_idle_time > 0) {
1420        /*
1421         * Block if told to from above, also polls TCP listening socket.
1422         */
1423        debug(8, "%s: last activity too long ago, blocking", __func__);
1424        activity = ib_block_for_activity(max_idle_time);
1425        if (activity == 1) {   /* IB action, go do it immediately */
1426            gen_mutex_lock(&interface_mutex);
1427            goto restart;
1428        }
1429    }
1430
1431    *outcount = n;
1432    return activity + n;
1433}
1434
1435/*
1436 * No need to track these internally.  Just search the entire queue.
1437 */
1438static int
1439BMI_ib_open_context(bmi_context_id context_id __unused)
1440{
1441    return 0;
1442}
1443
1444static void
1445BMI_ib_close_context(bmi_context_id context_id __unused)
1446{
1447}
1448
1449/*
1450 * Asynchronous call to destroy an in-progress operation.
1451 * Can't just call test since we don't want to reap the operation,
1452 * just make sure it's done or not.
1453 */
1454static int
1455BMI_ib_cancel(bmi_op_id_t id, bmi_context_id context_id __unused)
1456{
1457    struct method_op *mop;
1458    struct ib_work *tsq;
1459    ib_connection_t *c = 0;
1460
1461    gen_mutex_lock(&interface_mutex);
1462    ib_check_cq();
1463    mop = id_gen_fast_lookup(id);
1464    tsq = mop->method_data;
1465    if (tsq->type == BMI_SEND) {
1466        /*
1467         * Cancelling completed operations is fine, they will be
1468         * tested later.  Any others trigger full shutdown of the
1469         * connection.
1470         */
1471        if (tsq->state.send != SQ_WAITING_USER_TEST)
1472            c = tsq->c;
1473    } else {
1474        /* actually a recv */
1475        struct ib_work *rq = mop->method_data;
1476        if (!(rq->state.recv == RQ_EAGER_WAITING_USER_TEST
1477           || rq->state.recv == RQ_RTS_WAITING_USER_TEST))
1478            c = rq->c;
1479    }
1480
1481    if (c && !c->cancelled) {
1482        /*
1483         * In response to a cancel, forcibly close the connection.  Don't send
1484         * a bye message first since it may be the case that the peer is dead
1485         * anyway.  Do not close the connection until all the sq/rq on it have
1486         * gone away.
1487         */
1488        struct qlist_head *l;
1489
1490        c->cancelled = 1;
1491        drain_qp(c);
1492        qlist_for_each(l, &ib_device->sendq) {
1493            struct ib_work *sq = qlist_upcast(l);
1494            if (sq->c != c) continue;
1495#if !MEMCACHE_BOUNCEBUF
1496            if (sq->state.send == SQ_WAITING_DATA_SEND_COMPLETION)
1497                memcache_deregister(ib_device->memcache, &sq->buflist);
1498#  if MEMCACHE_EARLY_REG
1499            /* pin when sending rts, so also must dereg in this state */
1500            if (sq->state.send == SQ_WAITING_RTS_SEND_COMPLETION ||
1501                sq->state.send == SQ_WAITING_RTS_SEND_COMPLETION_GOT_CTS ||
1502                sq->state.send == SQ_WAITING_CTS)
1503                memcache_deregister(ib_device->memcache, &sq->buflist);
1504#  endif
1505#endif
1506            if (sq->state.send != SQ_WAITING_USER_TEST)
1507                sq->state.send = SQ_CANCELLED;
1508        }
1509        qlist_for_each(l, &ib_device->recvq) {
1510            struct ib_work *rq = qlist_upcast(l);
1511            if (rq->c != c) continue;
1512#if !MEMCACHE_BOUNCEBUF
1513            if (rq->state.recv == RQ_RTS_WAITING_RTS_DONE)
1514                memcache_deregister(ib_device->memcache, &rq->buflist);
1515#  if MEMCACHE_EARLY_REG
1516            /* pin on post, dereg all these */
1517            if (rq->state.recv == RQ_RTS_WAITING_CTS_SEND_COMPLETION)
1518                memcache_deregister(ib_device->memcache, &rq->buflist);
1519            if (rq->state.recv == RQ_WAITING_INCOMING
1520              && rq->buflist.tot_len > ib_device->eager_buf_payload)
1521                memcache_deregister(ib_device->memcache, &rq->buflist);
1522#  endif
1523#endif
1524            if (!(rq->state.recv == RQ_EAGER_WAITING_USER_TEST
1525               || rq->state.recv == RQ_RTS_WAITING_USER_TEST))
1526                rq->state.recv = RQ_CANCELLED;
1527        }
1528    }
1529
1530    gen_mutex_unlock(&interface_mutex);
1531    return 0;
1532}
1533
1534static const char *
1535BMI_ib_rev_lookup(struct bmi_method_addr *meth)
1536{
1537    ib_method_addr_t *ibmap = meth->method_data;
1538    if (!ibmap->c)
1539        return "(unconnected)";
1540    else
1541        return ibmap->c->peername;
1542}
1543
1544/*
1545 * Build and fill an IB-specific method_addr structure.
1546 */
1547static struct bmi_method_addr *ib_alloc_method_addr(ib_connection_t *c,
1548                                                char *hostname, int port,
1549                                                int reconnect_flag)
1550{
1551    struct bmi_method_addr *map;
1552    ib_method_addr_t *ibmap;
1553
1554    map = bmi_alloc_method_addr(bmi_ib_method_id, (bmi_size_t) sizeof(*ibmap));
1555    ibmap = map->method_data;
1556    ibmap->c = c;
1557    ibmap->hostname = hostname;
1558    ibmap->port = port;
1559    ibmap->reconnect_flag = reconnect_flag;
1560    ibmap->ref_count = 1;
1561
1562    return map;
1563}
1564
1565/*
1566 * Break up a method string like:
1567 *   ib://hostname:port/filesystem
1568 * into its constituent fields, storing them in an opaque
1569 * type, which is then returned.
1570 * XXX: I'm assuming that these actually return a _const_ pointer
1571 * so that I can hand back an existing map.
1572 */
1573static struct bmi_method_addr *BMI_ib_method_addr_lookup(const char *id)
1574{
1575    char *s, *hostname, *cp, *cq;
1576    int port;
1577    struct bmi_method_addr *map = NULL;
1578
1579    /* parse hostname */
1580    s = string_key("ib", id);  /* allocs a string */
1581    if (!s)
1582        return 0;
1583    cp = strchr(s, ':');
1584    if (!cp)
1585        error("%s: no ':' found", __func__);
1586
1587    /* copy to permanent storage */
1588    hostname = bmi_ib_malloc((unsigned long) (cp - s + 1));
1589    strncpy(hostname, s, (size_t) (cp-s));
1590    hostname[cp-s] = '\0';
1591
1592    /* strip /filesystem  */
1593    ++cp;
1594    cq = strchr(cp, '/');
1595    if (cq)
1596        *cq = 0;
1597    port = strtoul(cp, &cq, 10);
1598    if (cq == cp)
1599        error("%s: invalid port number", __func__);
1600    if (*cq != '\0')
1601        error("%s: extra characters after port number", __func__);
1602    free(s);
1603
1604    /* lookup in known connections, if there are any */
1605    gen_mutex_lock(&interface_mutex);
1606    if (ib_device) {
1607        struct qlist_head *l;
1608        qlist_for_each(l, &ib_device->connection) {
1609            ib_connection_t *c = qlist_upcast(l);
1610            ib_method_addr_t *ibmap = c->remote_map->method_data;
1611            if (ibmap->port == port && !strcmp(ibmap->hostname, hostname)) {
1612               map = c->remote_map;
1613               ibmap->ref_count++;
1614               break;
1615            }
1616        }
1617    }
1618    gen_mutex_unlock(&interface_mutex);
1619
1620    if (map)
1621        free(hostname);  /* found it */
1622    else
1623    {
1624        /* set reconnect flag on this addr; we will be acting as a client
1625         * for this connection and will be responsible for making sure that
1626         * the connection is established
1627         */
1628        map = ib_alloc_method_addr(0, hostname, port, 1);  /* alloc new one */
1629        /* but don't call bmi_method_addr_reg_callback! */
1630    }
1631
1632    return map;
1633}
1634
1635static ib_connection_t *ib_new_connection(int sock, const char *peername,
1636                                          int is_server)
1637{
1638    ib_connection_t *c;
1639    int i, ret;
1640
1641    c = bmi_ib_malloc(sizeof(*c));
1642    c->peername = strdup(peername);
1643
1644    /* fill send and recv free lists and buf heads */
1645    c->eager_send_buf_contig = bmi_ib_malloc(ib_device->eager_buf_num
1646      * ib_device->eager_buf_size);
1647    c->eager_recv_buf_contig = bmi_ib_malloc(ib_device->eager_buf_num
1648      * ib_device->eager_buf_size);
1649    INIT_QLIST_HEAD(&c->eager_send_buf_free);
1650    INIT_QLIST_HEAD(&c->eager_recv_buf_free);
1651    c->eager_send_buf_head_contig = bmi_ib_malloc(ib_device->eager_buf_num
1652      * sizeof(*c->eager_send_buf_head_contig));
1653    c->eager_recv_buf_head_contig = bmi_ib_malloc(ib_device->eager_buf_num
1654      * sizeof(*c->eager_recv_buf_head_contig));
1655    for (i=0; i<ib_device->eager_buf_num; i++) {
1656        struct buf_head *ebs = &c->eager_send_buf_head_contig[i];
1657        struct buf_head *ebr = &c->eager_recv_buf_head_contig[i];
1658        INIT_QLIST_HEAD(&ebs->list);
1659        INIT_QLIST_HEAD(&ebr->list);
1660        ebs->c = c;
1661        ebr->c = c;
1662        ebs->num = i;
1663        ebr->num = i;
1664        ebs->buf = (char *) c->eager_send_buf_contig
1665                 + i * ib_device->eager_buf_size;
1666        ebr->buf = (char *) c->eager_recv_buf_contig
1667                 + i * ib_device->eager_buf_size;
1668        qlist_add_tail(&ebs->list, &c->eager_send_buf_free);
1669        qlist_add_tail(&ebr->list, &c->eager_recv_buf_free);
1670    }
1671
1672    /* put it on the list */
1673    qlist_add(&c->list, &ib_device->connection);
1674
1675    /* other vars */
1676    c->remote_map = 0;
1677    c->cancelled = 0;
1678    c->refcnt = 0;
1679    c->closed = 0;
1680
1681    /* save one credit back for emergency credit refill */
1682    c->send_credit = ib_device->eager_buf_num - 1;
1683    c->return_credit = 0;
1684
1685    ret = new_connection(c, sock, is_server);
1686    if (ret) {
1687        ib_close_connection(c);
1688        c = NULL;
1689    }
1690
1691    return c;
1692}
1693
1694/*
1695 * Try to close and free a connection, but only do it if refcnt has
1696 * gone to zero.
1697 */
1698static void ib_close_connection(ib_connection_t *c)
1699{
1700    debug(2, "%s: closing connection to %s", __func__, c->peername);
1701    c->closed = 1;
1702    if (c->refcnt != 0) {
1703        debug(1, "%s: refcnt non-zero %d, delaying free", __func__, c->refcnt);
1704        return;
1705    }
1706
1707    close_connection(c);
1708
1709    free(c->eager_send_buf_contig);
1710    free(c->eager_recv_buf_contig);
1711    free(c->eager_send_buf_head_contig);
1712    free(c->eager_recv_buf_head_contig);
1713    /* never free the remote map, for the life of the executable, just
1714     * mark it unconnected since BMI will always have this structure. */
1715    if (c->remote_map) {
1716        ib_method_addr_t *ibmap = c->remote_map->method_data;
1717        ibmap->c = NULL;
1718    }
1719    free(c->peername);
1720    qlist_del(&c->list);
1721    free(c);
1722}
1723
1724/*
1725 * Blocking connect initiated by a post_sendunexpected{,_list}, or
1726 * post_recv*
1727 */
1728static int ib_tcp_client_connect(ib_method_addr_t *ibmap,
1729                                 struct bmi_method_addr *remote_map)
1730{
1731    int s;
1732    char peername[2048];
1733    struct hostent *hp;
1734    struct sockaddr_in skin;
1735   
1736    s = socket(AF_INET, SOCK_STREAM, 0);
1737    if (s < 0) {
1738        warning("%s: create tcp socket: %m", __func__);
1739        return bmi_errno_to_pvfs(-errno);
1740    }
1741    hp = gethostbyname(ibmap->hostname);
1742    if (!hp) {
1743        warning("%s: cannot resolve server %s", __func__, ibmap->hostname);
1744        return -1;
1745    }
1746    memset(&skin, 0, sizeof(skin));
1747    skin.sin_family = hp->h_addrtype;
1748    memcpy(&skin.sin_addr, hp->h_addr_list[0], (size_t) hp->h_length);
1749    skin.sin_port = htons(ibmap->port);
1750    sprintf(peername, "%s:%d", ibmap->hostname, ibmap->port);
1751  retry:
1752    if (connect(s, (struct sockaddr *) &skin, sizeof(skin)) < 0) {
1753        if (errno == EINTR)
1754            goto retry;
1755        else {
1756            warning("%s: connect to server %s: %m", __func__, peername);
1757            return bmi_errno_to_pvfs(-errno);
1758        }
1759    }
1760    ibmap->c = ib_new_connection(s, peername, 0);
1761    if (!ibmap->c)
1762        error("%s: ib_new_connection failed", __func__);
1763    ibmap->c->remote_map = remote_map;
1764
1765    if (close(s) < 0) {
1766        warning("%s: close sock: %m", __func__);
1767        return bmi_errno_to_pvfs(-errno);
1768    }
1769    return 0;
1770}
1771
1772/*
1773 * On a server, initialize a socket for listening for new connections.
1774 */
1775static void ib_tcp_server_init_listen_socket(struct bmi_method_addr *addr)
1776{
1777    int flags;
1778    struct sockaddr_in skin;
1779    ib_method_addr_t *ibc = addr->method_data;
1780
1781    ib_device->listen_sock = socket(AF_INET, SOCK_STREAM, 0);
1782    if (ib_device->listen_sock < 0)
1783        error_errno("%s: create tcp socket", __func__);
1784    flags = 1;
1785    if (setsockopt(ib_device->listen_sock, SOL_SOCKET, SO_REUSEADDR, &flags,
1786      sizeof(flags)) < 0)
1787        error_errno("%s: setsockopt REUSEADDR", __func__);
1788    memset(&skin, 0, sizeof(skin));
1789    skin.sin_family = AF_INET;
1790    skin.sin_port = htons(ibc->port);
1791  retry:
1792    if (bind(ib_device->listen_sock, (struct sockaddr *) &skin, sizeof(skin)) < 0) {
1793        if (errno == EINTR)
1794            goto retry;
1795        else
1796            error_errno("%s: bind tcp socket", __func__);
1797    }
1798    if (listen(ib_device->listen_sock, 1024) < 0)
1799        error_errno("%s: listen tcp socket", __func__);
1800    flags = fcntl(ib_device->listen_sock, F_GETFL);
1801    if (flags < 0)
1802        error_errno("%s: fcntl getfl listen sock", __func__);
1803    flags |= O_NONBLOCK;
1804    if (fcntl(ib_device->listen_sock, F_SETFL, flags) < 0)
1805        error_errno("%s: fcntl setfl nonblock listen sock", __func__);
1806}
1807
1808/*
1809 * Check for new connections.  The listening socket is left nonblocking
1810 * so this test can be quick; but accept is not really that quick compared
1811 * to polling an IB interface, for instance.  Returns >0 if an accept worked.
1812 */
1813static int ib_tcp_server_check_new_connections(void)
1814{
1815    struct sockaddr_in ssin;
1816    socklen_t len;
1817    int s, ret = 0;
1818
1819    len = sizeof(ssin);
1820    s = accept(ib_device->listen_sock, (struct sockaddr *) &ssin, &len);
1821    if (s < 0) {
1822        if (!(errno == EAGAIN))
1823            error_errno("%s: accept listen sock", __func__);
1824    } else {
1825        char peername[2048];
1826        ib_connection_t *c;
1827
1828        char *hostname = strdup(inet_ntoa(ssin.sin_addr));
1829        int port = ntohs(ssin.sin_port);
1830        sprintf(peername, "%s:%d", hostname, port);
1831
1832        gen_mutex_lock(&interface_mutex);
1833
1834        c = ib_new_connection(s, peername, 1);
1835        if (!c) {
1836            free(hostname);
1837            goto out_unlock;
1838        }
1839
1840        /* don't set reconnect flag on this addr; we are a server in this
1841         * case and the peer will be responsible for maintaining the
1842         * connection
1843         */
1844        c->remote_map = ib_alloc_method_addr(c, hostname, port, 0);
1845        /* register this address with the method control layer */
1846        c->bmi_addr = bmi_method_addr_reg_callback(c->remote_map);
1847        if (c->bmi_addr == 0)
1848            error_xerrno(ENOMEM, "%s: bmi_method_addr_reg_callback", __func__);
1849
1850        debug(2, "%s: accepted new connection %s at server", __func__,
1851          c->peername);
1852        ret = 1;
1853
1854out_unlock:
1855        gen_mutex_unlock(&interface_mutex);
1856        if (close(s) < 0)
1857            error_errno("%s: close new sock", __func__);
1858    }
1859    return ret;
1860}
1861
1862/*
1863 * Ask the device to write to its FD if a CQ event happens, and poll on it
1864 * as well as the listen_sock for activity, but do not actually respond to
1865 * anything.  A later ib_check_cq will handle CQ events, and a later call to
1866 * testunexpected will pick up new connections.  Returns ==1 if IB device is
1867 * ready, other >0 for some activity, else 0.
1868 */
1869static int ib_block_for_activity(int timeout_ms)
1870{
1871    struct pollfd pfd[3];  /* cq fd, async fd, accept socket */
1872    int numfd;
1873    int ret;
1874
1875    prepare_cq_block(&pfd[0].fd, &pfd[1].fd);
1876    pfd[0].events = POLLIN;
1877    pfd[1].events = POLLIN;
1878    numfd = 2;
1879    if (ib_device->listen_sock >= 0) {
1880        pfd[2].fd = ib_device->listen_sock;
1881        pfd[2].events = POLLIN;
1882        numfd = 3;
1883    }
1884
1885    ret = poll(pfd, numfd, timeout_ms);
1886    debug(4, "%s: ret %d rev0 %x rev1 %x", __func__, ret,
1887          pfd[0].revents, pfd[1].revents);
1888    if (ret > 0) {
1889        if (pfd[0].revents == POLLIN) {
1890            ack_cq_completion_event();
1891            return 1;
1892        }
1893        /* check others only if CQ was empty */
1894        ret = 2;
1895        if (pfd[1].revents == POLLIN)
1896            check_async_events();
1897        if (pfd[2].revents == POLLIN)
1898            ib_tcp_server_check_new_connections();
1899    } else if (ret < 0) {
1900        if (errno == EINTR)  /* normal, ignore but break */
1901            ret = 0;
1902        else
1903            error_errno("%s: poll listen sock", __func__);
1904    }
1905    return ret;
1906}
1907
1908static void *BMI_ib_memalloc(bmi_size_t len,
1909                             enum bmi_op_type send_recv __unused)
1910{
1911    return memcache_memalloc(ib_device->memcache, len,
1912                             ib_device->eager_buf_payload);
1913}
1914
1915static int BMI_ib_memfree(void *buf, bmi_size_t len,
1916                          enum bmi_op_type send_recv __unused)
1917{
1918    return memcache_memfree(ib_device->memcache, buf, len);
1919}
1920
/*
 * Release a buffer handed back by the caller using free().  NULL is
 * accepted.  Always returns 0.
 */
static int BMI_ib_unexpected_free(void *buf)
{
    const int ok = 0;

    free(buf);
    return ok;
}
1926
1927/*
1928 * Callers sometimes want to know odd pieces of information.  Satisfy
1929 * them.
1930 */
1931static int BMI_ib_get_info(int option, void *param)
1932{
1933    int ret = 0;
1934
1935    switch (option) {
1936        case BMI_CHECK_MAXSIZE:
1937            /* reality is 2^31, but shrink to avoid negative int */
1938            *(int *)param = (1UL << 31) - 1;
1939            break;
1940        case BMI_GET_UNEXP_SIZE:
1941            *(int *)param = ib_device->eager_buf_payload;
1942            break;
1943        default:
1944            ret = -ENOSYS;
1945    }
1946    return ret;
1947}
1948
1949/*
1950 * Used to set some optional parameters and random functions, like ioctl.
1951 */
1952static int BMI_ib_set_info(int option, void *param __unused)
1953{
1954    switch (option) {
1955    case BMI_DROP_ADDR: {
1956        struct bmi_method_addr *map = param;
1957        ib_method_addr_t *ibmap = map->method_data;
1958        ibmap->ref_count--;
1959        if (ibmap->ref_count == 0)
1960        {
1961           free(ibmap->hostname);
1962           free(map);
1963        }
1964        break;
1965    }
1966    case BMI_OPTIMISTIC_BUFFER_REG: {
1967        /* not guaranteed to work */
1968        const struct bmi_optimistic_buffer_info *binfo = param;
1969        memcache_preregister(ib_device->memcache, binfo->buffer,
1970                             binfo->len, binfo->rw);
1971        break;
1972    }
1973    default:
1974        /* Should return -ENOSYS, but return 0 for caller ease. */
1975        break;
1976    }
1977    return 0;
1978}
1979
1980#ifdef OPENIB
1981extern int openib_ib_initialize(void);
1982#endif
1983#ifdef VAPI
1984extern int vapi_ib_initialize(void);
1985#endif
1986
1987/*
1988 * Startup, once per application.
1989 */
1990static int BMI_ib_initialize(struct bmi_method_addr *listen_addr, int method_id,
1991                             int init_flags)
1992{
1993    int ret;
1994
1995    debug(0, "%s: init", __func__);
1996
1997    gen_mutex_lock(&interface_mutex);
1998
1999    /* check params */
2000    if (!!listen_addr ^ (init_flags & BMI_INIT_SERVER))
2001        error("%s: error: BMI_INIT_SERVER requires non-null listen_addr"
2002          " and v.v", __func__);
2003
2004    bmi_ib_method_id = method_id;
2005
2006    ib_device = bmi_ib_malloc(sizeof(*ib_device));
2007
2008    /* try, in order, OpenIB then VAPI; set up function pointers */
2009    ret = 1;
2010#ifdef OPENIB
2011    ret = openib_ib_initialize();
2012#endif
2013#ifdef VAPI
2014    if (ret)
2015        ret = vapi_ib_initialize();
2016#endif
2017    if (ret)
2018        return -ENODEV;  /* neither found */
2019
2020    /* initialize memcache */
2021    ib_device->memcache = memcache_init(mem_register, mem_deregister);
2022#if 0
2023    /*
2024     * Need this for correctness.  Could use malloc/free hooks instead, but
2025     * they fight with mpich's use, for example.  Consider switching to dreg
2026     * kernel module.
2027     */
2028    mallopt(M_TRIM_THRESHOLD, -1);
2029    mallopt(M_MMAP_MAX, 0);
2030#endif
2031
2032    /*
2033     * Set up tcp socket to listen for connection requests.
2034     * The hostname is currently ignored; the port number is used to bind
2035     * the listening TCP socket which accepts new connections.
2036     */
2037    if (init_flags & BMI_INIT_SERVER) {
2038        ib_tcp_server_init_listen_socket(listen_addr);
2039        ib_device->listen_addr = listen_addr;
2040    } else {
2041        ib_device->listen_sock = -1;
2042        ib_device->listen_addr = NULL;
2043    }
2044
2045    /*
2046     * Initialize data structures.
2047     */
2048    INIT_QLIST_HEAD(&ib_device->connection);
2049    INIT_QLIST_HEAD(&ib_device->sendq);
2050    INIT_QLIST_HEAD(&ib_device->recvq);
2051
2052    ib_device->eager_buf_num  = DEFAULT_EAGER_BUF_NUM;
2053    ib_device->eager_buf_size = DEFAULT_EAGER_BUF_SIZE;
2054    ib_device->eager_buf_payload = ib_device->eager_buf_size - sizeof(msg_header_eager_t);
2055
2056    gen_mutex_unlock(&interface_mutex);
2057
2058    debug(0, "%s: done", __func__);
2059    return ret;
2060}
2061
2062/*
2063 * Shutdown.
2064 */
2065static int BMI_ib_finalize(void)
2066{
2067    gen_mutex_lock(&interface_mutex);
2068
2069    /* if client, send BYE to each connection and bring down the QP */
2070    if (ib_device->listen_sock < 0) {
2071        struct qlist_head *l;
2072        qlist_for_each(l, &ib_device->connection) {
2073            ib_connection_t *c = qlist_upcast(l);
2074            if (c->cancelled)
2075                continue;  /* already closed */
2076            /* Send BYE message to servers, transition QP to drain state */
2077            send_bye(c);
2078            drain_qp(c);
2079        }
2080    }
2081    /* if server, stop listening */
2082    if (ib_device->listen_sock >= 0) {
2083        ib_method_addr_t *ibmap = ib_device->listen_addr->method_data;
2084        close(ib_device->listen_sock);
2085        free(ibmap->hostname);
2086        free(ib_device->listen_addr);
2087    }
2088
2089    /* destroy QPs and other connection structures */
2090    while (ib_device->connection.next != &ib_device->connection) {
2091        ib_connection_t *c = (ib_connection_t *) ib_device->connection.next;
2092        ib_close_connection(c);
2093    }
2094
2095#if MEMCACHE_BOUNCEBUF
2096    if (reg_send_buflist.num > 0) {
2097        memcache_deregister(ib_device->memcache, &reg_send_buflist);
2098        reg_send_buflist.num = 0;
2099        free(reg_send_buflist_buf);
2100    }
2101    if (reg_recv_buflist.num > 0) {
2102        memcache_deregister(ib_device->memcache, &reg_recv_buflist);
2103        reg_recv_buflist.num = 0;
2104        free(reg_recv_buflist_buf);
2105    }
2106#endif
2107
2108    memcache_shutdown(ib_device->memcache);
2109
2110    ib_finalize();
2111
2112    free(ib_device);
2113    ib_device = NULL;
2114
2115    gen_mutex_unlock(&interface_mutex);
2116    return 0;
2117}
2118
2119const struct bmi_method_ops bmi_ib_ops =
2120{
2121    .method_name = "bmi_ib",
2122    .flags = 0,
2123    .initialize = BMI_ib_initialize,
2124    .finalize = BMI_ib_finalize,
2125    .set_info = BMI_ib_set_info,
2126    .get_info = BMI_ib_get_info,
2127    .memalloc = BMI_ib_memalloc,
2128    .memfree = BMI_ib_memfree,
2129    .unexpected_free = BMI_ib_unexpected_free,
2130    .post_send = BMI_ib_post_send,
2131    .post_sendunexpected = BMI_ib_post_sendunexpected,
2132    .post_recv = BMI_ib_post_recv,
2133    .test = BMI_ib_test,
2134    .testsome = BMI_ib_testsome,
2135    .testcontext = BMI_ib_testcontext,
2136    .testunexpected = BMI_ib_testunexpected,
2137    .method_addr_lookup = BMI_ib_method_addr_lookup,
2138    .post_send_list = BMI_ib_post_send_list,
2139    .post_recv_list = BMI_ib_post_recv_list,
2140    .post_sendunexpected_list = BMI_ib_post_sendunexpected_list,
2141    .open_context = BMI_ib_open_context,
2142    .close_context = BMI_ib_close_context,
2143    .cancel = BMI_ib_cancel,
2144    .rev_lookup_unexpected = BMI_ib_rev_lookup,
2145    .query_addr_range = NULL,
2146};
2147
Note: See TracBrowser for help on using the browser.