root/branches/stable/src/io/bmi/bmi_ib/ib.c @ 9294

Revision 9294, 61.9 KB (checked in by wolf, 13 months ago)

Fix ib_check_cq error to warning on receive error IBV_WC_WR_FLUSH_ERR

Line 
1/*
2 * InfiniBand BMI method.
3 *
4 * Copyright (C) 2003-6 Pete Wyckoff <pw@osc.edu>
5 * Copyright (C) 2006 Kyle Schochenmaier <kschoche@scl.ameslab.gov>
6 *
7 * See COPYING in top-level directory.
8 */
9#include <stdio.h>
10#include <stdlib.h>
11#include <string.h>
12#include <unistd.h>
13#include <errno.h>
14#include <fcntl.h>
15#include <sys/types.h>
16#include <arpa/inet.h>   /* inet_ntoa */
17#include <netdb.h>       /* gethostbyname */
18#include <sys/poll.h>
19#define __PINT_REQPROTO_ENCODE_FUNCS_C  /* include definitions */
20#include <src/common/id-generator/id-generator.h>
21#include <src/io/bmi/bmi-method-support.h>   /* bmi_method_ops ... */
22#include <src/io/bmi/bmi-method-callback.h>  /* bmi_method_addr_reg_callback */
23#include <src/common/gen-locks/gen-locks.h>  /* gen_mutex_t ... */
24#include <src/common/misc/pvfs2-internal.h>
25#include "pint-hint.h"
26
27#ifdef HAVE_VALGRIND_H
28#include <memcheck.h>
29#else
30#define VALGRIND_MAKE_MEM_DEFINED(addr,len)
31#endif
32
33#include "ib.h"
34
35static gen_mutex_t interface_mutex = GEN_MUTEX_INITIALIZER;
36
37/*
38 * Handle given by upper layer, which must be handed back to create
39 * method_addrs.
40 */
41static int bmi_ib_method_id;
42
43/* alloc space for the single device structure pointer; one for
44 * vapi and one for openib */
45ib_device_t *ib_device __hidden = NULL;
46
47/* these all vector through the ib_device */
48#define new_connection ib_device->func.new_connection
49#define close_connection ib_device->func.close_connection
50#define drain_qp ib_device->func.drain_qp
51#define ib_initialize ib_device->func.ib_initialize
52#define ib_finalize ib_device->func.ib_finalize
53#define post_sr ib_device->func.post_sr
54#define post_sr_rdmaw ib_device->func.post_sr_rdmaw
55#define check_cq ib_device->func.check_cq
56#define prepare_cq_block ib_device->func.prepare_cq_block
57#define ack_cq_completion_event ib_device->func.ack_cq_completion_event
58#define wc_status_string ib_device->func.wc_status_string
59#define mem_register ib_device->func.mem_register
60#define mem_deregister ib_device->func.mem_deregister
61#define check_async_events ib_device->func.check_async_events
62
63#if MEMCACHE_EARLY_REG
64#if MEMCACHE_BOUNCEBUF
65#error Not sensible to use bouncebuf with early reg.  First use of bouncebuf \
66  will register it, thus no effect whether early reg or not.
67#endif
68#endif
69
70#if MEMCACHE_BOUNCEBUF
71static ib_buflist_t reg_send_buflist = { .num = 0 };
72static ib_buflist_t reg_recv_buflist = { .num = 0 };
73static void *reg_send_buflist_buf;
74static void *reg_recv_buflist_buf;
75static const bmi_size_t reg_send_buflist_len = 256 * 1024;
76static const bmi_size_t reg_recv_buflist_len = 256 * 1024;
77#endif
78
79static void encourage_send_incoming_cts(struct buf_head *bh, u_int32_t byte_len);
80static void encourage_recv_incoming(struct buf_head *bh, msg_type_t type,
81                                    u_int32_t byte_len);
82static void encourage_rts_done_waiting_buffer(struct ib_work *sq);
83static int send_cts(struct ib_work *rq);
84static void ib_close_connection(ib_connection_t *c);
85static int ib_tcp_client_connect(ib_method_addr_t *ibmap,
86                                 struct bmi_method_addr *remote_map);
87static int ib_tcp_server_check_new_connections(void);
88static int ib_block_for_activity(int timeout_ms);
89
90/*
91 * Return string form of work completion opcode field.
92 */
93static const char *wc_opcode_string(int opcode)
94{
95    if (opcode == BMI_IB_OP_SEND)
96        return "SEND";
97    else if (opcode == BMI_IB_OP_RECV)
98        return "RECV";
99    else if (opcode == BMI_IB_OP_RDMA_WRITE)
100        return "RDMA WRITE";
101    else
102        return "(UNKNOWN)";
103}
104
105/*
106 * Wander through single completion queue, pulling off messages and
107 * sticking them on the proper connection queues.  Later you can
108 * walk the incomingq looking for things to do to them.  Returns
109 * number of new things that arrived.
110 */
111static int ib_check_cq(void)
112{
113    int ret = 0;
114
115    for (;;) {
116        struct bmi_ib_wc wc;
117        int vret;
118
119        vret = check_cq(&wc);
120        if (vret == 0)
121            break;  /* empty */
122
123        debug(4, "%s: found something", __func__);
124        ++ret;
125        if (wc.status != 0) {
126            struct buf_head *bh = ptr_from_int64(wc.id);
127            /* opcode is not necessarily valid; only wr_id, status, qp_num,
128             * and vendor_err can be relied upon */
129            if (wc.opcode == BMI_IB_OP_SEND) {
130                debug(0, "%s: entry id 0x%llx SEND error %s to %s", __func__,
131                  llu(wc.id), wc_status_string(wc.status), bh->c->peername);
132                if (wc.id) {
133                    ib_connection_t *c = ptr_from_int64(wc.id);
134                    if (c->cancelled) {
135                        debug(0,
136                          "%s: ignoring send error on cancelled conn to %s",
137                          __func__, bh->c->peername);
138                    }
139                }
140            } else {
141                warning("%s: entry id 0x%llx opcode %s error %s from %s",
142                  __func__,
143                  llu(wc.id), wc_opcode_string(wc.opcode),
144                  wc_status_string(wc.status), bh->c->peername);
145                continue;
146            }
147        }
148
149        if (wc.opcode == BMI_IB_OP_RECV) {
150            /*
151             * Remote side did a send to us.
152             */
153            msg_header_common_t mh_common;
154            struct buf_head *bh = ptr_from_int64(wc.id);
155            char *ptr = bh->buf;
156            u_int32_t byte_len = wc.byte_len;
157
158            VALGRIND_MAKE_MEM_DEFINED(ptr, byte_len);
159            decode_msg_header_common_t(&ptr, &mh_common);
160            bh->c->send_credit += mh_common.credit;
161
162            debug(2, "%s: recv from %s len %d type %s credit %d",
163                  __func__, bh->c->peername, byte_len,
164                  msg_type_name(mh_common.type), mh_common.credit);
165            if (mh_common.type == MSG_CTS) {
166                /* incoming CTS messages go to the send engine */
167                encourage_send_incoming_cts(bh, byte_len);
168            } else {
169                /* something for the recv side, no known rq yet */
170                encourage_recv_incoming(bh, mh_common.type, byte_len);
171            }
172
173        } else if (wc.opcode == BMI_IB_OP_RDMA_WRITE) {
174
175            /* completion event for the rdma write we initiated, used
176             * to signal memory unpin etc. */
177            struct ib_work *sq = ptr_from_int64(wc.id);
178
179            debug(3, "%s: sq %p %s", __func__, sq,
180                  sq_state_name(sq->state.send));
181
182            bmi_ib_assert(sq->state.send == SQ_WAITING_DATA_SEND_COMPLETION,
183                          "%s: wrong send state %s", __func__,
184                          sq_state_name(sq->state.send));
185
186            sq->state.send = SQ_WAITING_RTS_DONE_BUFFER;
187
188#if !MEMCACHE_BOUNCEBUF
189            memcache_deregister(ib_device->memcache, &sq->buflist);
190#endif
191            debug(2, "%s: sq %p RDMA write done, now %s", __func__, sq,
192                  sq_state_name(sq->state.send));
193
194            encourage_rts_done_waiting_buffer(sq);
195
196        } else if (wc.opcode == BMI_IB_OP_SEND) {
197
198            struct buf_head *bh = ptr_from_int64(wc.id);
199            struct ib_work *sq = bh->sq;
200
201            if (sq == NULL) {
202                /* MSG_BYE or MSG_CREDIT */
203                debug(2, "%s: MSG_BYE or MSG_CREDIT completed locally",
204                      __func__);
205            } else if (sq->type == BMI_SEND) {
206                sq_state_t state = sq->state.send;
207
208                if (state == SQ_WAITING_EAGER_SEND_COMPLETION)
209                    sq->state.send = SQ_WAITING_USER_TEST;
210                else if (state == SQ_WAITING_RTS_SEND_COMPLETION)
211                    sq->state.send = SQ_WAITING_CTS;
212                else if (state == SQ_WAITING_RTS_SEND_COMPLETION_GOT_CTS)
213                    sq->state.send = SQ_WAITING_DATA_SEND_COMPLETION;
214                else if (state == SQ_WAITING_RTS_DONE_SEND_COMPLETION)
215                    sq->state.send = SQ_WAITING_USER_TEST;
216                else if (state == SQ_CANCELLED)
217                    ;
218                else
219                    bmi_ib_assert(0, "%s: unknown send state %s (%d) of sq %p",
220                                  __func__, sq_state_name(sq->state.send),
221                                  sq->state.send, sq);
222                debug(2, "%s: send to %s completed locally: sq %p -> %s",
223                      __func__, bh->c->peername, sq,
224                      sq_state_name(sq->state.send));
225
226            } else {
227                struct ib_work *rq = sq;  /* rename */
228                rq_state_t state = rq->state.recv;
229               
230                if (state == RQ_RTS_WAITING_CTS_SEND_COMPLETION)
231                    rq->state.recv = RQ_RTS_WAITING_RTS_DONE;
232                else if (state == RQ_CANCELLED)
233                    ;
234                else
235                    bmi_ib_assert(0, "%s: unknown recv state %s of rq %p",
236                                  __func__, rq_state_name(rq->state.recv), rq);
237                debug(2, "%s: send to %s completed locally: rq %p -> %s",
238                      __func__, bh->c->peername, rq,
239                      rq_state_name(rq->state.recv));
240            }
241
242            qlist_add_tail(&bh->list, &bh->c->eager_send_buf_free);
243
244        } else {
245            error("%s: cq entry id 0x%llx opcode %d unexpected", __func__,
246              llu(wc.id), wc.opcode);
247        }
248    }
249    return ret;
250}
251
252/*
253 * Initialize common header of all messages.
254 */
255static void msg_header_init(msg_header_common_t *mh_common,
256                            ib_connection_t *c, msg_type_t type)
257{
258    mh_common->type = type;
259    mh_common->credit = c->return_credit;
260    c->return_credit = 0;
261}
262
263/*
264 * Grab an empty buf head, if available.
265 */
266static struct buf_head *get_eager_buf(ib_connection_t *c)
267{
268    struct buf_head *bh = NULL;
269
270    if (c->send_credit > 0) {
271        --c->send_credit;
272        bh = qlist_try_del_head(&c->eager_send_buf_free);
273        bmi_ib_assert(bh, "%s: empty eager_send_buf_free list, peer %s",
274                      __func__, c->peername);
275    }
276    return bh;
277}
278
279/*
280 * Re-post a receive buffer, possibly returning credit to the peer.
281 */
282static void post_rr(ib_connection_t *c, struct buf_head *bh)
283{
284    ib_device->func.post_rr(c, bh);
285    ++c->return_credit;
286
287    /* if credits are building up, explicitly send them over */
288    if (c->return_credit > ib_device->eager_buf_num - 4) {
289        msg_header_common_t mh_common;
290        char *ptr;
291
292        /* one credit saved back for just this situation, do not check */
293        --c->send_credit;
294        bh = qlist_try_del_head(&c->eager_send_buf_free);
295        bmi_ib_assert(bh, "%s: empty eager_send_buf_free list", __func__);
296        bh->sq = NULL;
297        debug(2, "%s: return %d credits to %s", __func__, c->return_credit,
298              c->peername);
299        msg_header_init(&mh_common, c, MSG_CREDIT);
300        ptr = bh->buf;
301        encode_msg_header_common_t(&ptr, &mh_common);
302        post_sr(bh, sizeof(mh_common));
303    }
304}
305
306/*
307 * Push a send message along its next step.  Called internally only.
308 */
309static void encourage_send_waiting_buffer(struct ib_work *sq)
310{
311    struct buf_head *bh;
312    ib_connection_t *c = sq->c;
313
314    debug(3, "%s: sq %p", __func__, sq);
315    bmi_ib_assert(sq->state.send == SQ_WAITING_BUFFER,
316                  "%s: wrong send state %s", __func__,
317                  sq_state_name(sq->state.send));
318
319    bh = get_eager_buf(c);
320    if (!bh) {
321        debug(2, "%s: sq %p no free send buffers to %s", __func__,
322              sq, c->peername);
323        return;
324    }
325    sq->bh = bh;
326    bh->sq = sq;  /* uplink for completion */
327
328    if (sq->buflist.tot_len <= ib_device->eager_buf_payload) {
329        /*
330         * Eager send.
331         */
332        msg_header_eager_t mh_eager;
333        char *ptr = bh->buf;
334
335        msg_header_init(&mh_eager.c, c, sq->is_unexpected
336                        ? MSG_EAGER_SENDUNEXPECTED : MSG_EAGER_SEND);
337        mh_eager.bmi_tag = sq->bmi_tag;
338        encode_msg_header_eager_t(&ptr, &mh_eager);
339
340        memcpy_from_buflist(&sq->buflist,
341                            (msg_header_eager_t *) bh->buf + 1);
342
343        /* send the message */
344        post_sr(bh, (u_int32_t) (sizeof(mh_eager) + sq->buflist.tot_len));
345
346        /* wait for ack saying remote has received and recycled his buf */
347        sq->state.send = SQ_WAITING_EAGER_SEND_COMPLETION;
348        debug(2, "%s: sq %p sent EAGER len %lld", __func__, sq,
349              lld(sq->buflist.tot_len));
350
351    } else {
352        /*
353         * Request to send, rendez-vous.  Include the mop id in the message
354         * which will be returned to us in the CTS so we can look it up.
355         */
356        msg_header_rts_t mh_rts;
357        char *ptr = bh->buf;
358
359        msg_header_init(&mh_rts.c, c, MSG_RTS);
360        mh_rts.bmi_tag = sq->bmi_tag;
361        mh_rts.mop_id = sq->mop->op_id;
362        mh_rts.tot_len = sq->buflist.tot_len;
363
364        encode_msg_header_rts_t(&ptr, &mh_rts);
365
366        post_sr(bh, sizeof(mh_rts));
367
368#if MEMCACHE_EARLY_REG
369        /* XXX: need to lock against receiver thread?  Could poll return
370         * the CTS and start the data send before this completes? */
371        memcache_register(ib_device->memcache, &sq->buflist);
372#endif
373
374        sq->state.send = SQ_WAITING_RTS_SEND_COMPLETION;
375        debug(2, "%s: sq %p sent RTS mopid %llx len %lld", __func__, sq,
376              llu(sq->mop->op_id), lld(sq->buflist.tot_len));
377    }
378}
379
380/*
381 * Look at the incoming message which is a response to an earlier RTS
382 * from us, and start the real data send.
383 */
384static void
385encourage_send_incoming_cts(struct buf_head *bh, u_int32_t byte_len)
386{
387    msg_header_cts_t mh_cts;
388    struct ib_work *sq, *sqt;
389    u_int32_t want;
390    char *ptr = bh->buf;
391
392    decode_msg_header_cts_t(&ptr, &mh_cts);
393
394    /*
395     * Look through this CTS message to determine the owning sq.  Works
396     * using the mop_id which was sent during the RTS, now returned to us.
397     */
398    sq = 0;
399    qlist_for_each_entry(sqt, &ib_device->sendq, list) {
400        debug(8, "%s: looking for op_id 0x%llx, consider 0x%llx", __func__,
401          llu(mh_cts.rts_mop_id), llu(sqt->mop->op_id));
402        if (sqt->c == bh->c
403            && sqt->mop->op_id == (bmi_op_id_t) mh_cts.rts_mop_id) {
404            sq = sqt;
405            break;
406        }
407    }
408    if (!sq)
409        error("%s: mop_id %llx in CTS message not found", __func__,
410          llu(mh_cts.rts_mop_id));
411
412    debug(2, "%s: sq %p %s mopid %llx len %u", __func__, sq,
413          sq_state_name(sq->state.send), llu(mh_cts.rts_mop_id), byte_len);
414    bmi_ib_assert(sq->state.send == SQ_WAITING_CTS ||
415                  sq->state.send == SQ_WAITING_RTS_SEND_COMPLETION,
416                  "%s: wrong send state %s", __func__,
417                  sq_state_name(sq->state.send));
418
419    /* message; cts content; list of buffers, lengths, and keys */
420    want = sizeof(mh_cts)
421         + mh_cts.buflist_num * MSG_HEADER_CTS_BUFLIST_ENTRY_SIZE;
422    if (bmi_ib_unlikely(byte_len != want))
423        error("%s: wrong message size for CTS, got %u, want %u", __func__,
424              byte_len, want);
425
426    /* start the big tranfser */
427    post_sr_rdmaw(sq, &mh_cts, (msg_header_cts_t *) bh->buf + 1);
428
429    /* re-post our recv buf now that we have all the information from CTS */
430    post_rr(sq->c, bh);
431
432    if (sq->state.send == SQ_WAITING_CTS)
433        sq->state.send = SQ_WAITING_DATA_SEND_COMPLETION;
434    else
435        sq->state.send = SQ_WAITING_RTS_SEND_COMPLETION_GOT_CTS;
436    debug(3, "%s: sq %p now %s", __func__, sq, sq_state_name(sq->state.send));
437}
438
439
440/*
441 * See if anything was preposted that matches this.
442 */
443static struct ib_work *
444find_matching_recv(rq_state_t statemask, const ib_connection_t *c,
445  bmi_msg_tag_t bmi_tag)
446{
447    struct ib_work *rq;
448
449    qlist_for_each_entry(rq, &ib_device->recvq, list) {
450        if ((rq->state.recv & statemask) && rq->c == c
451            && rq->bmi_tag == bmi_tag)
452            return rq;
453    }
454    return NULL;
455}
456
457/*
458 * Init a new recvq entry from something that arrived on the wire.
459 */
460static struct ib_work *
461alloc_new_recv(ib_connection_t *c, struct buf_head *bh)
462{
463    struct ib_work *rq = bmi_ib_malloc(sizeof(*rq));
464    rq->type = BMI_RECV;
465    rq->c = c;
466    ++rq->c->refcnt;
467    rq->bh = bh;
468    rq->mop = 0;  /* until user posts for it */
469    rq->rts_mop_id = 0;
470    qlist_add_tail(&rq->list, &ib_device->recvq);
471    return rq;
472}
473
474/*
475 * Called from incoming message processing, except for the case
476 * of ack to a CTS, for which we know the rq (see below).
477 *
478 * Unexpected receive, either no post or explicit sendunexpected.
479 */
480static void
481encourage_recv_incoming(struct buf_head *bh, msg_type_t type, u_int32_t byte_len)
482{
483    ib_connection_t *c = bh->c;
484    struct ib_work *rq;
485    char *ptr = bh->buf;
486
487    debug(4, "%s: incoming msg type %s", __func__, msg_type_name(type));
488
489    if (type == MSG_EAGER_SEND) {
490
491        msg_header_eager_t mh_eager;
492
493        ptr = bh->buf;
494        decode_msg_header_eager_t(&ptr, &mh_eager);
495
496        debug(2, "%s: recv eager len %u", __func__, byte_len);
497
498        rq = find_matching_recv(RQ_WAITING_INCOMING, c, mh_eager.bmi_tag);
499        if (rq) {
500            bmi_size_t len = byte_len - sizeof(mh_eager);
501            if (len > rq->buflist.tot_len)
502                error("%s: EAGER received %lld too small for buffer %lld",
503                  __func__, lld(len), lld(rq->buflist.tot_len));
504
505            memcpy_to_buflist(&rq->buflist,
506                              (msg_header_eager_t *) bh->buf + 1,
507                              len);
508
509            /* re-post */
510            post_rr(c, bh);
511
512            rq->state.recv = RQ_EAGER_WAITING_USER_TEST;
513            debug(2, "%s: matched rq %p now %s", __func__, rq,
514              rq_state_name(rq->state.recv));
515#if MEMCACHE_EARLY_REG
516            /* if a big receive was posted but only a small message came
517             * through, unregister it now */
518            if (rq->buflist.tot_len > ib_device->eager_buf_payload) {
519                debug(2, "%s: early registration not needed, dereg after eager",
520                  __func__);
521                memcache_deregister(ib_device->memcache, &rq->buflist);
522            }
523#endif
524
525        } else {
526            rq = alloc_new_recv(c, bh);
527            /* return value for when user does post_recv for this one */
528            rq->bmi_tag = mh_eager.bmi_tag;
529            rq->state.recv = RQ_EAGER_WAITING_USER_POST;
530            /* do not repost or ack, keeping bh until user test */
531            debug(2, "%s: new rq %p now %s", __func__, rq,
532              rq_state_name(rq->state.recv));
533        }
534        rq->actual_len = byte_len - sizeof(mh_eager);
535
536    } else if (type == MSG_EAGER_SENDUNEXPECTED) {
537
538        msg_header_eager_t mh_eager;
539
540        ptr = bh->buf;
541        decode_msg_header_eager_t(&ptr, &mh_eager);
542
543        debug(2, "%s: recv eager unexpected len %u", __func__, byte_len);
544
545        rq = alloc_new_recv(c, bh);
546        /* return values for when user does testunexpected for this one */
547        rq->bmi_tag = mh_eager.bmi_tag;
548        rq->state.recv = RQ_EAGER_WAITING_USER_TESTUNEXPECTED;
549        rq->actual_len = byte_len - sizeof(mh_eager);
550        /* do not repost, keeping bh until user test */
551        debug(2, "%s: new rq %p now %s", __func__, rq,
552          rq_state_name(rq->state.recv));
553
554    } else if (type == MSG_RTS) {
555        /*
556         * Sender wants to send a big message, initiates rts/cts protocol.
557         * Has the user posted a matching receive for it yet?
558         */
559        msg_header_rts_t mh_rts;
560
561        ptr = bh->buf;
562        decode_msg_header_rts_t(&ptr, &mh_rts);
563
564        debug(2, "%s: recv RTS len %lld mopid %llx", __func__,
565              lld(mh_rts.tot_len), llu(mh_rts.mop_id));
566
567        rq = find_matching_recv(RQ_WAITING_INCOMING, c, mh_rts.bmi_tag);
568        if (rq) {
569            if ((int)mh_rts.tot_len > rq->buflist.tot_len) {
570                error("%s: RTS received %llu too small for buffer %llu",
571                  __func__, llu(mh_rts.tot_len), llu(rq->buflist.tot_len));
572            }
573            rq->state.recv = RQ_RTS_WAITING_CTS_BUFFER;
574            debug(2, "%s: matched rq %p MSG_RTS now %s", __func__, rq,
575              rq_state_name(rq->state.recv));
576        } else {
577            rq = alloc_new_recv(c, bh);
578            /* return value for when user does post_recv for this one */
579            rq->bmi_tag = mh_rts.bmi_tag;
580            rq->state.recv = RQ_RTS_WAITING_USER_POST;
581            debug(2, "%s: new rq %p MSG_RTS now %s", __func__, rq,
582              rq_state_name(rq->state.recv));
583        }
584        rq->actual_len = mh_rts.tot_len;
585        rq->rts_mop_id = mh_rts.mop_id;
586
587        post_rr(c, bh);
588
589        if (rq->state.recv == RQ_RTS_WAITING_CTS_BUFFER) {
590            int ret;
591            ret = send_cts(rq);
592            if (ret == 0)
593                rq->state.recv = RQ_RTS_WAITING_CTS_SEND_COMPLETION;
594            /* else keep waiting until we can send that cts */
595        }
596
597    } else if (type == MSG_RTS_DONE) {
598
599        msg_header_rts_done_t mh_rts_done;
600        struct ib_work *rqt;
601
602        ptr = bh->buf;
603        decode_msg_header_rts_done_t(&ptr, &mh_rts_done);
604
605        debug(2, "%s: recv RTS_DONE mop_id %llx", __func__,
606              llu(mh_rts_done.mop_id));
607
608        rq = NULL;
609        qlist_for_each_entry(rqt, &ib_device->recvq, list) {
610            if (rqt->c == c && rqt->rts_mop_id == mh_rts_done.mop_id &&
611                rqt->state.recv == RQ_RTS_WAITING_RTS_DONE) {
612                rq = rqt;
613                break;
614            }
615        }
616
617        bmi_ib_assert(rq, "%s: mop_id %llx in RTS_DONE message not found",
618                      __func__, llu(mh_rts_done.mop_id));
619
620#if MEMCACHE_BOUNCEBUF
621        memcpy_to_buflist(&rq->buflist, reg_recv_buflist_buf,
622                          rq->buflist.tot_len);
623#else
624        memcache_deregister(ib_device->memcache, &rq->buflist);
625#endif
626
627        post_rr(c, bh);
628
629        rq->state.recv = RQ_RTS_WAITING_USER_TEST;
630
631    } else if (type == MSG_BYE) {
632        /*
633         * Other side requests connection close.  Do it.
634         */
635        debug(2, "%s: recv BYE", __func__);
636        post_rr(c, bh);
637        ib_close_connection(c);
638
639    } else if (type == MSG_CREDIT) {
640
641        /* already added the credit in check_cq */
642        debug(2, "%s: recv CREDIT", __func__);
643        post_rr(c, bh);
644
645    } else {
646        error("%s: unknown message header type %d len %u", __func__,
647              type, byte_len);
648    }
649}
650
651/*
652 * We finished the RDMA write.  Send him a done message.
653 */
654static void encourage_rts_done_waiting_buffer(struct ib_work *sq)
655{
656    ib_connection_t *c = sq->c;
657    struct buf_head *bh;
658    char *ptr;
659    msg_header_rts_done_t mh_rts_done;
660
661    bh = get_eager_buf(c);
662    if (!bh) {
663        debug(2, "%s: sq %p no free send buffers to %s",
664              __func__, sq, c->peername);
665        return;
666    }
667    sq->bh = bh;
668    bh->sq = sq;
669    ptr = bh->buf;
670
671    msg_header_init(&mh_rts_done.c, c, MSG_RTS_DONE);
672    mh_rts_done.mop_id = sq->mop->op_id;
673
674    debug(2, "%s: sq %p sent RTS_DONE mopid %llx", __func__,
675          sq, llu(sq->mop->op_id));
676
677    encode_msg_header_rts_done_t(&ptr, &mh_rts_done);
678
679    post_sr(bh, sizeof(mh_rts_done));
680    sq->state.send = SQ_WAITING_RTS_DONE_SEND_COMPLETION;
681}
682
683static void send_bye(ib_connection_t *c)
684{
685    msg_header_common_t mh_common;
686    struct buf_head *bh;
687    char *ptr;
688
689    debug(2, "%s: sending bye", __func__);
690    bh = get_eager_buf(c);
691    if (!bh) {
692        debug(2, "%s: no free send buffers to %s", __func__, c->peername);
693        /* if no messages available, let garbage collection on server deal */
694        return;
695    }
696    bh->sq = NULL;
697    ptr = bh->buf;
698    msg_header_init(&mh_common, c, MSG_BYE);
699    encode_msg_header_common_t(&ptr, &mh_common);
700
701    post_sr(bh, sizeof(mh_common));
702}
703
704/*
705 * Two places need to send a CTS in response to an RTS.  They both
706 * call this.  This handles pinning the memory, too.  Don't forget
707 * to unpin when done.
708 */
709static int
710send_cts(struct ib_work *rq)
711{
712    ib_connection_t *c = rq->c;
713    struct buf_head *bh;
714    msg_header_cts_t mh_cts;
715    u_int64_t *bufp;
716    u_int32_t *lenp;
717    u_int32_t *keyp;
718    u_int32_t post_len;
719    char *ptr;
720    int i;
721
722    debug(2, "%s: rq %p from %s mopid %llx len %lld", __func__,
723          rq, rq->c->peername, llu(rq->rts_mop_id),
724          lld(rq->buflist.tot_len));
725
726    bh = get_eager_buf(c);
727    if (!bh) {
728        debug(2, "%s: rq %p no free send buffers to %s",
729              __func__, rq, c->peername);
730        return 1;
731    }
732    rq->bh = bh;
733    bh->sq = (struct ib_work *) rq;  /* uplink for completion */
734
735#if MEMCACHE_BOUNCEBUF
736    if (reg_recv_buflist.num == 0) {
737        reg_recv_buflist.num = 1;
738        reg_recv_buflist.buf.recv = &reg_recv_buflist_buf;
739        reg_recv_buflist.len = &reg_recv_buflist_len;
740        reg_recv_buflist.tot_len = reg_recv_buflist_len;
741        reg_recv_buflist_buf = bmi_ib_malloc(reg_recv_buflist_len);
742        memcache_register(ib_device->memcache, &reg_recv_buflist);
743    }
744    if (rq->buflist.tot_len > reg_recv_buflist_len)
745        error("%s: recv prereg buflist too small, need %lld", __func__,
746          lld(rq->buflist.tot_len));
747
748    ib_buflist_t save_buflist = rq->buflist;
749    rq->buflist = reg_recv_buflist;
750#else
751#  if !MEMCACHE_EARLY_REG
752    memcache_register(ib_device->memcache, &rq->buflist);
753#  endif
754#endif
755
756    msg_header_init(&mh_cts.c, c, MSG_CTS);
757    mh_cts.rts_mop_id = rq->rts_mop_id;
758    mh_cts.buflist_tot_len = rq->buflist.tot_len;
759    mh_cts.buflist_num = rq->buflist.num;
760
761    ptr = bh->buf;
762    encode_msg_header_cts_t(&ptr, &mh_cts);
763
764    /* encode all the buflist entries */
765    bufp = (u_int64_t *)((msg_header_cts_t *) bh->buf + 1);
766    lenp = (u_int32_t *)(bufp + rq->buflist.num);
767    keyp = (u_int32_t *)(lenp + rq->buflist.num);
768    post_len = (char *)(keyp + rq->buflist.num) - (char *)bh->buf;
769    if (post_len > ib_device->eager_buf_size)
770        error("%s: too many (%d) recv buflist entries for buf", __func__,
771          rq->buflist.num);
772    for (i=0; i<rq->buflist.num; i++) {
773        bufp[i] = htobmi64(int64_from_ptr(rq->buflist.buf.recv[i]));
774        lenp[i] = htobmi32(rq->buflist.len[i]);
775        keyp[i] = htobmi32(rq->buflist.memcache[i]->memkeys.rkey);
776    }
777
778    /* send the cts */
779    post_sr(bh, post_len);
780
781#if MEMCACHE_BOUNCEBUF
782    rq->buflist = save_buflist;
783#endif
784
785    return 0;
786}
787
788/*
789 * Bring up the connection before posting a send or receive on it.
790 */
791static int
792ensure_connected(struct bmi_method_addr *remote_map)
793{
794    int ret = 0;
795    ib_method_addr_t *ibmap = remote_map->method_data;
796
797    if (!ibmap->c && ibmap->reconnect_flag)
798        ret = ib_tcp_client_connect(ibmap, remote_map);
799    else if(!ibmap->c && !ibmap->reconnect_flag)
800        ret = 1; /* cannot actively connect */
801    else
802        ret = 0;
803
804    return ret;
805}
806
807/*
808 * Generic interface for both send and sendunexpected, list and non-list send.
809 */
810static int
811post_send(bmi_op_id_t *id, struct bmi_method_addr *remote_map,
812          int numbufs, const void *const *buffers, const bmi_size_t *sizes,
813          bmi_size_t total_size, bmi_msg_tag_t tag, void *user_ptr,
814          bmi_context_id context_id, int is_unexpected)
815{
816    struct ib_work *sq;
817    struct method_op *mop;
818    ib_method_addr_t *ibmap;
819    int i;
820    int ret = 0;
821
822    gen_mutex_lock(&interface_mutex);
823    ret = ensure_connected(remote_map);
824    if (ret)
825        goto out;
826    ibmap = remote_map->method_data;
827
828    /* alloc and build new sendq structure */
829    sq = bmi_ib_malloc(sizeof(*sq));
830    sq->type = BMI_SEND;
831    sq->state.send = SQ_WAITING_BUFFER;
832
833    debug(2, "%s: sq %p len %lld peer %s", __func__, sq, (long long) total_size,
834          ibmap->c->peername);
835
836    /*
837     * For a single buffer, store it inside the sq directly, else save
838     * the pointer to the list the user built when calling a _list
839     * function.  This case is indicated by the non-_list functions by
840     * a zero in numbufs.
841     */
842    if (numbufs == 0) {
843        sq->buflist_one_buf.send = *buffers;
844        sq->buflist_one_len = *sizes;
845        sq->buflist.num = 1;
846        sq->buflist.buf.send = &sq->buflist_one_buf.send;
847        sq->buflist.len = &sq->buflist_one_len;
848    } else {
849        sq->buflist.num = numbufs;
850        sq->buflist.buf.send = buffers;
851        sq->buflist.len = sizes;
852    }
853    sq->buflist.tot_len = 0;
854    for (i=0; i<sq->buflist.num; i++)
855        sq->buflist.tot_len += sizes[i];
856
857    /*
858     * This passed-in total length field does not make much sense
859     * to me, but I'll at least check it for accuracy.
860     */
861    if (sq->buflist.tot_len != total_size)
862        error("%s: user-provided tot len %lld"
863          " does not match buffer list tot len %lld",
864          __func__, lld(total_size), lld(sq->buflist.tot_len));
865
866    /* unexpected messages must fit inside an eager message */
867    if (is_unexpected && sq->buflist.tot_len > ib_device->eager_buf_payload) {
868        free(sq);
869        ret = -EINVAL;
870        goto out;
871    }
872
873    sq->bmi_tag = tag;
874    sq->c = ibmap->c;
875    ++sq->c->refcnt;
876    sq->is_unexpected = is_unexpected;
877    qlist_add_tail(&sq->list, &ib_device->sendq);
878
879    /* generate identifier used by caller to test for message later */
880    mop = bmi_ib_malloc(sizeof(*mop));
881    id_gen_fast_register(&mop->op_id, mop);
882    mop->addr = remote_map;  /* set of function pointers, essentially */
883    mop->method_data = sq;
884    mop->user_ptr = user_ptr;
885    mop->context_id = context_id;
886    *id = mop->op_id;
887    sq->mop = mop;
888    debug(3, "%s: new sq %p", __func__, sq);
889
890    /* and start sending it if possible */
891    encourage_send_waiting_buffer(sq);
892  out:
893    gen_mutex_unlock(&interface_mutex);
894    return ret;
895}
896
897static int
898BMI_ib_post_send(bmi_op_id_t *id, struct bmi_method_addr *remote_map,
899                 const void *buffer, bmi_size_t total_size,
900                 enum bmi_buffer_type buffer_flag __unused,
901                 bmi_msg_tag_t tag, void *user_ptr, bmi_context_id
902                 context_id, PVFS_hint hints __unused)
903{
904    return post_send(id, remote_map, 0, &buffer, &total_size,
905                     total_size, tag, user_ptr, context_id, 0);
906}
907
908static int
909BMI_ib_post_send_list(bmi_op_id_t *id, struct bmi_method_addr *remote_map,
910  const void *const *buffers, const bmi_size_t *sizes, int list_count,
911  bmi_size_t total_size, enum bmi_buffer_type buffer_flag __unused,
912  bmi_msg_tag_t tag, void *user_ptr, bmi_context_id context_id, PVFS_hint
913  hints __unused)
914{
915    return post_send(id, remote_map, list_count, buffers, sizes,
916                     total_size, tag, user_ptr, context_id, 0);
917}
918
919static int
920BMI_ib_post_sendunexpected(bmi_op_id_t *id, struct bmi_method_addr *remote_map,
921                           const void *buffer, bmi_size_t total_size,
922                           enum bmi_buffer_type buffer_flag __unused,
923                           bmi_msg_tag_t tag, void *user_ptr,
924                           bmi_context_id context_id, PVFS_hint hints
925                           __unused)
926{
927    return post_send(id, remote_map, 0, &buffer, &total_size,
928                     total_size, tag, user_ptr, context_id, 1);
929}
930
931static int
932BMI_ib_post_sendunexpected_list(bmi_op_id_t *id, struct bmi_method_addr *remote_map,
933                                const void *const *buffers,
934                                const bmi_size_t *sizes, int list_count,
935                                bmi_size_t total_size,
936                                enum bmi_buffer_type buffer_flag __unused,
937                                bmi_msg_tag_t tag, void *user_ptr,
938                                bmi_context_id context_id, PVFS_hint hints
939                                __unused)
940{
941    return post_send(id, remote_map, list_count, buffers, sizes,
942                     total_size, tag, user_ptr, context_id, 1);
943}
944
/*
 * Used by both recv and recv_list.
 *
 * Post a receive into the caller's buffer(s).  numbufs == 0 means a
 * single buffer given by *buffers / *sizes (the non-_list entry point);
 * otherwise buffers/sizes hold numbufs entries.  In the list case the
 * arrays themselves are referenced, not copied, so presumably they must
 * stay valid until the operation is tested complete.
 *
 * If a message with this tag has already arrived on the connection (an
 * eager message or an RTS, both waiting for a user post), that work item
 * is adopted; otherwise a new rq is created in RQ_WAITING_INCOMING.  An
 * op id for later testing is stored in *id.
 *
 * Returns 0, or the non-zero value from ensure_connected().
 */
static int
post_recv(bmi_op_id_t *id, struct bmi_method_addr *remote_map,
          int numbufs, void *const *buffers, const bmi_size_t *sizes,
          bmi_size_t tot_expected_len, bmi_msg_tag_t tag,
          void *user_ptr, bmi_context_id context_id)
{
    struct ib_work *rq;
    struct method_op *mop;
    ib_method_addr_t *ibmap;
    ib_connection_t *c;
    int i;
    int ret = 0;

    gen_mutex_lock(&interface_mutex);
    ret = ensure_connected(remote_map);
    if (ret)
        goto out;
    ibmap = remote_map->method_data;
    c = ibmap->c;

    /* poll interface first to save a few steps below */
    ib_check_cq();

    /* check to see if matching recv is in the queue */
    rq = find_matching_recv(
      RQ_EAGER_WAITING_USER_POST | RQ_RTS_WAITING_USER_POST, c, tag);
    if (rq) {
        debug(2, "%s: rq %p matches %s", __func__, rq,
          rq_state_name(rq->state.recv));
    } else {
        /* alloc and build new recvq structure */
        rq = alloc_new_recv(c, NULL);
        rq->state.recv = RQ_WAITING_INCOMING;
        rq->bmi_tag = tag;
        debug(2, "%s: new rq %p", __func__, rq);
    }

    /* single buffer: keep it inside rq; list: point at the caller's arrays */
    if (numbufs == 0) {
        rq->buflist_one_buf.recv = *buffers;
        rq->buflist_one_len = *sizes;
        rq->buflist.num = 1;
        rq->buflist.buf.recv = &rq->buflist_one_buf.recv;
        rq->buflist.len = &rq->buflist_one_len;
    } else {
        rq->buflist.num = numbufs;
        rq->buflist.buf.recv = buffers;
        rq->buflist.len = sizes;
    }
    rq->buflist.tot_len = 0;
    for (i=0; i<rq->buflist.num; i++)
        rq->buflist.tot_len += sizes[i];

    /*
     * This passed-in total length field does not make much sense
     * to me, but I'll at least check it for accuracy.
     */
    if (rq->buflist.tot_len != tot_expected_len)
        error("%s: user-provided tot len %lld"
          " does not match buffer list tot len %lld",
          __func__, lld(tot_expected_len), lld(rq->buflist.tot_len));

    /* generate identifier used by caller to test for message later */
    mop = bmi_ib_malloc(sizeof(*mop));
    id_gen_fast_register(&mop->op_id, mop);
    mop->addr = remote_map;  /* set of function pointers, essentially */
    mop->method_data = rq;
    mop->user_ptr = user_ptr;
    mop->context_id = context_id;
    *id = mop->op_id;
    rq->mop = mop;

    /* handle the two "waiting for a local user post" states */
    if (rq->state.recv == RQ_EAGER_WAITING_USER_POST) {

        /* NOTE(review): mh_eager is decoded but never used below */
        msg_header_eager_t mh_eager;
        char *ptr = rq->bh->buf;

        decode_msg_header_eager_t(&ptr, &mh_eager);

        debug(2, "%s: rq %p state %s finish eager directly", __func__,
          rq, rq_state_name(rq->state.recv));
        /* NOTE(review): the copy below uses actual_len unconditionally, so
         * it is only safe if error() does not return -- confirm */
        if (rq->actual_len > tot_expected_len) {
            error("%s: received %lld matches too-small buffer %lld",
              __func__, lld(rq->actual_len), lld(rq->buflist.tot_len));
        }

        /* the payload sits immediately after the eager header */
        memcpy_to_buflist(&rq->buflist,
                          (msg_header_eager_t *) rq->bh->buf + 1,
                          rq->actual_len);

        /* re-post */
        post_rr(rq->c, rq->bh);

        /* now just wait for user to test, never do "immediate completion" */
        rq->state.recv = RQ_EAGER_WAITING_USER_TEST;
        goto out;

    } else if (rq->state.recv == RQ_RTS_WAITING_USER_POST) {
        int sret;
        debug(2, "%s: rq %p %s send cts", __func__, rq,
          rq_state_name(rq->state.recv));
        /* try to send, or wait for send buffer space; if send_cts fails
         * the rq stays in RQ_RTS_WAITING_CTS_BUFFER and test_rq retries */
        rq->state.recv = RQ_RTS_WAITING_CTS_BUFFER;
#if MEMCACHE_EARLY_REG
        memcache_register(ib_device->memcache, &rq->buflist);
#endif
        sret = send_cts(rq);
        if (sret == 0)
            rq->state.recv = RQ_RTS_WAITING_CTS_SEND_COMPLETION;
        goto out;
    }

#if MEMCACHE_EARLY_REG
    /* but remember that this might not be used if the other side sends
     * less than we posted for receive; that's legal */
    if (rq->buflist.tot_len > ib_device->eager_buf_payload)
        memcache_register(ib_device->memcache, &rq->buflist);
#endif

  out:
    gen_mutex_unlock(&interface_mutex);
    return ret;
}
1071
1072static int
1073BMI_ib_post_recv(bmi_op_id_t *id, struct bmi_method_addr *remote_map,
1074  void *buffer, bmi_size_t expected_len, bmi_size_t *actual_len __unused,
1075  enum bmi_buffer_type buffer_flag __unused, bmi_msg_tag_t tag, void *user_ptr,
1076  bmi_context_id context_id, PVFS_hint hints __unused)
1077{
1078    return post_recv(id, remote_map, 0, &buffer, &expected_len,
1079                     expected_len, tag, user_ptr, context_id);
1080}
1081
1082static int
1083BMI_ib_post_recv_list(bmi_op_id_t *id, struct bmi_method_addr *remote_map,
1084  void *const *buffers, const bmi_size_t *sizes, int list_count,
1085  bmi_size_t tot_expected_len, bmi_size_t *tot_actual_len __unused,
1086  enum bmi_buffer_type buffer_flag __unused, bmi_msg_tag_t tag, void *user_ptr,
1087  bmi_context_id context_id, PVFS_hint hints __unused)
1088{
1089    return post_recv(id, remote_map, list_count, buffers, sizes,
1090                     tot_expected_len, tag, user_ptr, context_id);
1091}
1092
1093/*
1094 * Internal shared helper function.  Return 1 if found something
1095 * completed.
1096 */
1097static int
1098test_sq(struct ib_work *sq, bmi_op_id_t *outid, bmi_error_code_t *err,
1099  bmi_size_t *size, void **user_ptr, int complete)
1100{
1101    ib_connection_t *c;
1102
1103    debug(9, "%s: sq %p outid %p err %p size %p user_ptr %p complete %d",
1104      __func__, sq, outid, err, size, user_ptr, complete);
1105
1106    if (sq->state.send == SQ_WAITING_USER_TEST) {
1107        if (complete) {
1108            debug(2, "%s: sq %p completed %lld to %s", __func__,
1109              sq, lld(sq->buflist.tot_len), sq->c->peername);
1110            *outid = sq->mop->op_id;
1111            *err = 0;
1112            *size = sq->buflist.tot_len;
1113            if (user_ptr)
1114                *user_ptr = sq->mop->user_ptr;
1115            qlist_del(&sq->list);
1116            id_gen_fast_unregister(sq->mop->op_id);
1117            c = sq->c;
1118            free(sq->mop);
1119            free(sq);
1120            --c->refcnt;
1121            if (c->closed || c->cancelled)
1122                ib_close_connection(c);
1123            return 1;
1124        }
1125    /* this state needs help, push it (ideally would be triggered
1126     * when the resource is freed... XXX */
1127    } else if (sq->state.send == SQ_WAITING_BUFFER) {
1128        debug(2, "%s: sq %p %s, encouraging", __func__, sq,
1129          sq_state_name(sq->state.send));
1130        encourage_send_waiting_buffer(sq);
1131    } else if (sq->state.send == SQ_WAITING_RTS_DONE_BUFFER) {
1132        debug(2, "%s: sq %p %s, encouraging", __func__, sq,
1133          sq_state_name(sq->state.send));
1134        encourage_rts_done_waiting_buffer(sq);
1135    } else if (sq->state.send == SQ_CANCELLED && complete) {
1136        debug(2, "%s: sq %p cancelled", __func__, sq);
1137        *outid = sq->mop->op_id;
1138        *err = -PVFS_ETIMEDOUT;
1139        if (user_ptr)
1140            *user_ptr = sq->mop->user_ptr;
1141        qlist_del(&sq->list);
1142        id_gen_fast_unregister(sq->mop->op_id);
1143        c = sq->c;
1144        free(sq->mop);
1145        free(sq);
1146        --c->refcnt;
1147        if (c->closed || c->cancelled)
1148            ib_close_connection(c);
1149        return 1;
1150    } else {
1151        debug(9, "%s: sq %p found, not done, state %s", __func__,
1152          sq, sq_state_name(sq->state.send));
1153    }
1154    return 0;
1155}
1156
1157/*
1158 * Internal shared helper function.  Return 1 if found something
1159 * completed.  Note that rq->mop can be null for unexpected
1160 * messages.
1161 */
1162static int
1163test_rq(struct ib_work *rq, bmi_op_id_t *outid, bmi_error_code_t *err,
1164  bmi_size_t *size, void **user_ptr, int complete)
1165{
1166    ib_connection_t *c;
1167
1168    debug(9, "%s: rq %p outid %p err %p size %p user_ptr %p complete %d",
1169      __func__, rq, outid, err, size, user_ptr, complete);
1170
1171    if (rq->state.recv == RQ_EAGER_WAITING_USER_TEST
1172      || rq->state.recv == RQ_RTS_WAITING_USER_TEST) {
1173        if (complete) {
1174            debug(2, "%s: rq %p completed %lld from %s", __func__,
1175              rq, lld(rq->actual_len), rq->c->peername);
1176            *err = 0;
1177            *size = rq->actual_len;
1178            if (rq->mop) {
1179                *outid = rq->mop->op_id;
1180                if (user_ptr)
1181                    *user_ptr = rq->mop->user_ptr;
1182                id_gen_fast_unregister(rq->mop->op_id);
1183                free(rq->mop);
1184            }
1185            qlist_del(&rq->list);
1186            c = rq->c;
1187            free(rq);
1188            --c->refcnt;
1189            if (c->closed || c->cancelled)
1190                ib_close_connection(c);
1191            return 1;
1192        }
1193    /* this state needs help, push it (ideally would be triggered
1194     * when the resource is freed...) XXX */
1195    } else if (rq->state.recv == RQ_RTS_WAITING_CTS_BUFFER) {
1196        int ret;
1197        debug(2, "%s: rq %p %s, encouraging", __func__, rq,
1198          rq_state_name(rq->state.recv));
1199        ret = send_cts(rq);
1200        if (ret == 0)
1201            rq->state.recv = RQ_RTS_WAITING_CTS_SEND_COMPLETION;
1202        /* else keep waiting until we can send that cts */
1203        debug(2, "%s: rq %p now %s", __func__, rq, rq_state_name(rq->state.recv));
1204    } else if (rq->state.recv == RQ_CANCELLED && complete) {
1205        debug(2, "%s: rq %p cancelled", __func__, rq);
1206        *err = -PVFS_ETIMEDOUT;
1207        if (rq->mop) {
1208            *outid = rq->mop->op_id;
1209            if (user_ptr)
1210                *user_ptr = rq->mop->user_ptr;
1211            id_gen_fast_unregister(rq->mop->op_id);
1212            free(rq->mop);
1213        }
1214        qlist_del(&rq->list);
1215        c = rq->c;
1216        free(rq);
1217        --c->refcnt;
1218        if (c->closed || c->cancelled)
1219            ib_close_connection(c);
1220        return 1;
1221    } else {
1222        debug(9, "%s: rq %p found, not done, state %s", __func__,
1223          rq, rq_state_name(rq->state.recv));
1224    }
1225    return 0;
1226}
1227
1228/*
1229 * Test one message, send or receive.  Also used to test the send side of
1230 * messages sent using sendunexpected.
1231 */
1232static int
1233BMI_ib_test(bmi_op_id_t id, int *outcount, bmi_error_code_t *err,
1234  bmi_size_t *size, void **user_ptr, int max_idle_time __unused,
1235  bmi_context_id context_id __unused)
1236{
1237    struct method_op *mop;
1238    struct ib_work *sq;
1239    int n;
1240
1241    gen_mutex_lock(&interface_mutex);
1242    ib_check_cq();
1243
1244    mop = id_gen_fast_lookup(id);
1245    sq = mop->method_data;
1246    n = 0;
1247    if (sq->type == BMI_SEND) {
1248        if (test_sq(sq, &id, err, size, user_ptr, 1))
1249            n = 1;
1250    } else {
1251        /* actually a recv */
1252        struct ib_work *rq = mop->method_data;
1253        if (test_rq(rq, &id, err, size, user_ptr, 1))
1254            n = 1;
1255    }
1256    *outcount = n;
1257    gen_mutex_unlock(&interface_mutex);
1258    return 0;
1259}
1260
1261/*
1262 * Test just the particular list of op ids, returning the list of indices
1263 * that completed.
1264 */
1265static int BMI_ib_testsome(int incount, bmi_op_id_t *ids, int *outcount,
1266  int *index_array, bmi_error_code_t *errs, bmi_size_t *sizes, void **user_ptrs,
1267  int max_idle_time __unused, bmi_context_id context_id __unused)
1268{
1269    struct method_op *mop;
1270    struct ib_work *sq;
1271    bmi_op_id_t tid;
1272    int i, n;
1273
1274    gen_mutex_lock(&interface_mutex);
1275    ib_check_cq();
1276
1277    n = 0;
1278    for (i=0; i<incount; i++) {
1279        if (!ids[i])
1280            continue;
1281        mop = id_gen_fast_lookup(ids[i]);
1282        sq = mop->method_data;
1283
1284        if (sq->type == BMI_SEND) {
1285            if (test_sq(sq, &tid, &errs[n], &sizes[n], &user_ptrs[n], 1)) {
1286                index_array[n] = i;
1287                ++n;
1288            }
1289        } else {
1290            /* actually a recv */
1291            struct ib_work *rq = mop->method_data;
1292            if (test_rq(rq, &tid, &errs[n], &sizes[n], &user_ptrs[n], 1)) {
1293                index_array[n] = i;
1294                ++n;
1295            }
1296        }
1297    }
1298
1299    gen_mutex_unlock(&interface_mutex);
1300
1301    *outcount = n;
1302    return 0;
1303}
1304
/*
 * Test for multiple completions matching a particular user context.
 * Return 0 if okay, >0 if want another poll soon, negative for error.
 *
 * Polls the CQ, then walks every sq and rq.  Items whose method_op
 * belongs to context_id are reaped, up to incount of them, into
 * outids/errs/sizes/user_ptrs; user_ptrs may be NULL.  Every other item
 * is still tested with complete == 0 so that stalled ones are encouraged.
 * If nothing happened and max_idle_time > 0, the interface lock is
 * dropped and ib_block_for_activity() is called.  A result of 1 from it
 * re-takes the lock and rescans.  The value returned is
 * activity + completions.
 */
static int
BMI_ib_testcontext(int incount, bmi_op_id_t *outids, int *outcount,
  bmi_error_code_t *errs, bmi_size_t *sizes, void **user_ptrs,
  int max_idle_time, bmi_context_id context_id)
{
    struct qlist_head *l, *lnext;
    int n = 0, complete, activity = 0;
    void **up = NULL;

    gen_mutex_lock(&interface_mutex);

restart:
    activity += ib_check_cq();

    /*
     * Walk _all_ entries on sq, rq, marking them completed or
     * encouraging them as needed due to resource limitations.
     * lnext is saved first because test_sq/test_rq may free the entry.
     */
    for (l=ib_device->sendq.next; l != &ib_device->sendq; l=lnext) {
        struct ib_work *sq = qlist_upcast(l);
        lnext = l->next;
        /* test them all, even if can't reap them, just to encourage */
        complete = (sq->mop->context_id == context_id) && (n < incount);
        if (user_ptrs)
            up = &user_ptrs[n];
        n += test_sq(sq, &outids[n], &errs[n], &sizes[n], up, complete);
    }

    for (l=ib_device->recvq.next; l != &ib_device->recvq; l=lnext) {
        struct ib_work *rq = qlist_upcast(l);
        lnext = l->next;

        /* some receives have no mops:  unexpected; those are never
         * reaped here, only by BMI_ib_testunexpected */
        complete = rq->mop &&
          (rq->mop->context_id == context_id) && (n < incount);
        if (user_ptrs)
            up = &user_ptrs[n];
        n += test_rq(rq, &outids[n], &errs[n], &sizes[n], up, complete);
    }

    /* drop lock before blocking on new connections below */
    gen_mutex_unlock(&interface_mutex);

    if (activity == 0 && n == 0 && max_idle_time > 0) {
        /*
         * Block if told to from above.
         * n is still 0 here, so a restart cannot double-count results.
         */
        debug(8, "%s: last activity too long ago, blocking", __func__);
        activity = ib_block_for_activity(max_idle_time);
        if (activity == 1) {   /* IB action, go do it immediately */
            gen_mutex_lock(&interface_mutex);
            goto restart;
        }
    }

    *outcount = n;
    return activity + n;
}
1367
/*
 * Non-blocking test to look for any incoming unexpected messages.
 * This is also where we check for new connections on the TCP socket, since
 * those would show up as unexpected the first time anything is sent.
 * Return 0 for success, or -1 for failure; number of things in *outcount.
 * Return >0 if want another poll soon.
 *
 * At most one message is returned per call; incount is ignored.  The
 * payload is copied into a freshly allocated ui->buffer, the eager receive
 * buffer is re-posted, and the rq is freed.  The caller presumably owns
 * and frees ui->buffer -- confirm against the BMI layer.  When idle and
 * max_idle_time > 0, the lock is dropped and ib_block_for_activity() is
 * called.  A result of 1 from it re-takes the lock and rescans.
 */
static int
BMI_ib_testunexpected(int incount __unused, int *outcount,
  struct bmi_method_unexpected_info *ui, int max_idle_time)
{
    struct qlist_head *l;
    int activity = 0, n;

    gen_mutex_lock(&interface_mutex);

    /* Check CQ, then look for the first unexpected message.  */
restart:
    activity += ib_check_cq();

    n = 0;
    qlist_for_each(l, &ib_device->recvq) {
        struct ib_work *rq = qlist_upcast(l);
        if (rq->state.recv == RQ_EAGER_WAITING_USER_TESTUNEXPECTED) {
            /* NOTE(review): mh_eager is decoded but never used below */
            msg_header_eager_t mh_eager;
            char *ptr = rq->bh->buf;
            ib_connection_t *c = rq->c;

            decode_msg_header_eager_t(&ptr, &mh_eager);

            debug(2, "%s: found waiting testunexpected", __func__);
            ui->error_code = 0;
            ui->addr = c->remote_map;  /* hand back permanent method_addr */
            ui->buffer = bmi_ib_malloc((unsigned long) rq->actual_len);
            ui->size = rq->actual_len;
            /* the payload sits immediately after the eager header */
            memcpy(ui->buffer,
                   (msg_header_eager_t *) rq->bh->buf + 1,
                   (size_t) ui->size);
            ui->tag = rq->bmi_tag;
            /* re-post the buffer in which it was sitting, just unexpecteds */
            post_rr(c, rq->bh);
            n = 1;
            /* safe to delete inside qlist_for_each: we leave the loop
             * immediately afterwards */
            qlist_del(&rq->list);
            free(rq);
            --c->refcnt;
            if (c->closed || c->cancelled)
                ib_close_connection(c);
            goto out;
        }
    }

  out:
    gen_mutex_unlock(&interface_mutex);

    if (activity == 0 && n == 0 && max_idle_time > 0) {
        /*
         * Block if told to from above, also polls TCP listening socket.
         */
        debug(8, "%s: last activity too long ago, blocking", __func__);
        activity = ib_block_for_activity(max_idle_time);
        if (activity == 1) {   /* IB action, go do it immediately */
            gen_mutex_lock(&interface_mutex);
            goto restart;
        }
    }

    *outcount = n;
    return activity + n;
}
1437
1438/*
1439 * No need to track these internally.  Just search the entire queue.
1440 */
1441static int
1442BMI_ib_open_context(bmi_context_id context_id __unused)
1443{
1444    return 0;
1445}
1446
1447static void
1448BMI_ib_close_context(bmi_context_id context_id __unused)
1449{
1450}
1451
/*
 * Asynchronous call to destroy an in-progress operation.
 * Can't just call test since we don't want to reap the operation,
 * just make sure it's done or not.
 *
 * If the operation has not already finished, its whole connection is
 * marked cancelled and its QP drained.  Every other unfinished sq/rq on
 * that connection is moved to SQ_CANCELLED/RQ_CANCELLED, after
 * deregistering any memcache pins that the current state implies.  The
 * cancelled items are later reaped by the test routines with
 * -PVFS_ETIMEDOUT.  Always returns 0.
 *
 * NOTE(review): an unexpected rq (mop == NULL) that gets marked
 * RQ_CANCELLED does not appear to be reaped by any test routine in this
 * file.  BMI_ib_testcontext requires a mop, and BMI_ib_testunexpected
 * only looks for RQ_EAGER_WAITING_USER_TESTUNEXPECTED.  Its connection
 * refcnt may therefore never drop -- confirm.
 */
static int
BMI_ib_cancel(bmi_op_id_t id, bmi_context_id context_id __unused)
{
    struct method_op *mop;
    struct ib_work *tsq;
    ib_connection_t *c = 0;

    gen_mutex_lock(&interface_mutex);
    ib_check_cq();
    mop = id_gen_fast_lookup(id);
    tsq = mop->method_data;
    if (tsq->type == BMI_SEND) {
        /*
         * Cancelling completed operations is fine, they will be
         * tested later.  Any others trigger full shutdown of the
         * connection.
         */
        if (tsq->state.send != SQ_WAITING_USER_TEST)
            c = tsq->c;
    } else {
        /* actually a recv */
        struct ib_work *rq = mop->method_data;
        if (!(rq->state.recv == RQ_EAGER_WAITING_USER_TEST
           || rq->state.recv == RQ_RTS_WAITING_USER_TEST))
            c = rq->c;
    }

    if (c && !c->cancelled) {
        /*
         * In response to a cancel, forcibly close the connection.  Don't send
         * a bye message first since it may be the case that the peer is dead
         * anyway.  Do not close the connection until all the sq/rq on it have
         * gone away.
         */
        struct qlist_head *l;

        c->cancelled = 1;
        drain_qp(c);
        qlist_for_each(l, &ib_device->sendq) {
            struct ib_work *sq = qlist_upcast(l);
            if (sq->c != c) continue;
#if !MEMCACHE_BOUNCEBUF
            /* data send in flight: its buffer list is registered */
            if (sq->state.send == SQ_WAITING_DATA_SEND_COMPLETION)
                memcache_deregister(ib_device->memcache, &sq->buflist);
#  if MEMCACHE_EARLY_REG
            /* pin when sending rts, so also must dereg in this state */
            if (sq->state.send == SQ_WAITING_RTS_SEND_COMPLETION ||
                sq->state.send == SQ_WAITING_RTS_SEND_COMPLETION_GOT_CTS ||
                sq->state.send == SQ_WAITING_CTS)
                memcache_deregister(ib_device->memcache, &sq->buflist);
#  endif
#endif
            /* finished sends are left for the user to test normally */
            if (sq->state.send != SQ_WAITING_USER_TEST)
                sq->state.send = SQ_CANCELLED;
        }
        qlist_for_each(l, &ib_device->recvq) {
            struct ib_work *rq = qlist_upcast(l);
            if (rq->c != c) continue;
#if !MEMCACHE_BOUNCEBUF
            if (rq->state.recv == RQ_RTS_WAITING_RTS_DONE)
                memcache_deregister(ib_device->memcache, &rq->buflist);
#  if MEMCACHE_EARLY_REG
            /* pin on post, dereg all these */
            if (rq->state.recv == RQ_RTS_WAITING_CTS_SEND_COMPLETION)
                memcache_deregister(ib_device->memcache, &rq->buflist);
            /* post_recv registers buffers larger than an eager payload */
            if (rq->state.recv == RQ_WAITING_INCOMING
              && rq->buflist.tot_len > ib_device->eager_buf_payload)
                memcache_deregister(ib_device->memcache, &rq->buflist);
#  endif
#endif
            /* finished receives are left for the user to test normally */
            if (!(rq->state.recv == RQ_EAGER_WAITING_USER_TEST
               || rq->state.recv == RQ_RTS_WAITING_USER_TEST))
                rq->state.recv = RQ_CANCELLED;
        }
    }

    gen_mutex_unlock(&interface_mutex);
    return 0;
}
1536
1537static const char *
1538BMI_ib_rev_lookup(struct bmi_method_addr *meth)
1539{
1540    ib_method_addr_t *ibmap = meth->method_data;
1541    if (!ibmap->c)
1542        return "(unconnected)";
1543    else
1544        return ibmap->c->peername;
1545}
1546
1547/*
1548 * Build and fill an IB-specific method_addr structure.
1549 */
1550static struct bmi_method_addr *ib_alloc_method_addr(ib_connection_t *c,
1551                                                char *hostname, int port,
1552                                                int reconnect_flag)
1553{
1554    struct bmi_method_addr *map;
1555    ib_method_addr_t *ibmap;
1556
1557    map = bmi_alloc_method_addr(bmi_ib_method_id, (bmi_size_t) sizeof(*ibmap));
1558    ibmap = map->method_data;
1559    ibmap->c = c;
1560    ibmap->hostname = hostname;
1561    ibmap->port = port;
1562    ibmap->reconnect_flag = reconnect_flag;
1563    ibmap->ref_count = 1;
1564
1565    return map;
1566}
1567
1568/*
1569 * Break up a method string like:
1570 *   ib://hostname:port/filesystem
1571 * into its constituent fields, storing them in an opaque
1572 * type, which is then returned.
1573 * XXX: I'm assuming that these actually return a _const_ pointer
1574 * so that I can hand back an existing map.
1575 */
1576static struct bmi_method_addr *BMI_ib_method_addr_lookup(const char *id)
1577{
1578    char *s, *hostname, *cp, *cq;
1579    int port;
1580    struct bmi_method_addr *map = NULL;
1581
1582    /* parse hostname */
1583    s = string_key("ib", id);  /* allocs a string */
1584    if (!s)
1585        return 0;
1586    cp = strchr(s, ':');
1587    if (!cp)
1588        error("%s: no ':' found", __func__);
1589
1590    /* copy to permanent storage */
1591    hostname = bmi_ib_malloc((unsigned long) (cp - s + 1));
1592    strncpy(hostname, s, (size_t) (cp-s));
1593    hostname[cp-s] = '\0';
1594
1595    /* strip /filesystem  */
1596    ++cp;
1597    cq = strchr(cp, '/');
1598    if (cq)
1599        *cq = 0;
1600    port = strtoul(cp, &cq, 10);
1601    if (cq == cp)
1602        error("%s: invalid port number", __func__);
1603    if (*cq != '\0')
1604        error("%s: extra characters after port number", __func__);
1605    free(s);
1606
1607    /* lookup in known connections, if there are any */
1608    gen_mutex_lock(&interface_mutex);
1609    if (ib_device) {
1610        struct qlist_head *l;
1611        qlist_for_each(l, &ib_device->connection) {
1612            ib_connection_t *c = qlist_upcast(l);
1613            ib_method_addr_t *ibmap = c->remote_map->method_data;
1614            if (ibmap->port == port && !strcmp(ibmap->hostname, hostname)) {
1615               map = c->remote_map;
1616               ibmap->ref_count++;
1617               break;
1618            }
1619        }
1620    }
1621    gen_mutex_unlock(&interface_mutex);
1622
1623    if (map)
1624        free(hostname);  /* found it */
1625    else
1626    {
1627        /* set reconnect flag on this addr; we will be acting as a client
1628         * for this connection and will be responsible for making sure that
1629         * the connection is established
1630         */
1631        map = ib_alloc_method_addr(0, hostname, port, 1);  /* alloc new one */
1632        /* but don't call bmi_method_addr_reg_callback! */
1633    }
1634
1635    return map;
1636}
1637
1638static ib_connection_t *ib_new_connection(int sock, const char *peername,
1639                                          int is_server)
1640{
1641    ib_connection_t *c;
1642    int i, ret;
1643
1644    c = bmi_ib_malloc(sizeof(*c));
1645    c->peername = strdup(peername);
1646
1647    /* fill send and recv free lists and buf heads */
1648    c->eager_send_buf_contig = bmi_ib_malloc(ib_device->eager_buf_num
1649      * ib_device->eager_buf_size);
1650    c->eager_recv_buf_contig = bmi_ib_malloc(ib_device->eager_buf_num
1651      * ib_device->eager_buf_size);
1652    INIT_QLIST_HEAD(&c->eager_send_buf_free);
1653    INIT_QLIST_HEAD(&c->eager_recv_buf_free);
1654    c->eager_send_buf_head_contig = bmi_ib_malloc(ib_device->eager_buf_num
1655      * sizeof(*c->eager_send_buf_head_contig));
1656    c->eager_recv_buf_head_contig = bmi_ib_malloc(ib_device->eager_buf_num
1657      * sizeof(*c->eager_recv_buf_head_contig));
1658    for (i=0; i<ib_device->eager_buf_num; i++) {
1659        struct buf_head *ebs = &c->eager_send_buf_head_contig[i];
1660        struct buf_head *ebr = &c->eager_recv_buf_head_contig[i];
1661        INIT_QLIST_HEAD(&ebs->list);
1662        INIT_QLIST_HEAD(&ebr->list);
1663        ebs->c = c;
1664        ebr->c = c;
1665        ebs->num = i;
1666        ebr->num = i;
1667        ebs->buf = (char *) c->eager_send_buf_contig
1668                 + i * ib_device->eager_buf_size;
1669        ebr->buf = (char *) c->eager_recv_buf_contig
1670                 + i * ib_device->eager_buf_size;
1671        qlist_add_tail(&ebs->list, &c->eager_send_buf_free);
1672        qlist_add_tail(&ebr->list, &c->eager_recv_buf_free);
1673    }
1674
1675    /* put it on the list */
1676    qlist_add(&c->list, &ib_device->connection);
1677
1678    /* other vars */
1679    c->remote_map = 0;
1680    c->cancelled = 0;
1681    c->refcnt = 0;
1682    c->closed = 0;
1683
1684    /* save one credit back for emergency credit refill */
1685    c->send_credit = ib_device->eager_buf_num - 1;
1686    c->return_credit = 0;
1687
1688    ret = new_connection(c, sock, is_server);
1689    if (ret) {
1690        ib_close_connection(c);
1691        c = NULL;
1692    }
1693
1694    return c;
1695}
1696
1697/*
1698 * Try to close and free a connection, but only do it if refcnt has
1699 * gone to zero.
1700 */
1701static void ib_close_connection(ib_connection_t *c)
1702{
1703    debug(2, "%s: closing connection to %s", __func__, c->peername);
1704    c->closed = 1;
1705    if (c->refcnt != 0) {
1706        debug(1, "%s: refcnt non-zero %d, delaying free", __func__, c->refcnt);
1707        return;
1708    }
1709
1710    close_connection(c);
1711
1712    free(c->eager_send_buf_contig);
1713    free(c->eager_recv_buf_contig);
1714    free(c->eager_send_buf_head_contig);
1715    free(c->eager_recv_buf_head_contig);
1716    /* never free the remote map, for the life of the executable, just
1717     * mark it unconnected since BMI will always have this structure. */
1718    if (c->remote_map) {
1719        ib_method_addr_t *ibmap = c->remote_map->method_data;
1720        ibmap->c = NULL;
1721    }
1722    free(c->peername);
1723    qlist_del(&c->list);
1724    free(c);
1725}
1726
1727/*
1728 * Blocking connect initiated by a post_sendunexpected{,_list}, or
1729 * post_recv*
1730 */
1731static int ib_tcp_client_connect(ib_method_addr_t *ibmap,
1732                                 struct bmi_method_addr *remote_map)
1733{
1734    int s;
1735    char peername[2048];
1736    struct hostent *hp;
1737    struct sockaddr_in skin;
1738   
1739    s = socket(AF_INET, SOCK_STREAM, 0);
1740    if (s < 0) {
1741        warning("%s: create tcp socket: %m", __func__);
1742        return bmi_errno_to_pvfs(-errno);
1743    }
1744    hp = gethostbyname(ibmap->hostname);
1745    if (!hp) {
1746        warning("%s: cannot resolve server %s", __func__, ibmap->hostname);
1747        return -1;
1748    }
1749    memset(&skin, 0, sizeof(skin));
1750    skin.sin_family = hp->h_addrtype;
1751    memcpy(&skin.sin_addr, hp->h_addr_list[0], (size_t) hp->h_length);
1752    skin.sin_port = htons(ibmap->port);
1753    sprintf(peername, "%s:%d", ibmap->hostname, ibmap->port);
1754  retry:
1755    if (connect(s, (struct sockaddr *) &skin, sizeof(skin)) < 0) {
1756        if (errno == EINTR)
1757            goto retry;
1758        else {
1759            warning("%s: connect to server %s: %m", __func__, peername);
1760            return bmi_errno_to_pvfs(-errno);
1761        }
1762    }
1763    ibmap->c = ib_new_connection(s, peername, 0);
1764    if (!ibmap->c)
1765        error("%s: ib_new_connection failed", __func__);
1766    ibmap->c->remote_map = remote_map;
1767
1768    if (close(s) < 0) {
1769        warning("%s: close sock: %m", __func__);
1770        return bmi_errno_to_pvfs(-errno);
1771    }
1772    return 0;
1773}
1774
1775/*
1776 * On a server, initialize a socket for listening for new connections.
1777 */
1778static void ib_tcp_server_init_listen_socket(struct bmi_method_addr *addr)
1779{
1780    int flags;
1781    struct sockaddr_in skin;
1782    ib_method_addr_t *ibc = addr->method_data;
1783
1784    ib_device->listen_sock = socket(AF_INET, SOCK_STREAM, 0);
1785    if (ib_device->listen_sock < 0)
1786        error_errno("%s: create tcp socket", __func__);
1787    flags = 1;
1788    if (setsockopt(ib_device->listen_sock, SOL_SOCKET, SO_REUSEADDR, &flags,
1789      sizeof(flags)) < 0)
1790        error_errno("%s: setsockopt REUSEADDR", __func__);
1791    memset(&skin, 0, sizeof(skin));
1792    skin.sin_family = AF_INET;
1793    skin.sin_port = htons(ibc->port);
1794  retry:
1795    if (bind(ib_device->listen_sock, (struct sockaddr *) &skin, sizeof(skin)) < 0) {
1796        if (errno == EINTR)
1797            goto retry;
1798        else
1799            error_errno("%s: bind tcp socket", __func__);
1800    }
1801    if (listen(ib_device->listen_sock, 1024) < 0)
1802        error_errno("%s: listen tcp socket", __func__);
1803    flags = fcntl(ib_device->listen_sock, F_GETFL);
1804    if (flags < 0)
1805        error_errno("%s: fcntl getfl listen sock", __func__);
1806    flags |= O_NONBLOCK;
1807    if (fcntl(ib_device->listen_sock, F_SETFL, flags) < 0)
1808        error_errno("%s: fcntl setfl nonblock listen sock", __func__);
1809}
1810
1811/*
1812 * Check for new connections.  The listening socket is left nonblocking
1813 * so this test can be quick; but accept is not really that quick compared
1814 * to polling an IB interface, for instance.  Returns >0 if an accept worked.
1815 */
1816static int ib_tcp_server_check_new_connections(void)
1817{
1818    struct sockaddr_in ssin;
1819    socklen_t len;
1820    int s, ret = 0;
1821
1822    len = sizeof(ssin);
1823    s = accept(ib_device->listen_sock, (struct sockaddr *) &ssin, &len);
1824    if (s < 0) {
1825        if (!(errno == EAGAIN))
1826            error_errno("%s: accept listen sock", __func__);
1827    } else {
1828        char peername[2048];
1829        ib_connection_t *c;
1830
1831        char *hostname = strdup(inet_ntoa(ssin.sin_addr));
1832        int port = ntohs(ssin.sin_port);
1833        sprintf(peername, "%s:%d", hostname, port);
1834
1835        gen_mutex_lock(&interface_mutex);
1836
1837        c = ib_new_connection(s, peername, 1);
1838        if (!c) {
1839            free(hostname);
1840            goto out_unlock;
1841        }
1842
1843        /* don't set reconnect flag on this addr; we are a server in this
1844         * case and the peer will be responsible for maintaining the
1845         * connection
1846         */
1847        c->remote_map = ib_alloc_method_addr(c, hostname, port, 0);
1848        /* register this address with the method control layer */
1849        c->bmi_addr = bmi_method_addr_reg_callback(c->remote_map);
1850        if (c->bmi_addr == 0)
1851            error_xerrno(ENOMEM, "%s: bmi_method_addr_reg_callback", __func__);
1852
1853        debug(2, "%s: accepted new connection %s at server", __func__,
1854          c->peername);
1855        ret = 1;
1856
1857out_unlock:
1858        gen_mutex_unlock(&interface_mutex);
1859        if (close(s) < 0)
1860            error_errno("%s: close new sock", __func__);
1861    }
1862    return ret;
1863}
1864
1865/*
1866 * Ask the device to write to its FD if a CQ event happens, and poll on it
1867 * as well as the listen_sock for activity, but do not actually respond to
1868 * anything.  A later ib_check_cq will handle CQ events, and a later call to
1869 * testunexpected will pick up new connections.  Returns ==1 if IB device is
1870 * ready, other >0 for some activity, else 0.
1871 */
1872static int ib_block_for_activity(int timeout_ms)
1873{
1874    struct pollfd pfd[3];  /* cq fd, async fd, accept socket */
1875    int numfd;
1876    int ret;
1877
1878    prepare_cq_block(&pfd[0].fd, &pfd[1].fd);
1879    pfd[0].events = POLLIN;
1880    pfd[1].events = POLLIN;
1881    numfd = 2;
1882    if (ib_device->listen_sock >= 0) {
1883        pfd[2].fd = ib_device->listen_sock;
1884        pfd[2].events = POLLIN;
1885        numfd = 3;
1886    }
1887
1888    ret = poll(pfd, numfd, timeout_ms);
1889    debug(4, "%s: ret %d rev0 %x rev1 %x", __func__, ret,
1890          pfd[0].revents, pfd[1].revents);
1891    if (ret > 0) {
1892        if (pfd[0].revents == POLLIN) {
1893            ack_cq_completion_event();
1894            return 1;
1895        }
1896        /* check others only if CQ was empty */
1897        ret = 2;
1898        if (pfd[1].revents == POLLIN)
1899            check_async_events();
1900        if (pfd[2].revents == POLLIN)
1901            ib_tcp_server_check_new_connections();
1902    } else if (ret < 0) {
1903        if (errno == EINTR)  /* normal, ignore but break */
1904            ret = 0;
1905        else
1906            error_errno("%s: poll listen sock", __func__);
1907    }
1908    return ret;
1909}
1910
1911static void *BMI_ib_memalloc(bmi_size_t len,
1912                             enum bmi_op_type send_recv __unused)
1913{
1914    return memcache_memalloc(ib_device->memcache, len,
1915                             ib_device->eager_buf_payload);
1916}
1917
1918static int BMI_ib_memfree(void *buf, bmi_size_t len,
1919                          enum bmi_op_type send_recv __unused)
1920{
1921    return memcache_memfree(ib_device->memcache, buf, len);
1922}
1923
/*
 * BMI unexpected_free hook: unexpected-message buffers are released with
 * plain free(); this cannot fail, so 0 is always returned.
 */
static int BMI_ib_unexpected_free(void *buf)
{
    int rc = 0;

    free(buf);
    return rc;
}
1929
1930/*
1931 * Callers sometimes want to know odd pieces of information.  Satisfy
1932 * them.
1933 */
1934static int BMI_ib_get_info(int option, void *param)
1935{
1936    int ret = 0;
1937
1938    switch (option) {
1939        case BMI_CHECK_MAXSIZE:
1940            /* reality is 2^31, but shrink to avoid negative int */
1941            *(int *)param = (1UL << 31) - 1;
1942            break;
1943        case BMI_GET_UNEXP_SIZE:
1944            *(int *)param = ib_device->eager_buf_payload;
1945            break;
1946        default:
1947            ret = -ENOSYS;
1948    }
1949    return ret;
1950}
1951
1952/*
1953 * Used to set some optional parameters and random functions, like ioctl.
1954 */
1955static int BMI_ib_set_info(int option, void *param __unused)
1956{
1957    switch (option) {
1958    case BMI_DROP_ADDR: {
1959        struct bmi_method_addr *map = param;
1960        ib_method_addr_t *ibmap = map->method_data;
1961        ibmap->ref_count--;
1962        if (ibmap->ref_count == 0)
1963        {
1964           free(ibmap->hostname);
1965           free(map);
1966        }
1967        break;
1968    }
1969    case BMI_OPTIMISTIC_BUFFER_REG: {
1970        /* not guaranteed to work */
1971        const struct bmi_optimistic_buffer_info *binfo = param;
1972        memcache_preregister(ib_device->memcache, binfo->buffer,
1973                             binfo->len, binfo->rw);
1974        break;
1975    }
1976    default:
1977        /* Should return -ENOSYS, but return 0 for caller ease. */
1978        break;
1979    }
1980    return 0;
1981}
1982
/* Driver back-end initializers, present according to the OPENIB / VAPI build flags. */
#if defined(OPENIB)
int openib_ib_initialize(void);
#endif
#if defined(VAPI)
int vapi_ib_initialize(void);
#endif
1989
1990/*
1991 * Startup, once per application.
1992 */
1993static int BMI_ib_initialize(struct bmi_method_addr *listen_addr, int method_id,
1994                             int init_flags)
1995{
1996    int ret;
1997
1998    debug(0, "%s: init", __func__);
1999
2000    gen_mutex_lock(&interface_mutex);
2001
2002    /* check params */
2003    if (!!listen_addr ^ (init_flags & BMI_INIT_SERVER))
2004        error("%s: error: BMI_INIT_SERVER requires non-null listen_addr"
2005          " and v.v", __func__);
2006
2007    bmi_ib_method_id = method_id;
2008
2009    ib_device = bmi_ib_malloc(sizeof(*ib_device));
2010
2011    /* try, in order, OpenIB then VAPI; set up function pointers */
2012    ret = 1;
2013#ifdef OPENIB
2014    ret = openib_ib_initialize();
2015#endif
2016#ifdef VAPI
2017    if (ret)
2018        ret = vapi_ib_initialize();
2019#endif
2020    if (ret)
2021        return -ENODEV;  /* neither found */
2022
2023    /* initialize memcache */
2024    ib_device->memcache = memcache_init(mem_register, mem_deregister);
2025#if 0
2026    /*
2027     * Need this for correctness.  Could use malloc/free hooks instead, but
2028     * they fight with mpich's use, for example.  Consider switching to dreg
2029     * kernel module.
2030     */
2031    mallopt(M_TRIM_THRESHOLD, -1);
2032    mallopt(M_MMAP_MAX, 0);
2033#endif
2034
2035    /*
2036     * Set up tcp socket to listen for connection requests.
2037     * The hostname is currently ignored; the port number is used to bind
2038     * the listening TCP socket which accepts new connections.
2039     */
2040    if (init_flags & BMI_INIT_SERVER) {
2041        ib_tcp_server_init_listen_socket(listen_addr);
2042        ib_device->listen_addr = listen_addr;
2043    } else {
2044        ib_device->listen_sock = -1;
2045        ib_device->listen_addr = NULL;
2046    }
2047
2048    /*
2049     * Initialize data structures.
2050     */
2051    INIT_QLIST_HEAD(&ib_device->connection);
2052    INIT_QLIST_HEAD(&ib_device->sendq);
2053    INIT_QLIST_HEAD(&ib_device->recvq);
2054
2055    ib_device->eager_buf_num  = DEFAULT_EAGER_BUF_NUM;
2056    ib_device->eager_buf_size = DEFAULT_EAGER_BUF_SIZE;
2057    ib_device->eager_buf_payload = ib_device->eager_buf_size - sizeof(msg_header_eager_t);
2058
2059    gen_mutex_unlock(&interface_mutex);
2060
2061    debug(0, "%s: done", __func__);
2062    return ret;
2063}
2064
2065/*
2066 * Shutdown.
2067 */
2068static int BMI_ib_finalize(void)
2069{
2070    gen_mutex_lock(&interface_mutex);
2071
2072    /* if client, send BYE to each connection and bring down the QP */
2073    if (ib_device->listen_sock < 0) {
2074        struct qlist_head *l;
2075        qlist_for_each(l, &ib_device->connection) {
2076            ib_connection_t *c = qlist_upcast(l);
2077            if (c->cancelled)
2078                continue;  /* already closed */
2079            /* Send BYE message to servers, transition QP to drain state */
2080            send_bye(c);
2081            drain_qp(c);
2082        }
2083    }
2084    /* if server, stop listening */
2085    if (ib_device->listen_sock >= 0) {
2086        ib_method_addr_t *ibmap = ib_device->listen_addr->method_data;
2087        close(ib_device->listen_sock);
2088        free(ibmap->hostname);
2089        free(ib_device->listen_addr);
2090    }
2091
2092    /* destroy QPs and other connection structures */
2093    while (ib_device->connection.next != &ib_device->connection) {
2094        ib_connection_t *c = (ib_connection_t *) ib_device->connection.next;
2095        ib_close_connection(c);
2096    }
2097
2098#if MEMCACHE_BOUNCEBUF
2099    if (reg_send_buflist.num > 0) {
2100        memcache_deregister(ib_device->memcache, &reg_send_buflist);
2101        reg_send_buflist.num = 0;
2102        free(reg_send_buflist_buf);
2103    }
2104    if (reg_recv_buflist.num > 0) {
2105        memcache_deregister(ib_device->memcache, &reg_recv_buflist);
2106        reg_recv_buflist.num = 0;
2107        free(reg_recv_buflist_buf);
2108    }
2109#endif
2110
2111    memcache_shutdown(ib_device->memcache);
2112
2113    ib_finalize();
2114
2115    free(ib_device);
2116    ib_device = NULL;
2117
2118    gen_mutex_unlock(&interface_mutex);
2119    return 0;
2120}
2121
2122const struct bmi_method_ops bmi_ib_ops =
2123{
2124    .method_name = "bmi_ib",
2125    .flags = 0,
2126    .initialize = BMI_ib_initialize,
2127    .finalize = BMI_ib_finalize,
2128    .set_info = BMI_ib_set_info,
2129    .get_info = BMI_ib_get_info,
2130    .memalloc = BMI_ib_memalloc,
2131    .memfree = BMI_ib_memfree,
2132    .unexpected_free = BMI_ib_unexpected_free,
2133    .post_send = BMI_ib_post_send,
2134    .post_sendunexpected = BMI_ib_post_sendunexpected,
2135    .post_recv = BMI_ib_post_recv,
2136    .test = BMI_ib_test,
2137    .testsome = BMI_ib_testsome,
2138    .testcontext = BMI_ib_testcontext,
2139    .testunexpected = BMI_ib_testunexpected,
2140    .method_addr_lookup = BMI_ib_method_addr_lookup,
2141    .post_send_list = BMI_ib_post_send_list,
2142    .post_recv_list = BMI_ib_post_recv_list,
2143    .post_sendunexpected_list = BMI_ib_post_sendunexpected_list,
2144    .open_context = BMI_ib_open_context,
2145    .close_context = BMI_ib_close_context,
2146    .cancel = BMI_ib_cancel,
2147    .rev_lookup_unexpected = BMI_ib_rev_lookup,
2148    .query_addr_range = NULL,
2149};
2150
Note: See TracBrowser for help on using the browser.