root/trunk/src/io/bmi/bmi_gm/bmi-gm.c @ 8304

Revision 8304, 106.6 KB (checked in by pcarns, 3 years ago)

merging bmi-experimental-branch to trunk

Line 
1/*
2 * (C) 2001 Clemson University and The University of Chicago
3 *
4 * See COPYING in top-level directory.
5 */
6
7/* GM implementation of a BMI method */
8
9#include <errno.h>
10#include <string.h>
11#include <unistd.h>
12#include <fcntl.h>
13#include <assert.h>
14#include <stdio.h>
15#include <sys/time.h>
16#include <gm.h>
17
18#include "bmi-method-support.h"
19#include "bmi-method-callback.h"
20#include "bmi-gm-addressing.h"
21#include "op-list.h"
22#include "bmi-gm-addr-list.h"
23#include "gossip.h"
24#include "quicklist.h"
25#include "bmi-gm-bufferpool.h"
26#include "pvfs2-config.h"
27#include "id-generator.h"
28#include "gen-locks.h"
29#ifdef ENABLE_GM_REGCACHE
30#include "bmi-gm-regcache.h"
31#endif
32#include "pvfs2-debug.h"
33
34static gen_mutex_t interface_mutex = GEN_MUTEX_INITIALIZER;
35static unsigned int bmi_gm_reserved_ports[BMI_GM_MAX_PORTS] =
36    {1,1,0,1,0,0,0,0};
37
38/* how long to wait on cancelled rendezvous receive operations to finish
39 * before reusing the buffer for something else
40 */
41/* 15 minutes */
42#define PINT_CANCELLED_REND_RECLAIM_TIMEOUT (60*15)
43
44/* function prototypes */
45int BMI_gm_initialize(bmi_method_addr_p listen_addr,
46                      int method_id,
47                      int init_flags);
48int BMI_gm_finalize(void);
49int BMI_gm_set_info(int option,
50                    void *inout_parameter);
51int BMI_gm_get_info(int option,
52                    void *inout_parameter);
53void *BMI_gm_memalloc(bmi_size_t size,
54                      enum bmi_op_type send_recv);
55int BMI_gm_memfree(void *buffer,
56                   bmi_size_t size,
57                   enum bmi_op_type send_recv);
58int BMI_gm_unexpected_free(void *buffer);
59int BMI_gm_post_send_list(bmi_op_id_t * id,
60    bmi_method_addr_p dest,
61    const void *const *buffer_list,
62    const bmi_size_t *size_list,
63    int list_count,
64    bmi_size_t total_size,
65    enum bmi_buffer_type buffer_type,
66    bmi_msg_tag_t tag,
67    void *user_ptr,
68    bmi_context_id context_id,
69    PVFS_hint hints);
70int BMI_gm_post_sendunexpected_list(bmi_op_id_t * id,
71    bmi_method_addr_p dest,
72    const void *const *buffer_list,
73    const bmi_size_t *size_list,
74    int list_count,
75    bmi_size_t total_size,
76    enum bmi_buffer_type buffer_type,
77    bmi_msg_tag_t tag,
78    uint8_t class,
79    void *user_ptr,
80    bmi_context_id context_id,
81    PVFS_hint hints);
82int BMI_gm_post_recv_list(bmi_op_id_t * id,
83    bmi_method_addr_p src,
84    void *const *buffer_list,
85    const bmi_size_t *size_list,
86    int list_count,
87    bmi_size_t total_expected_size,
88    bmi_size_t * total_actual_size,
89    enum bmi_buffer_type buffer_type,
90    bmi_msg_tag_t tag,
91    void *user_ptr,
92    bmi_context_id context_id,
93    PVFS_hint hints);
94int BMI_gm_test(bmi_op_id_t id,
95                int *outcount,
96                bmi_error_code_t * error_code,
97                bmi_size_t * actual_size,
98                void **user_ptr,
99                int max_idle_time_ms,
100                bmi_context_id context_id);
101int BMI_gm_testsome(int incount,
102                    bmi_op_id_t * id_array,
103                    int *outcount,
104                    int *index_array,
105                    bmi_error_code_t * error_code_array,
106                    bmi_size_t * actual_size_array,
107                    void **user_ptr_array,
108                    int max_idle_time_ms,
109                    bmi_context_id context_id);
110int BMI_gm_testcontext(int incount,
111    bmi_op_id_t * out_id_array,
112    int *outcount,
113    bmi_error_code_t * error_code_array,
114    bmi_size_t * actual_size_array,
115    void **user_ptr_array,
116    int max_idle_time_ms,
117    bmi_context_id context_id);
118int BMI_gm_testunexpected(int incount,
119                          int *outcount,
120                          struct bmi_method_unexpected_info *info,
121                          uint8_t class,
122                          int max_idle_time_ms);
123bmi_method_addr_p BMI_gm_method_addr_lookup(const char *id_string);
124int BMI_gm_open_context(bmi_context_id context_id);
125void BMI_gm_close_context(bmi_context_id context_id);
126int BMI_gm_cancel(bmi_op_id_t id, bmi_context_id context_id);
127int BMI_gm_get_unexp_maxsize(void);
128
129char BMI_gm_method_name[] = "bmi_gm";
130
131/* exported method interface */
/* Dispatch table that exposes the GM method's entry points under the
 * generic bmi_method_ops interface.  Each slot is bound to the BMI_gm_*
 * function of the same role declared above.
 */
const struct bmi_method_ops bmi_gm_ops = {
    .method_name = BMI_gm_method_name,
    .flags = 0,
    .initialize = BMI_gm_initialize,
    .finalize = BMI_gm_finalize,
    .set_info = BMI_gm_set_info,
    .get_info = BMI_gm_get_info,
    .memalloc = BMI_gm_memalloc,
    .memfree = BMI_gm_memfree,
    .unexpected_free = BMI_gm_unexpected_free,
    .test = BMI_gm_test,
    .testsome = BMI_gm_testsome,
    .testcontext = BMI_gm_testcontext,
    .testunexpected = BMI_gm_testunexpected,
    .method_addr_lookup = BMI_gm_method_addr_lookup,
    .post_send_list = BMI_gm_post_send_list,
    .post_recv_list = BMI_gm_post_recv_list,
    .post_sendunexpected_list = BMI_gm_post_sendunexpected_list,
    .open_context = BMI_gm_open_context,
    .close_context = BMI_gm_close_context,
    .cancel = BMI_gm_cancel,
    /* the two hooks below are left unset by this method */
    .rev_lookup_unexpected = NULL,
    .query_addr_range = NULL,
};
156
157/* module parameters */
158static struct
159{
160    int method_flags;
161    int method_id;
162    bmi_method_addr_p listen_addr;
163} gm_method_params;
164
165/* op_list_array indices */
166enum
167{
168    /* be careful changing these indexes!  op list searches rely on these
169     * numerical values.
170     */
171    NUM_INDICES = 10,
172    IND_NEED_SEND_TOK_HI_CTRLACK = 0,
173    IND_NEED_SEND_TOK_LOW = 1,
174    IND_NEED_SEND_TOK_HI_CTRL = 2,
175    IND_NEED_SEND_TOK_HI_PUT = 3,
176    IND_SENDING = 4,
177    IND_RECVING = 5,
178    IND_NEED_RECV_POST = 6,
179    IND_NEED_CTRL_MATCH = 7,
180    IND_COMPLETE_RECV_UNEXP = 8,
181    IND_CANCELLED_REND = 9,
182};
183
184static int cancelled_rend_count = 0;
185
186/* buffer status indicator */
187enum
188{
189    GM_BUF_USER_ALLOC = 1,
190    GM_BUF_METH_ALLOC = 2,
191    GM_BUF_METH_REG = 3
192};
193
194/* internal operations lists */
195static op_list_p op_list_array[NUM_INDICES] = { NULL, NULL, NULL, NULL,
196    NULL, NULL, NULL, NULL, NULL
197};
198
199static op_list_p completion_array[BMI_MAX_CONTEXTS] = {NULL};
200
201/* GM message modes */
202enum
203{
204    GM_MODE_IMMED = 1,          /* immediate */
205    GM_MODE_UNEXP = 2,          /* unexpected */
206    GM_MODE_REND = 4            /* rendezvous */
207};
208
209/* control message types */
210enum
211{
212    CTRL_REQ_TYPE = 1,
213    CTRL_ACK_TYPE = 2,
214    CTRL_IMMED_TYPE = 3,
215    CTRL_PUT_TYPE = 4,
216    CTRL_UNEXP_TYPE = 5
217};
218
219/* control messages */
220struct ctrl_req
221{
222    bmi_size_t actual_size;     /* size of message we want to send */
223    bmi_msg_tag_t msg_tag;      /* message tag */
224    bmi_op_id_t sender_op_id;   /* used for matching ctrl's */
225    /* will probably be padded later for immediate messages */
226};
227struct ctrl_ack
228{
229    PVFS_id_gen_t sender_op_id; /* used for matching ctrl's */
230    bmi_op_id_t receiver_op_id; /* used for matching ctrl's */
231    gm_remote_ptr_t remote_ptr; /* peer address to target */
232};
233struct ctrl_immed
234{
235    bmi_msg_tag_t msg_tag;      /* message tag */
236    int32_t actual_size;
237    uint8_t class;
238};
239struct ctrl_put
240{
241    bmi_op_id_t receiver_op_id; /* remote op that this completes */
242};
243
/* Envelope for every control message.  The send paths in this file
 * place it directly after the payload in the DMA buffer and fill in
 * ctrl_type, magic_nr and the matching union member.
 */
struct ctrl_msg
{
    uint32_t ctrl_type;         /* one of the CTRL_*_TYPE values */
    uint32_t magic_nr;          /* senders store BMI_MAGIC_NR here */
    union
    {
        struct ctrl_req req;
        struct ctrl_ack ack;
        /* used for both CTRL_IMMED_TYPE and CTRL_UNEXP_TYPE messages */
        struct ctrl_immed immed;
        struct ctrl_put put;
    }
    u;
};
257
258/* tunable parameters */
259enum
260{
261    GM_WAIT_METRIC = 50000,     /* maximum usecs we will wait during tests */
262    /* max number of network events we will process per function call. */
263    GM_MAX_EVENT_CYCLES = 1
264};
265
266/* Allowable sizes for each mode */
267enum
268{
269    GM_IMMED_SIZE = 14,
270    GM_CTRL_LENGTH = sizeof(struct ctrl_msg),
271    GM_MODE_IMMED_LIMIT = 0,
272    GM_MODE_REND_LIMIT = 524288,        /* 512K */
273    GM_MODE_UNEXP_LIMIT = 16384 /* 16K */
274};
275static gm_size_t GM_IMMED_LENGTH;
276
277/* gm particular method op data */
278struct gm_op
279{
280    /* used by callback functions to discard ctrl message buffers */
281    struct ctrl_msg *freeable_ctrl_buffer;
282    bmi_op_id_t peer_op_id;     /* op id of partner's operation */
283    /* indicates how buffer was allocated or pinned */
284    int buffer_status;
285    gm_remote_ptr_t remote_ptr;
286    void *tmp_xfer_buffer;
287    uint8_t complete; /* indicates when operation is completed */
288    uint8_t list_flag; /* indicates this is a list operation */
289    uint8_t cancelled; /* indicates operation has been cancelled */
290    long cancelled_tv_sec; /* timestamp for when the op was cancelled */
291};
292
293/* the local port that we are communicating on */
294static struct gm_port *local_port = NULL;
295/* and what we will call it */
296static char BMI_GM_PORT_NAME[] = "pvfs_gm";
297
298/* keep up with recv tokens */
299static int recv_token_count_high = 0;
300
301static int global_timeout_flag = 0;
302
303static char gm_host_name[GM_MAX_HOST_NAME_LEN];
304
305/* internal gm method address list */
306QLIST_HEAD(gm_addr_list);
307
308/* static buffer pools */
309static struct bufferpool *ctrl_send_pool = NULL;
310#ifdef ENABLE_GM_BUFPOOL
311static struct bufferpool *io_pool = NULL;
312#endif /* ENABLE_GM_BUFPOOL */
313
314/* internal utility functions */
315static bmi_method_addr_p alloc_gm_method_addr(void);
316static void dealloc_gm_method_addr(bmi_method_addr_p map);
317static int gm_post_send_check_resource(struct method_op* mop);
318static int gm_post_send_build_op(bmi_op_id_t * id,
319    bmi_method_addr_p dest,
320    const void *buffer,
321    bmi_size_t size,
322    bmi_msg_tag_t tag,
323    int mode,
324    int buffer_status,
325    void *user_ptr, bmi_context_id context_id);
326static int gm_post_send_build_op_list(bmi_op_id_t * id,
327    bmi_method_addr_p dest,
328    const void *const *buffer_list,
329    const bmi_size_t *size_list,
330    int list_count,
331    bmi_size_t total_size,
332    bmi_msg_tag_t tag,
333    int mode,
334    int buffer_status,
335    void *user_ptr, bmi_context_id context_id);
336static void ctrl_req_callback(struct gm_port *port,
337                              void *context,
338                              gm_status_t status);
339void dealloc_gm_method_op(method_op_p op_p);
340static method_op_p alloc_gm_method_op(void);
341static int ctrl_recv_pool_init(int pool_size);
342static int gm_do_work(int wait_time);
343static void delayed_token_sweep(void);
344static int receive_cycle(int timeout);
345void alarm_callback(void *context);
346static int recv_event_handler(gm_recv_event_t * poll_event,
347                             int fast);
348static void ctrl_ack_handler(bmi_op_id_t ctrl_op_id,
349                             unsigned int node_id,
350                             gm_remote_ptr_t remote_ptr,
351                             bmi_op_id_t peer_op_id);
352static int ctrl_req_handler_rend(bmi_op_id_t ctrl_op_id,
353                                 bmi_size_t ctrl_actual_size,
354                                 bmi_msg_tag_t ctrl_tag,
355                                 unsigned int node_id,
356                                 unsigned int port_id);
357static int immed_unexp_recv_handler(bmi_size_t size,
358                                    bmi_msg_tag_t msg_tag,
359                                    bmi_method_addr_p map,
360                                    void *buffer,
361                                    uint8_t class);
362static int immed_recv_handler(bmi_size_t actual_size,
363                              bmi_msg_tag_t msg_tag,
364                              bmi_method_addr_p map,
365                              void *buffer);
366static void put_recv_handler(bmi_op_id_t ctrl_op_id);
367static void ctrl_ack_callback(struct gm_port *port,
368                              void *context,
369                              gm_status_t status);
370static void data_send_callback(struct gm_port *port,
371                               void *context,
372                               gm_status_t status);
373static void immed_send_callback(struct gm_port *port,
374                                void *context,
375                                gm_status_t status);
376static void ctrl_put_callback(struct gm_port *port,
377                              void *context,
378                              gm_status_t status);
379static void initiate_send_rend(method_op_p mop);
380static void initiate_send_immed(method_op_p mop);
381static void initiate_put_announcement(method_op_p mop);
382static void send_data_buffer(method_op_p mop);
383static void prepare_for_recv(method_op_p mop);
384static void setup_send_data_buffer(method_op_p mop);
385static void setup_send_data_buffer_list(method_op_p mop);
386static void prepare_for_recv_list(method_op_p mop);
387static void reclaim_cancelled_io_buffers(void);
388static int io_buffers_exhausted(void);
389
390/*************************************************************************
391 * Visible Interface
392 */
393
394/* BMI_gm_initialize()
395 *
396 * Initializes the gm method.  Must be called before any other gm
397 * method functions.
398 *
399 * returns 0 on success, -errno on failure
400 */
401int BMI_gm_initialize(bmi_method_addr_p listen_addr,
402                      int method_id,
403                      int init_flags)
404{
405    gm_status_t gm_ret;
406    unsigned int rec_tokens = 0;
407    unsigned int send_tokens = 0;
408    int ret = -1;
409    unsigned int gm_host_id = 0;
410    unsigned int min_message_size = 0;
411    int i = 0;
412    int tmp_errno = 0;
413    struct gm_addr *gm_addr_data = NULL;
414
415    gossip_ldebug(GOSSIP_BMI_DEBUG_GM, "Initializing GM module.\n");
416
417    /* check args */
418    if ((init_flags & BMI_INIT_SERVER) && !listen_addr)
419    {
420        gossip_lerr("Error: bad parameters to GM module.\n");
421        return (bmi_gm_errno_to_pvfs(-EINVAL));
422    }
423
424    gen_mutex_lock(&interface_mutex);
425
426    GM_IMMED_LENGTH = gm_max_length_for_size(GM_IMMED_SIZE) -
427        sizeof(struct ctrl_msg);
428
429    /* zero out our parameter structure and fill it in */
430    memset(&gm_method_params, 0, sizeof(gm_method_params));
431    gm_method_params.method_id = method_id;
432    gm_method_params.method_flags = init_flags;
433
434    gossip_ldebug(GOSSIP_BMI_DEBUG_GM, "Setting up GM operation lists.\n");
435    /* set up the operation lists */
436    for (i = 0; i < NUM_INDICES; i++)
437    {
438        op_list_array[i] = op_list_new();
439        if (!op_list_array[i])
440        {
441            tmp_errno = bmi_gm_errno_to_pvfs(-ENOMEM);
442            goto gm_initialize_failure;
443        }
444    }
445
446    /* start up gm */
447    gm_ret = gm_init();
448    if (gm_ret != GM_SUCCESS)
449    {
450        gossip_lerr("Error: gm_init() failure.\n");
451        gen_mutex_unlock(&interface_mutex);
452        return (bmi_gm_errno_to_pvfs(-EPROTO));
453    }
454
455    if(init_flags & BMI_INIT_SERVER)
456    {
457        /* hang on to our local listening address if needed */
458        gm_method_params.listen_addr = listen_addr;
459
460        /* open our local port for communication */
461        gm_addr_data = listen_addr->method_data;
462        gm_ret = gm_open(&local_port, BMI_GM_UNIT_NUM,
463            gm_addr_data->port_id, BMI_GM_PORT_NAME, GM_API_VERSION_1_3);
464        if (gm_ret != GM_SUCCESS)
465        {
466            ret = bmi_gm_errno_to_pvfs(-EPROTO);
467            goto gm_initialize_failure;
468        }
469    }
470    else
471    {
472        /* we must cycle through and find an open port */
473        for(i=0; i<BMI_GM_MAX_PORTS; i++)
474        {
475            if(!bmi_gm_reserved_ports[i])
476            {
477                gm_ret = gm_open(&local_port, BMI_GM_UNIT_NUM,
478                    (unsigned int)i, BMI_GM_PORT_NAME, GM_API_VERSION_1_3);
479                if(gm_ret == GM_SUCCESS)
480                    break;
481            }
482        }
483        if(i >= BMI_GM_MAX_PORTS)
484        {
485            gossip_lerr("Error: failed to find available GM port.\n");
486            ret = bmi_gm_errno_to_pvfs(-EPROTO);
487            goto gm_initialize_failure;
488        }
489        gossip_ldebug(GOSSIP_BMI_DEBUG_GM, "Using port number %i.\n", i);
490    }
491
492    rec_tokens = gm_num_receive_tokens(local_port);
493    send_tokens = gm_num_send_tokens(local_port);
494    gossip_ldebug(GOSSIP_BMI_DEBUG_GM, "Available recieve tokens: %u.\n", rec_tokens);
495    gossip_ldebug(GOSSIP_BMI_DEBUG_GM, "Available send tokens: %u.\n", send_tokens);
496
497    /* we will use half of the send tokens for low priority, and half for
498     * high priority */
499    gm_free_send_tokens(local_port, GM_LOW_PRIORITY, send_tokens / 2);
500    gm_free_send_tokens(local_port, GM_HIGH_PRIORITY, send_tokens / 2);
501    /* ditto for recv tokens */
502    recv_token_count_high = rec_tokens / 2;
503
504    /* go ahead and post buffers for receiving ctrl messages */
505    ret = ctrl_recv_pool_init(recv_token_count_high);
506    if (ret < 0)
507    {
508        tmp_errno = bmi_gm_errno_to_pvfs(ret);
509        goto gm_initialize_failure;
510    }
511
512#ifdef ENABLE_GM_REGCACHE
513    /* initialize the memory registration cache */
514    ret = bmi_gm_regcache_init(local_port);
515    if (ret < 0)
516    {
517        tmp_errno = bmi_gm_errno_to_pvfs(ret);
518        goto gm_initialize_failure;
519    }
520#endif /* ENABLE_GM_REGCACHE */
521
522    /* intialize the control buffer cache */
523    ctrl_send_pool = bmi_gm_bufferpool_init(local_port, send_tokens,
524                                            GM_CTRL_LENGTH);
525    if (!ctrl_send_pool)
526    {
527        tmp_errno = bmi_gm_errno_to_pvfs(ret);
528        goto gm_initialize_failure;
529    }
530
531#ifdef ENABLE_GM_BUFPOOL
532    /* initialize the io buffer cache */
533    io_pool = bmi_gm_bufferpool_init(local_port, 32, GM_MODE_REND_LIMIT);
534    if (!io_pool)
535    {
536        tmp_errno = bmi_gm_errno_to_pvfs(ret);
537        gossip_lerr("Error: failed to obtain memory for buffer pool.\n");
538        goto gm_initialize_failure;
539    }
540#endif /* ENABLE_GM_BUFPOOL */
541
542    /* allow directed sends */
543    gm_allow_remote_memory_access(local_port);
544
545    /* print out a little debugging info about the interface. */
546    gm_get_host_name(local_port, gm_host_name);
547    gm_get_node_id(local_port, &gm_host_id);
548    min_message_size = gm_min_message_size(local_port);
549    gossip_ldebug(GOSSIP_BMI_DEBUG_GM, "GM Interface host name: %s.\n", gm_host_name);
550    gossip_ldebug(GOSSIP_BMI_DEBUG_GM, "GM Interface node id: %u.\n", gm_host_id);
551    gossip_ldebug(GOSSIP_BMI_DEBUG_GM, "GM Interface min msg size: %u.\n",
552                  min_message_size);
553    gossip_ldebug(GOSSIP_BMI_DEBUG_GM, "GM immediate mode limit: %d.\n",
554                  GM_MODE_IMMED_LIMIT);
555
556    gossip_ldebug(GOSSIP_BMI_DEBUG_GM, "GM module successfully initialized.\n");
557    gen_mutex_unlock(&interface_mutex);
558    return (0);
559
560  gm_initialize_failure:
561
562    /* cleanup data structures and bail out */
563    for (i = 0; i < NUM_INDICES; i++)
564    {
565        if (op_list_array[i])
566        {
567            op_list_cleanup(op_list_array[i]);
568        }
569    }
570
571#ifdef ENABLE_GM_REGCACHE
572    /* shut down regcache */
573    bmi_gm_regcache_finalize();
574#endif /* ENABLE_GM_REGCACHE */
575
576    /* shut down ctrl buffer cache */
577    if (ctrl_send_pool)
578        bmi_gm_bufferpool_finalize(ctrl_send_pool);
579
580#ifdef ENABLE_GM_BUFPOOL
581    if (io_pool)
582        bmi_gm_bufferpool_finalize(io_pool);
583#endif /* ENABLE_GM_BUFPOOL */
584
585    gm_finalize();
586    gossip_lerr("Error: BMI_gm_initialize failure.\n");
587    gen_mutex_unlock(&interface_mutex);
588    return (bmi_gm_errno_to_pvfs(ret));
589}
590
591
592/* BMI_gm_finalize()
593 *
594 * Shuts down the gm method.
595 *
596 * returns 0 on success, -errno on failure
597 */
598int BMI_gm_finalize(void)
599{
600    int i = 0;
601
602    gen_mutex_lock(&interface_mutex);
603    /* note that this forcefully shuts down operations */
604    for (i = 0; i < NUM_INDICES; i++)
605    {
606        if (op_list_array[i])
607        {
608            op_list_cleanup(op_list_array[i]);
609            op_list_array[i] = NULL;
610        }
611    }
612
613#ifdef ENABLE_GM_REGCACHE
614    bmi_gm_regcache_finalize();
615#endif /* ENABLE_GM_REGCACHE */
616
617    bmi_gm_bufferpool_finalize(ctrl_send_pool);
618
619#ifdef ENABLE_GM_BUFPOOL
620    bmi_gm_bufferpool_finalize(io_pool);
621#endif /* ENABLE_GM_BUFPOOL */
622
623    gm_close(local_port);
624    /* shut down the gm system */
625    gm_finalize();
626    gen_mutex_unlock(&interface_mutex);
627    gossip_ldebug(GOSSIP_BMI_DEBUG_GM, "GM module finalized.\n");
628    return (0);
629}
630
631
632/*
633 * BMI_gm_method_addr_lookup()
634 *
635 * resolves the string representation of an address into a method
636 * address structure. 
637 *
638 * returns a pointer to method_addr on success, NULL on failure
639 */
640bmi_method_addr_p BMI_gm_method_addr_lookup(const char *id_string)
641{
642    char *gm_string = NULL;
643    bmi_method_addr_p new_addr = NULL;
644    struct gm_addr *gm_data = NULL;
645    char local_tag[] = "NULL";
646    char* delim = NULL;
647    int ret = -1;
648
649    gm_string = string_key("gm", id_string);
650    if (!gm_string)
651    {
652        /* the string doesn't even have our info */
653        gossip_lerr("Error: NULL id_string.\n");
654        return (NULL);
655    }
656
657    /* start breaking up the method information */
658    /* for gm, it is hostname:portnumber */
659    if((delim = index(gm_string, ':')) == NULL)
660    {
661        gossip_lerr("Error: malformed gm address.\n");
662        free(gm_string);
663        return(NULL);
664    }
665
666    /* looks ok, so let's build the method addr structure */
667    new_addr = alloc_gm_method_addr();
668    if (!new_addr)
669    {
670        gossip_lerr("Error: unable to allocate GM method address.\n");
671        free(gm_string);
672        return (NULL);
673    }
674    gm_data = new_addr->method_data;
675
676    /* pull off the port first */
677    ret = sscanf((delim+1), "%u", &(gm_data->port_id));
678    if(ret != 1)
679    {
680        gossip_lerr("Error: malformed gm address.\n");
681        dealloc_gm_method_addr(new_addr);
682        free(gm_string);
683        return(NULL);
684    }
685
686    /* now chop off the port information and parse the rest */
687    *delim = '\0';
688
689    gen_mutex_lock(&interface_mutex);
690    if (strncmp(gm_string, local_tag, strlen(local_tag)) == 0)
691    {
692        /* is a local address */
693        ;
694    }
695    else if(local_port != NULL)
696    {
697        gm_data->node_id = gm_host_name_to_node_id(local_port, gm_string);
698        if (gm_data->node_id == GM_NO_SUCH_NODE_ID)
699        {
700            gossip_lerr("Error: gm_host_name_to_node_id() failure for: %s.\n",
701                gm_string);
702            bmi_dealloc_method_addr(new_addr);
703            free(gm_string);
704            gen_mutex_unlock(&interface_mutex);
705            return (NULL);
706        }
707    }
708
709    free(gm_string);
710    /* keep up with the address here */
711    gm_addr_add(&gm_addr_list, new_addr);
712    gen_mutex_unlock(&interface_mutex);
713    return (new_addr);
714}
715
716
717/* BMI_gm_memalloc()
718 *
719 * Allocates memory that can be used in native mode by gm.
720 *
721 * returns 0 on success, -errno on failure
722 */
723void *BMI_gm_memalloc(bmi_size_t size,
724                      enum bmi_op_type send_recv)
725{
726    /* NOTE: In the send case, we allocate a little bit of extra memory
727     * at the END of the buffer to use for message trailers.  We stick it
728     * at the back in order to easily preserve alignment
729     * NOTE: We are being pretty trustful of the caller.  Checks are done
730     * (hopefully :) at the BMI level before we get here.
731     */
732    void *new_buffer = NULL;
733
734    gen_mutex_lock(&interface_mutex);
735    if (send_recv == BMI_RECV)
736    {
737        if (size <= GM_IMMED_LENGTH)
738        {
739            new_buffer = malloc(size);
740        }
741        else
742        {
743            new_buffer = (void *) gm_dma_malloc(local_port, (unsigned
744                                                             long) size);
745        }
746    }
747    else if (send_recv == BMI_SEND)
748    {
749        if (size <= GM_IMMED_LENGTH)
750        {
751            /* pad enough room for a ctrl structure */
752            size += sizeof(struct ctrl_msg);
753            new_buffer = (void *) gm_dma_malloc(local_port, (unsigned
754                                                             long) size);
755        }
756        else
757        {
758            new_buffer = (void *) gm_dma_malloc(local_port, (unsigned
759                                                             long) size);
760        }
761    }
762    else
763    {
764        new_buffer = NULL;
765    }
766
767    gen_mutex_unlock(&interface_mutex);
768    return (new_buffer);
769}
770
771
772/* BMI_gm_memfree()
773 *
774 * Frees memory that was allocated with BMI_gm_memalloc()
775 *
776 * returns 0 on success, -errno on failure
777 */
778int BMI_gm_memfree(void *buffer,
779                   bmi_size_t size,
780                   enum bmi_op_type send_recv)
781{
782    gen_mutex_lock(&interface_mutex);
783    if (send_recv == BMI_RECV)
784    {
785        if (size <= GM_IMMED_LENGTH)
786        {
787            free(buffer);
788        }
789        else
790        {
791            gm_dma_free(local_port, buffer);
792        }
793    }
794    else if (send_recv == BMI_SEND)
795    {
796        gm_dma_free(local_port, buffer);
797    }
798    else
799    {
800        gen_mutex_unlock(&interface_mutex);
801        return (bmi_gm_errno_to_pvfs(-EINVAL));
802    }
803
804    buffer = NULL;
805    gen_mutex_unlock(&interface_mutex);
806    return (0);
807}
808
/* BMI_gm_unexpected_free()
 *
 * Releases a buffer with free().  A NULL buffer is accepted and ignored.
 *
 * returns 0 always
 */
int BMI_gm_unexpected_free(void *buffer)
{
    /* free(NULL) is defined to do nothing, so no guard is required */
    free(buffer);
    return 0;
}
817
818/* BMI_gm_set_info()
819 *
820 * Pass in optional parameters.
821 *
822 * returns 0 on success, -errno on failure
823 */
824int BMI_gm_set_info(int option,
825                    void *inout_parameter)
826{
827    switch(option)
828    {
829        case BMI_TCP_BUFFER_SEND_SIZE:
830        case BMI_TCP_BUFFER_RECEIVE_SIZE:
831        case BMI_FORCEFUL_CANCEL_MODE:
832        case BMI_DROP_ADDR:
833#ifdef USE_TRUSTED
834        case BMI_TRUSTED_CONNECTION:
835#endif
836            /* these tcp-specific hints mean nothing to GM */
837            return 0;
838        default:
839            return (bmi_gm_errno_to_pvfs(-ENOSYS));
840    }
841}
842
843/* BMI_gm_get_info()
844 *
845 * Query for optional parameters.
846 *
847 * returns 0 on success, -errno on failure
848 */
849int BMI_gm_get_info(int option,
850                    void *inout_parameter)
851{
852    int ret = -1;
853
854    switch (option)
855    {
856    case BMI_CHECK_MAXSIZE:
857        *((int *) inout_parameter) = GM_MODE_REND_LIMIT;
858        ret = 0;
859        break;
860    case BMI_GET_UNEXP_SIZE:
861        *((int *) inout_parameter) = GM_MODE_UNEXP_LIMIT;
862        ret = 0;
863        break;
864    default:
865        gossip_ldebug(GOSSIP_BMI_DEBUG_GM,
866            "BMI GM hint %d not implemented.\n", option);
867        ret = 0;
868    break;
869    }
870
871    return (ret);
872}
873
874/* BMI_gm_post_send_list()
875 *
876 * same as post_send, except that it sends from an array of
877 * possibly non contiguous buffers
878 *
879 * returns 0 on success, 1 on immediate successful completion,
880 * -errno on failure
881 */
882int BMI_gm_post_send_list(bmi_op_id_t * id,
883    bmi_method_addr_p dest,
884    const void *const *buffer_list,
885    const bmi_size_t *size_list,
886    int list_count,
887    bmi_size_t total_size,
888    enum bmi_buffer_type buffer_type,
889    bmi_msg_tag_t tag,
890    void *user_ptr,
891    bmi_context_id context_id,
892    PVFS_hint hints)
893{
894    int buffer_status = GM_BUF_USER_ALLOC;
895    void *new_buffer = NULL;
896    void *copy_buffer = NULL;
897    struct ctrl_msg *new_ctrl_msg = NULL;
898    bmi_size_t buffer_size = 0;
899    int i;
900    int ret;
901
902    gossip_ldebug(GOSSIP_BMI_DEBUG_GM, "BMI_gm_post_send_list called.\n");
903
904    /* TODO: think about this some.  For now this is going to be
905     * lame because we aren't going to take advantage of
906     * having buffers pinned in advance...
907     */
908    buffer_status = GM_BUF_METH_REG;
909
910    /* clear id immediately for safety */
911    *id = 0;
912
913    /* make sure it's not too big */
914    if (total_size > GM_MODE_REND_LIMIT)
915    {
916        return (bmi_gm_errno_to_pvfs(-EMSGSIZE));
917    }
918
919    gen_mutex_lock(&interface_mutex);
920    if (total_size <= GM_IMMED_LENGTH)
921    {
922        /* pad enough room for a ctrl structure */
923        buffer_size = sizeof(struct ctrl_msg) + total_size;
924
925        /* create a new buffer and copy */
926        new_buffer = (void *) gm_dma_malloc(local_port, (unsigned
927                                                         long) buffer_size);
928        if (!new_buffer)
929        {
930            gossip_lerr("Error: gm_dma_malloc failure.\n");
931            gen_mutex_unlock(&interface_mutex);
932            return (bmi_gm_errno_to_pvfs(-ENOMEM));
933        }
934
935        copy_buffer = new_buffer;
936        for(i=0; i<list_count; i++)
937        {
938            memcpy(copy_buffer, buffer_list[i], size_list[i]);
939            copy_buffer = (void*)((long)copy_buffer + (long)size_list[i]);
940        }
941
942        /* Immediate mode stuff */
943        new_ctrl_msg = (struct ctrl_msg *) (new_buffer + total_size);
944        new_ctrl_msg->ctrl_type = CTRL_IMMED_TYPE;
945        new_ctrl_msg->magic_nr = BMI_MAGIC_NR;
946        new_ctrl_msg->u.immed.actual_size = total_size;
947        new_ctrl_msg->u.immed.msg_tag = tag;
948        buffer_status = GM_BUF_METH_ALLOC;
949        ret = gm_post_send_build_op(id, dest, new_buffer, total_size,
950                                            tag,
951                                            GM_MODE_IMMED,
952                                            buffer_status, user_ptr, context_id);
953        gen_mutex_unlock(&interface_mutex);
954        return(ret);
955    }
956    else
957    {
958        ret = gm_post_send_build_op_list(id, dest, buffer_list,
959            size_list, list_count, total_size, tag, GM_MODE_REND,
960            buffer_status, user_ptr, context_id);
961        gen_mutex_unlock(&interface_mutex);
962        return(ret);
963    }
964
965}
966
967
968/* BMI_gm_post_sendunexpected_list()
969 *
970 * same as post_sendunexpected, except that it sends from an array of
971 * possibly non contiguous buffers
972 *
973 * returns 0 on success, 1 on immediate successful completion,
974 * -errno on failure
975 */
976int BMI_gm_post_sendunexpected_list(bmi_op_id_t * id,
977    bmi_method_addr_p dest,
978    const void *const *buffer_list,
979    const bmi_size_t *size_list,
980    int list_count,
981    bmi_size_t total_size,
982    enum bmi_buffer_type buffer_type,
983    bmi_msg_tag_t tag,
984    uint8_t class,
985    void *user_ptr,
986    bmi_context_id context_id,
987    PVFS_hint hints)
988{
989    int buffer_status = GM_BUF_USER_ALLOC;
990    void *new_buffer = NULL;
991    void *copy_buffer = NULL;
992    struct ctrl_msg *new_ctrl_msg = NULL;
993    bmi_size_t buffer_size = 0;
994    int i;
995    int ret;
996
997    gossip_ldebug(GOSSIP_BMI_DEBUG_GM,
998        "BMI_gm_post_sendunexpected_list called.\n");
999
1000    /* TODO: think about this some.  For now this is going to be
1001     * lame because we aren't going to take advantage of
1002     * having buffers pinned in advance...
1003     */
1004    buffer_status = GM_BUF_METH_ALLOC;
1005
1006    /* clear id immediately for safety */
1007    *id = 0;
1008
1009    /* make sure it's not too big */
1010    if (total_size > GM_MODE_UNEXP_LIMIT)
1011    {
1012        return (bmi_gm_errno_to_pvfs(-EMSGSIZE));
1013    }
1014
1015    /* pad enough room for a ctrl structure */
1016    buffer_size = sizeof(struct ctrl_msg) + total_size;
1017
1018    gen_mutex_lock(&interface_mutex);
1019
1020    /* create a new buffer and copy */
1021    new_buffer = (void *) gm_dma_malloc(local_port, (unsigned
1022                                                     long) buffer_size);
1023    if (!new_buffer)
1024    {
1025        gen_mutex_unlock(&interface_mutex);
1026        gossip_lerr("Error: gm_dma_malloc failure.\n");
1027        return (bmi_gm_errno_to_pvfs(-ENOMEM));
1028    }
1029
1030    copy_buffer = new_buffer;
1031    for(i=0; i<list_count; i++)
1032    {
1033        memcpy(copy_buffer, buffer_list[i], size_list[i]);
1034        copy_buffer = (void*)((long)copy_buffer + (long)size_list[i]);
1035    }
1036
1037    /* Immediate mode stuff */
1038    new_ctrl_msg = (struct ctrl_msg *) (new_buffer + total_size);
1039    new_ctrl_msg->ctrl_type = CTRL_UNEXP_TYPE;
1040    new_ctrl_msg->magic_nr = BMI_MAGIC_NR;
1041    new_ctrl_msg->u.immed.actual_size = total_size;
1042    new_ctrl_msg->u.immed.msg_tag = tag;
1043    new_ctrl_msg->u.immed.class = class;
1044    ret = gm_post_send_build_op(id, dest, new_buffer, total_size,
1045                                        tag,
1046                                        GM_MODE_UNEXP,
1047                                        buffer_status, user_ptr, context_id);
1048    gen_mutex_unlock(&interface_mutex);
1049    return(ret);
1050}
1051
1052
1053/* BMI_gm_post_recv_list()
1054 *
1055 * same as post_recv, except it operates on an array of possibly
1056 * discontiguous memory regions
1057 *
1058 * returns 0 on success, -errno on failure
1059 */
1060int BMI_gm_post_recv_list(bmi_op_id_t * id,
1061    bmi_method_addr_p src,
1062    void *const *buffer_list,
1063    const bmi_size_t *size_list,
1064    int list_count,
1065    bmi_size_t total_expected_size,
1066    bmi_size_t * total_actual_size,
1067    enum bmi_buffer_type buffer_type,
1068    bmi_msg_tag_t tag,
1069    void *user_ptr,
1070    bmi_context_id context_id,
1071    PVFS_hint hints)
1072{
1073    method_op_p query_op = NULL;
1074    method_op_p new_method_op = NULL;
1075    struct op_list_search_key key;
1076    struct gm_op *gm_op_data = NULL;
1077    struct gm_addr *gm_addr_data = NULL;
1078    int ret = -1;
1079    int buffer_status = GM_BUF_USER_ALLOC;
1080    int i;
1081    void* copy_buffer;
1082    bmi_size_t copy_size, total_copied;
1083
1084    gossip_ldebug(GOSSIP_BMI_DEBUG_GM, "BMI_gm_post_recv_list called.\n");
1085
1086    /* what happens here ?
1087     * see if the operation is already in progress (IND_NEED_RECV_POST)
1088     *  - if so, match it and poke it to continue
1089     *  - if not, create an op and queue it up in IND_NEED_CTRL_MATCH
1090     */
1091
1092    /* clear id immediately for safety */
1093    *id = 0;
1094
1095    /* make sure it's not too big */
1096    if (total_expected_size > GM_MODE_REND_LIMIT)
1097    {
1098        return (bmi_gm_errno_to_pvfs(-EMSGSIZE));
1099    }
1100
1101    /* TODO: this is going to be lame for a while; if we are in
1102     * rendezvous mode on list operations we will buffer copy
1103     * regardless of how the caller prepared the buffer
1104     */
1105    /* set flag to indicate if we need to pinn this buffer internally */
1106    if(total_expected_size > GM_IMMED_LENGTH)
1107        buffer_status = GM_BUF_METH_REG;
1108
1109    gen_mutex_lock(&interface_mutex);
1110    /* push work first; use this as an opportunity to make sure that the
1111     * receive keeps buffers moving as quickly as possible
1112     */
1113    ret = gm_do_work(0);
1114    if (ret < 0)
1115    {
1116        gen_mutex_unlock(&interface_mutex);
1117        return (ret);
1118    }
1119
1120    /* see if this operation has already begun... */
1121    memset(&key, 0, sizeof(struct op_list_search_key));
1122    key.method_addr = src;
1123    key.method_addr_yes = 1;
1124    key.msg_tag = tag;
1125    key.msg_tag_yes = 1;
1126
1127    query_op = op_list_search(op_list_array[IND_NEED_RECV_POST], &key);
1128    if (query_op)
1129    {
1130        gm_addr_data = query_op->addr->method_data;
1131        *id = query_op->op_id;
1132        query_op->context_id = context_id;
1133        query_op->user_ptr = user_ptr;
1134
1135        if(query_op->actual_size > total_expected_size)
1136        {
1137            gossip_lerr("Error: message ordering violation;\n");
1138            gossip_lerr("Error: message too large for next buffer.\n");
1139            gen_mutex_unlock(&interface_mutex);
1140            return(bmi_gm_errno_to_pvfs(-EPROTO));
1141        }
1142
1143        /* we found the operation in progress. */
1144
1145        if (query_op->mode == GM_MODE_REND)
1146        {
1147            /* post has occurred */
1148            op_list_remove(query_op);
1149
1150            gm_op_data = query_op->method_data;
1151            gm_op_data->list_flag = 1;
1152            gm_op_data->buffer_status = GM_BUF_METH_REG;
1153            query_op->buffer_list = buffer_list;
1154            query_op->size_list = size_list;
1155            query_op->list_count = list_count;
1156
1157            /* now we need a token to send a ctrl ack */
1158            if (gm_alloc_send_token(local_port, GM_HIGH_PRIORITY))
1159            {
1160#ifdef ENABLE_GM_BUFPOOL
1161                if (!io_buffers_exhausted())
1162                {
1163#endif /* ENABLE_GM_BUFPOOL */
1164                   
1165                    prepare_for_recv_list(query_op);
1166                    ret = 0;
1167
1168#ifdef ENABLE_GM_BUFPOOL
1169                }
1170                else
1171                {
1172                    gm_free_send_token(local_port, GM_HIGH_PRIORITY);
1173                    op_list_add(op_list_array[IND_NEED_SEND_TOK_HI_CTRLACK],
1174                                query_op);
1175                    ret = 0;
1176                }
1177#endif /* ENABLE_GM_BUFPOOL */
1178            }
1179            else
1180            {
1181                /* we don't have enough tokens */
1182                op_list_add(op_list_array[IND_NEED_SEND_TOK_HI_CTRLACK],
1183                            query_op);
1184                ret = 0;
1185            }
1186        }
1187        else if (query_op->mode == GM_MODE_IMMED)
1188        {
1189            /* all is done except memory copy- complete instantly */
1190            op_list_remove(query_op);
1191            gm_op_data = query_op->method_data;
1192            copy_buffer = query_op->buffer;
1193            total_copied = 0;
1194            for(i=0; i<list_count; i++)
1195            {
1196                if(total_copied == query_op->actual_size)
1197                    break;
1198                copy_size = query_op->actual_size - total_copied;
1199                if(copy_size > size_list[i])
1200                    copy_size = size_list[i];
1201                memcpy(buffer_list[i], copy_buffer, copy_size);
1202                copy_buffer = (void*)((long)copy_buffer + (long)copy_size);
1203                total_copied += copy_size;
1204            }
1205            *total_actual_size = query_op->actual_size;
1206            free(query_op->buffer);
1207            *id = 0;
1208            dealloc_gm_method_op(query_op);
1209            ret = 1;
1210        }
1211        else
1212        {
1213            /* we don't have any other modes implemented yet */
1214            ret = bmi_gm_errno_to_pvfs(-ENOSYS);
1215        }
1216    }
1217    else
1218    {
1219        /* we must create the operation and queue it up */
1220        new_method_op = alloc_gm_method_op();
1221        if (!new_method_op)
1222        {
1223            gen_mutex_unlock(&interface_mutex);
1224            return (bmi_gm_errno_to_pvfs(-ENOMEM));
1225        }
1226        *id = new_method_op->op_id;
1227        new_method_op->user_ptr = user_ptr;
1228        new_method_op->send_recv = BMI_RECV;
1229        new_method_op->addr = src;
1230        new_method_op->buffer = NULL;
1231        new_method_op->expected_size = total_expected_size;
1232        new_method_op->actual_size = 0;
1233        new_method_op->msg_tag = tag;
1234        new_method_op->context_id = context_id;
1235        /* TODO: make sure this is ok */
1236        new_method_op->mode = 0;
1237
1238        gm_op_data = new_method_op->method_data;
1239        gm_op_data->buffer_status = buffer_status;
1240        gm_op_data->list_flag = 1;
1241
1242        new_method_op->buffer_list = buffer_list;
1243        new_method_op->size_list = size_list;
1244        new_method_op->list_count = list_count;
1245        /* just for safety; the user should not use this value in this case */
1246        *total_actual_size = 0;
1247
1248        op_list_add(op_list_array[IND_NEED_CTRL_MATCH], new_method_op);
1249        ret = 0;
1250    }
1251
1252    gen_mutex_unlock(&interface_mutex);
1253    return(ret);
1254}
1255
1256
1257/* BMI_gm_test()
1258 *
1259 * Checks to see if a particular message has completed.
1260 *
1261 * returns 0 on success, -errno on failure
1262 */
1263int BMI_gm_test(bmi_op_id_t id,
1264                int *outcount,
1265                bmi_error_code_t * error_code,
1266                bmi_size_t * actual_size,
1267                void **user_ptr,
1268                int max_idle_time_ms,
1269                bmi_context_id context_id)
1270{
1271    int ret = -1;
1272    method_op_p query_op = (method_op_p)id_gen_fast_lookup(id);
1273    struct gm_op *gm_op_data = query_op->method_data;
1274
1275    *outcount = 0;
1276
1277    gen_mutex_lock(&interface_mutex);
1278
1279    if(gm_op_data->complete)
1280    {
1281        assert(query_op->context_id == context_id);
1282        op_list_remove(query_op);
1283        if(user_ptr != NULL)
1284        {
1285            (*user_ptr) = query_op->user_ptr;
1286        }
1287        (*error_code) = query_op->error_code;
1288        (*actual_size) = query_op->actual_size;
1289        dealloc_gm_method_op(query_op);
1290        (*outcount)++;
1291    }
1292    if(gm_op_data->cancelled && gm_op_data->cancelled_tv_sec)
1293    {
1294        /* report this operation as complete; it is only hanging around
1295         * to protect a recv buffer after cancellation
1296         */
1297        if(user_ptr != NULL)
1298        {
1299            (*user_ptr) = query_op->user_ptr;
1300        }
1301        (*error_code) = -PVFS_ECANCEL;
1302        (*actual_size) = 0;
1303        (*outcount)++;
1304    }
1305    if(*outcount)
1306    {
1307        gen_mutex_unlock(&interface_mutex);
1308        return(0);
1309    }
1310
1311    /* do some ``real work'' here */
1312    ret = gm_do_work(max_idle_time_ms*1000);
1313    if (ret < 0)
1314    {
1315        gen_mutex_unlock(&interface_mutex);
1316        return (ret);
1317    }
1318
1319    if(gm_op_data->complete)
1320    {
1321        assert(query_op->context_id == context_id);
1322        op_list_remove(query_op);
1323        if(user_ptr != NULL)
1324        {
1325            (*user_ptr) = query_op->user_ptr;
1326        }
1327        (*error_code) = query_op->error_code;
1328        (*actual_size) = query_op->actual_size;
1329        dealloc_gm_method_op(query_op);
1330        (*outcount)++;
1331    }
1332
1333    gen_mutex_unlock(&interface_mutex);
1334    return (0);
1335}
1336
1337
1338/* BMI_gm_testsome()
1339 *
1340 * Checks to see if any messages from the specified list have completed.
1341 *
1342 * returns 0 on success, -errno on failure
1343 */
1344int BMI_gm_testsome(int incount,
1345                    bmi_op_id_t * id_array,
1346                    int *outcount,
1347                    int *index_array,
1348                    bmi_error_code_t * error_code_array,
1349                    bmi_size_t * actual_size_array,
1350                    void **user_ptr_array,
1351                    int max_idle_time_ms,
1352                    bmi_context_id context_id)
1353{
1354    int ret = -1;
1355    int i;
1356    method_op_p query_op;
1357    struct gm_op *gm_op_data;
1358
1359    *outcount = 0;
1360
1361    gen_mutex_lock(&interface_mutex);
1362
1363    for(i=0; i<incount; i++)
1364    {
1365        if(id_array[i])
1366        {
1367            query_op = (method_op_p)id_gen_fast_lookup(id_array[i]);
1368            gm_op_data = query_op->method_data;
1369            if(gm_op_data->complete)
1370            {
1371                assert(query_op->context_id == context_id);
1372                op_list_remove(query_op);
1373                error_code_array[*outcount] = query_op->error_code;
1374                actual_size_array[*outcount] = query_op->actual_size;
1375                index_array[*outcount] = i;
1376                if (user_ptr_array != NULL)
1377                    user_ptr_array[*outcount] = query_op->user_ptr;
1378                dealloc_gm_method_op(query_op);
1379                (*outcount)++;
1380            }
1381            if(gm_op_data->cancelled && gm_op_data->cancelled_tv_sec)
1382            {
1383                /* report this operation as complete; it is only hanging around
1384                 * to protect a recv buffer after cancellation
1385                 */
1386                error_code_array[*outcount] = -PVFS_ECANCEL;
1387                actual_size_array[*outcount] = 0;
1388                index_array[*outcount] = i;
1389                if(user_ptr_array != NULL)
1390                    user_ptr_array[*outcount] = query_op->user_ptr;
1391                (*outcount)++;
1392            }
1393        }
1394    }
1395    if(*outcount)
1396    {
1397        gen_mutex_unlock(&interface_mutex);
1398        return(0);
1399    }
1400
1401    /* do some ``real work'' here */
1402    ret = gm_do_work(max_idle_time_ms*1000);
1403    if (ret < 0)
1404    {
1405        gen_mutex_unlock(&interface_mutex);
1406        return (ret);
1407    }
1408
1409    for(i=0; i<incount; i++)
1410    {
1411        if(id_array[i])
1412        {
1413            query_op = (method_op_p)id_gen_fast_lookup(id_array[i]);
1414            gm_op_data = query_op->method_data;
1415            if(gm_op_data->complete)
1416            {
1417                assert(query_op->context_id == context_id);
1418                op_list_remove(query_op);
1419                error_code_array[*outcount] = query_op->error_code;
1420                actual_size_array[*outcount] = query_op->actual_size;
1421                index_array[*outcount] = i;
1422                if (user_ptr_array != NULL)
1423                    user_ptr_array[*outcount] = query_op->user_ptr;
1424                dealloc_gm_method_op(query_op);
1425                (*outcount)++;
1426            }
1427        }
1428    }
1429
1430    gen_mutex_unlock(&interface_mutex);
1431    return(0);
1432}
1433
1434
1435/* BMI_gm_testcontext()
1436 *
1437 * Checks to see if any operations from the specified context have
1438 * completed
1439 *
1440 * returns 0 on success, -errno on failure
1441 */
1442int BMI_gm_testcontext(int incount,
1443    bmi_op_id_t * out_id_array,
1444    int *outcount,
1445    bmi_error_code_t * error_code_array,
1446    bmi_size_t * actual_size_array,
1447    void **user_ptr_array,
1448    int max_idle_time_ms,
1449    bmi_context_id context_id)
1450{
1451    int ret = -1;
1452    method_op_p query_op = NULL;
1453    op_list_p tmp_entry = NULL;
1454    struct gm_op *gm_op_data = NULL;
1455
1456    *outcount = 0;
1457
1458    gen_mutex_lock(&interface_mutex);
1459
1460    /* check queue before doing anything */
1461    while((*outcount < incount) && (query_op =
1462        op_list_shownext(completion_array[context_id])))
1463    {
1464        assert(query_op->context_id == context_id);
1465        op_list_remove(query_op);
1466        error_code_array[*outcount] = query_op->error_code;
1467        actual_size_array[*outcount] = query_op->actual_size;
1468        out_id_array[*outcount] = query_op->op_id;
1469        if(user_ptr_array != NULL)
1470            user_ptr_array[*outcount] = query_op->user_ptr;
1471        dealloc_gm_method_op(query_op);
1472        (*outcount)++;
1473    }
1474    /* this is kind of nasty- look for cancelled rend recvs that we
1475     * have not reported yet.  Must iterate queue.
1476     */
1477    qlist_for_each(tmp_entry, op_list_array[IND_CANCELLED_REND])
1478    {
1479        if(*outcount >= incount)
1480            break;
1481        query_op = qlist_entry(tmp_entry, struct method_op, op_list_entry);
1482        gm_op_data = query_op->method_data;
1483        if(!gm_op_data->complete)
1484        {
1485            gm_op_data->complete = 1;
1486            error_code_array[*outcount] = -PVFS_ECANCEL;
1487            actual_size_array[*outcount] = 0;
1488            out_id_array[*outcount] = query_op->op_id;
1489            if(user_ptr_array != NULL)
1490                user_ptr_array[*outcount] = query_op->user_ptr;
1491            (*outcount)++;
1492        }
1493    }
1494    if(*outcount)
1495    {
1496        gen_mutex_unlock(&interface_mutex);
1497        return(0);
1498    }
1499
1500    /* do some ``real work'' here */
1501    ret = gm_do_work(max_idle_time_ms*1000);
1502    if (ret < 0)
1503    {
1504        gen_mutex_unlock(&interface_mutex);
1505        return (ret);
1506    }
1507
1508    /* check queue again */
1509    while((*outcount < incount) && (query_op =
1510        op_list_shownext(completion_array[context_id])))
1511    {
1512        assert(query_op->context_id == context_id);
1513        op_list_remove(query_op);
1514        error_code_array[*outcount] = query_op->error_code;
1515        actual_size_array[*outcount] = query_op->actual_size;
1516        out_id_array[*outcount] = query_op->op_id;
1517        if(user_ptr_array != NULL)
1518            user_ptr_array[*outcount] = query_op->user_ptr;
1519        dealloc_gm_method_op(query_op);
1520        (*outcount)++;
1521    }
1522    /* this is kind of nasty- look for cancelled rend recvs that we
1523     * have not reported yet.  Must iterate queue.
1524     */
1525    qlist_for_each(tmp_entry, op_list_array[IND_CANCELLED_REND])
1526    {
1527        if(*outcount >= incount)
1528            break;
1529        query_op = qlist_entry(tmp_entry, struct method_op, op_list_entry);
1530        gm_op_data = query_op->method_data;
1531        if(!gm_op_data->complete)
1532        {
1533            gm_op_data->complete = 1;
1534            error_code_array[*outcount] = -PVFS_ECANCEL;
1535            actual_size_array[*outcount] = 0;
1536            out_id_array[*outcount] = query_op->op_id;
1537            if(user_ptr_array != NULL)
1538                user_ptr_array[*outcount] = query_op->user_ptr;
1539            (*outcount)++;
1540        }
1541    }
1542
1543    gen_mutex_unlock(&interface_mutex);
1544    return(0);
1545}
1546
1547
1548/* BMI_gm_testunexpected()
1549 *
1550 * Checks to see if any unexpected messages have completed.
1551 *
1552 * returns 0 on success, -errno on failure
1553 */
1554int BMI_gm_testunexpected(int incount,
1555                          int *outcount,
1556                          struct bmi_method_unexpected_info *info,
1557                          uint8_t class,
1558                          int max_idle_time_ms)
1559{
1560    int ret = -1;
1561    method_op_p query_op = NULL;
1562    struct op_list_search_key key;
1563
1564    memset(&key, 0, sizeof(struct op_list_search_key));
1565    key.class = class;
1566    key.class_yes = 1;
1567
1568    *outcount = 0;
1569
1570    gen_mutex_lock(&interface_mutex);
1571
1572    if(op_list_empty(op_list_array[IND_COMPLETE_RECV_UNEXP]))
1573    {
1574        /* nothing ready yet, do some work */
1575        ret = gm_do_work(max_idle_time_ms*1000);
1576        if (ret < 0)
1577        {
1578            gen_mutex_unlock(&interface_mutex);
1579            return (ret);
1580        }
1581    }
1582
1583    while ((*outcount < incount) &&
1584           (query_op =
1585            op_list_search(op_list_array[IND_COMPLETE_RECV_UNEXP], &key)))
1586    {
1587        info[*outcount].error_code = query_op->error_code;
1588        info[*outcount].addr = query_op->addr;
1589        info[*outcount].buffer = query_op->buffer;
1590        info[*outcount].size = query_op->actual_size;
1591        info[*outcount].tag = query_op->msg_tag;
1592        op_list_remove(query_op);
1593        dealloc_gm_method_op(query_op);
1594        (*outcount)++;
1595    }
1596
1597    gen_mutex_unlock(&interface_mutex);
1598    return (0);
1599}
1600
1601
1602/* BMI_gm_open_context()
1603 *
1604 * opens a new context with the specified context id
1605 *
1606 * returns 0 on success, -errno on failure
1607 */
1608int BMI_gm_open_context(bmi_context_id context_id)
1609{
1610    gen_mutex_lock(&interface_mutex);
1611    /* start a new queue for tracking completions in this context */
1612    completion_array[context_id] = op_list_new();
1613    if (!completion_array[context_id])
1614    {
1615        gen_mutex_unlock(&interface_mutex);
1616        return(bmi_gm_errno_to_pvfs(-ENOMEM));
1617    }
1618
1619    gen_mutex_unlock(&interface_mutex);
1620    return(0);
1621}
1622
1623/* BMI_gm_close_context()
1624 *
1625 * closes a context previously created with BMI_gm_open_context()
1626 *
1627 * no return value
1628 */
1629void BMI_gm_close_context(bmi_context_id context_id)
1630{
1631    gen_mutex_lock(&interface_mutex);
1632    /* tear down completion queue for this context */
1633    op_list_cleanup(completion_array[context_id]);
1634
1635    gen_mutex_unlock(&interface_mutex);
1636    return;
1637}
1638
1639/******************************************************************
1640 * Internal support functions
1641 */
1642
1643/*
1644 * dealloc_gm_method_addr()
1645 *
1646 * destroys method address structures generated by the gm module.
1647 *
1648 * no return value
1649 */
1650static void dealloc_gm_method_addr(bmi_method_addr_p map)
1651{
1652
1653    bmi_dealloc_method_addr(map);
1654
1655    return;
1656}
1657
1658
1659/*
1660 * alloc_gm_method_addr()
1661 *
1662 * creates a new method address with defaults filled in for GM.
1663 *
1664 * returns pointer to struct on success, NULL on failure
1665 */
1666static bmi_method_addr_p alloc_gm_method_addr(void)
1667{
1668
1669    struct bmi_method_addr *my_method_addr = NULL;
1670    struct gm_addr *gm_data = NULL;
1671
1672    my_method_addr = bmi_alloc_method_addr(gm_method_params.method_id,
1673            sizeof(struct gm_addr));
1674    if (!my_method_addr)
1675    {
1676        return (NULL);
1677    }
1678
1679    /* note that we trust the alloc_method_addr() function to have zeroed
1680     * out the structures for us already
1681     */
1682
1683    gm_data = my_method_addr->method_data;
1684    gm_data->node_id = GM_NO_SUCH_NODE_ID;
1685    return (my_method_addr);
1686}
1687
1688
1689/*
1690 * alloc_gm_method_op()
1691 *
1692 * creates a new method op with defaults filled in for GM.
1693 *
1694 * returns pointer to struct on success, NULL on failure
1695 */
1696static method_op_p alloc_gm_method_op(void)
1697{
1698    method_op_p my_method_op = NULL;
1699
1700    my_method_op = bmi_alloc_method_op(sizeof(struct gm_op));
1701
1702    /* note that we trust the alloc_method_addr() function to have zeroed
1703     * out the structures for us already
1704     */
1705
1706    return (my_method_op);
1707}
1708
1709
1710/*
1711 * dealloc_gm_method_op()
1712 *
1713 * frees the memory allocated to a gm method_op structure
1714 *
1715 * no return value
1716 */
1717void dealloc_gm_method_op(method_op_p op_p)
1718{
1719    bmi_dealloc_method_op(op_p);
1720    return;
1721}
1722
1723
1724/* gm_post_send_build_op_list()
1725 *
1726 * builds a method op structure for the specified send operation
1727 *
1728 * returns 0 on success, -errno on failure
1729 */
1730static int gm_post_send_build_op_list(bmi_op_id_t * id,
1731    bmi_method_addr_p dest,
1732    const void *const *buffer_list,
1733    const bmi_size_t *size_list,
1734    int list_count,
1735    bmi_size_t total_size,
1736    bmi_msg_tag_t tag,
1737    int mode,
1738    int buffer_status,
1739    void *user_ptr, bmi_context_id context_id)
1740{
1741    method_op_p new_method_op = NULL;
1742    struct gm_op *gm_op_data = NULL;
1743
1744    gossip_ldebug(GOSSIP_BMI_DEBUG_GM, "gm_post_send_build_op_list() called.\n");
1745
1746    /* we need an op structure to keep up with this send */
1747    new_method_op = alloc_gm_method_op();
1748    if (!new_method_op)
1749    {
1750        return (bmi_gm_errno_to_pvfs(-ENOMEM));
1751    }
1752    *id = new_method_op->op_id;
1753    new_method_op->user_ptr = user_ptr;
1754    new_method_op->send_recv = BMI_SEND;
1755    new_method_op->addr = dest;
1756    new_method_op->buffer = NULL;
1757    new_method_op->actual_size = total_size;
1758    /* TODO: is this right thing to do for send side? */
1759    new_method_op->expected_size = 0; 
1760    new_method_op->msg_tag = tag;
1761    gossip_ldebug(GOSSIP_BMI_DEBUG_GM, "Tag: %d.\n", (int) tag);
1762    new_method_op->mode = mode;
1763    new_method_op->context_id = context_id;
1764
1765    new_method_op->buffer_list = (void **) buffer_list;
1766    new_method_op->size_list = size_list;
1767    new_method_op->list_count = list_count;
1768
1769    gm_op_data = new_method_op->method_data;
1770    gm_op_data->buffer_status = buffer_status;
1771    gm_op_data->list_flag = 1;
1772
1773    return (gm_post_send_check_resource(new_method_op));
1774}
1775
1776
1777/* gm_post_send_build_op()
1778 *
1779 * builds a method op structure for the specified send operation
1780 *
1781 * returns 0 on success, -errno on failure
1782 */
1783static int gm_post_send_build_op(bmi_op_id_t * id,
1784    bmi_method_addr_p dest,
1785    const void *buffer,
1786    bmi_size_t size,
1787    bmi_msg_tag_t tag,
1788    int mode,
1789    int buffer_status,
1790    void *user_ptr, bmi_context_id context_id)
1791{
1792    method_op_p new_method_op = NULL;
1793    struct gm_op *gm_op_data = NULL;
1794
1795    gossip_ldebug(GOSSIP_BMI_DEBUG_GM, "gm_post_send_build_op() called.\n");
1796
1797    /* we need an op structure to keep up with this send */
1798    new_method_op = alloc_gm_method_op();
1799    if (!new_method_op)
1800    {
1801        return (bmi_gm_errno_to_pvfs(-ENOMEM));
1802    }
1803    *id = new_method_op->op_id;
1804    new_method_op->user_ptr = user_ptr;
1805    new_method_op->send_recv = BMI_SEND;
1806    new_method_op->addr = dest;
1807    new_method_op->buffer = (void *) buffer;
1808    new_method_op->actual_size = size;
1809    /* TODO: is this right thing to do for send side? */
1810    new_method_op->expected_size = 0; 
1811    new_method_op->msg_tag = tag;
1812    gossip_ldebug(GOSSIP_BMI_DEBUG_GM, "Tag: %d.\n", (int) tag);
1813    new_method_op->mode = mode;
1814    new_method_op->context_id = context_id;
1815
1816    gm_op_data = new_method_op->method_data;
1817    gm_op_data->buffer_status = buffer_status;
1818
1819    return (gm_post_send_check_resource(new_method_op));
1820}
1821
1822
1823/* gm_post_send_check_resource()
1824 *
1825 * Checks to see if communication can proceed for a given send operation
1826 *
1827 * returns 0 on success, -errno on failure
1828 */
1829static int gm_post_send_check_resource(struct method_op* mop)
1830{
1831    int ret = -1;
1832
1833    gossip_ldebug(GOSSIP_BMI_DEBUG_GM, "gm_post_send_check_resource() called.\n");
1834
1835    /* what do we want to do here?
1836     * For now, try to send a control message and then bail out.  The
1837     * poll function will drive the rest of the way.
1838     */
1839
1840    /* make sure that we are not bypassing any operation that has stalled
1841     * waiting on tokens
1842     */
1843    if (!op_list_empty(op_list_array[IND_NEED_SEND_TOK_HI_CTRL]))
1844    {
1845        op_list_add(op_list_array[IND_NEED_SEND_TOK_HI_CTRL], mop);
1846        /* push on existing work rather than attempting this send */
1847        return (gm_do_work(0));
1848    }
1849
1850    /* all clear; let's see if we have a send token for the control message */
1851    if (!gm_alloc_send_token(local_port, GM_HIGH_PRIORITY))
1852    {
1853        /* queue up and wait for a token */
1854        op_list_add(op_list_array[IND_NEED_SEND_TOK_HI_CTRL], mop);
1855        /* push on existing work rather than attempting this send */
1856        ret = gm_do_work(0);
1857    }
1858    else
1859    {
1860        gossip_ldebug(GOSSIP_BMI_DEBUG_GM, "Proceeding with communication.\n");
1861        if (mop->mode == GM_MODE_REND)
1862        {
1863            initiate_send_rend(mop);
1864            ret = 0;
1865        }
1866        else
1867        {
1868            initiate_send_immed(mop);
1869            ret = 0;
1870        }
1871    }
1872
1873    return (ret);
1874}
1875
1876
1877/* initiate_send_immed()
1878 *
1879 * starts off an immediate network transfer (data + ctrl message in one
1880 * buffer).  Assumes that we have already acquired a high priority send
1881 * token.
1882 *
1883 * no return value
1884 */
1885static void initiate_send_immed(method_op_p mop)
1886{
1887
1888    struct gm_addr *gm_addr_data = mop->addr->method_data;
1889    bmi_size_t true_msg_len = 0;
1890
1891    gossip_ldebug(GOSSIP_BMI_DEBUG_GM, "Sending immediate msg.\n");
1892
1893    true_msg_len = mop->actual_size + sizeof(struct ctrl_msg);
1894
1895    /* send ctrl message */
1896    gm_send_with_callback(local_port, mop->buffer,
1897                                  GM_IMMED_SIZE, true_msg_len, GM_HIGH_PRIORITY,
1898                                  gm_addr_data->node_id, gm_addr_data->port_id, immed_send_callback,
1899                                  mop);
1900
1901    /* queue up to wait for completion */
1902    op_list_add(op_list_array[IND_SENDING], mop);
1903
1904    return;
1905}
1906
1907
1908/* initiate_put_announcement()
1909 *
1910 * sends a control message to inform a target host that a directed send
1911 * has completed
1912 *
1913 * no return value.
1914 */
1915static void initiate_put_announcement(method_op_p mop)
1916{
1917    struct gm_op *gm_op_data = mop->method_data;
1918    struct gm_addr *gm_addr_data = mop->addr->method_data;
1919    struct ctrl_msg *my_ctrl = NULL;
1920
1921    /* woohoo- we have a token, work on a ctrl message */
1922    my_ctrl = bmi_gm_bufferpool_get(ctrl_send_pool);
1923
1924    my_ctrl->ctrl_type = CTRL_PUT_TYPE;
1925    my_ctrl->magic_nr = BMI_MAGIC_NR;
1926    my_ctrl->u.put.receiver_op_id = gm_op_data->peer_op_id;
1927    /* keep up with this buffer in the op structure */
1928    gm_op_data->freeable_ctrl_buffer = my_ctrl;
1929
1930    gossip_ldebug(GOSSIP_BMI_DEBUG_GM, "Sending ctrl msg.\n");
1931
1932    /* send ctrl message */
1933    gm_send_with_callback(local_port, my_ctrl,
1934                                  GM_IMMED_SIZE, GM_CTRL_LENGTH,
1935                                  GM_HIGH_PRIORITY, gm_addr_data->node_id,
1936                                  gm_addr_data->port_id,
1937                                  ctrl_put_callback, mop);
1938
1939    /* queue up to wait for completion */
1940    op_list_add(op_list_array[IND_SENDING], mop);
1941
1942    return;
1943}
1944
1945/* initiate_send_rend()
1946 *
1947 * actually starts off a rendezvous send operation by sending the control
1948 * message.  Assumes that we have already acquired a high priority send
1949 * token.
1950 *
1951 * no return value
1952 */
1953static void initiate_send_rend(method_op_p mop)
1954{
1955    struct gm_op *gm_op_data = mop->method_data;
1956    struct gm_addr *gm_addr_data = mop->addr->method_data;
1957    struct ctrl_msg *my_ctrl = NULL;
1958
1959    /* woohoo- we have a token, work on a ctrl message */
1960    my_ctrl = bmi_gm_bufferpool_get(ctrl_send_pool);
1961
1962    my_ctrl->ctrl_type = CTRL_REQ_TYPE;
1963    my_ctrl->magic_nr = BMI_MAGIC_NR;
1964    my_ctrl->u.req.actual_size = mop->actual_size;
1965    my_ctrl->u.req.msg_tag = mop->msg_tag;
1966    my_ctrl->u.req.sender_op_id = mop->op_id;
1967    /* keep up with this buffer in the op structure */
1968    gm_op_data->freeable_ctrl_buffer = my_ctrl;
1969
1970    gossip_ldebug(GOSSIP_BMI_DEBUG_GM, "Sending ctrl msg.\n");
1971
1972    /* send ctrl message */
1973    gm_send_with_callback(local_port, my_ctrl,
1974                                  GM_IMMED_SIZE, GM_CTRL_LENGTH,
1975                                  GM_HIGH_PRIORITY, gm_addr_data->node_id,
1976                                  gm_addr_data->port_id,
1977                                  ctrl_req_callback, mop);
1978
1979    /* queue up to wait for completion */
1980    op_list_add(op_list_array[IND_SENDING], mop);
1981
1982    return;
1983}
1984
1985/* ctrl_put_callback()
1986 *
1987 * callback function triggered on completion of a directed send
1988 * announcement 
1989 *
1990 * no return value
1991 */
1992static void ctrl_put_callback(struct gm_port *port,
1993                              void *context,
1994                              gm_status_t status)
1995{
1996    /* the context is our operation */
1997    method_op_p my_op = context;
1998    struct gm_op *gm_op_data = my_op->method_data;
1999    struct gm_addr *gm_addr_data = my_op->addr->method_data;
2000
2001    gossip_ldebug(GOSSIP_BMI_DEBUG_GM, "ctrl_put_callback() called.\n");
2002
2003    /* free up ctrl message buffer */
2004    bmi_gm_bufferpool_put(ctrl_send_pool, gm_op_data->freeable_ctrl_buffer);
2005    gm_op_data->freeable_ctrl_buffer = NULL;
2006
2007    /* give back a send token */
2008    gm_free_send_token(local_port, GM_HIGH_PRIORITY);
2009
2010    /* see if the receiver couldn't keep up */
2011    if (status == GM_SEND_TIMED_OUT)
2012    {
2013        gossip_lerr("Error: GM TIMEOUT!  Not handled...\n");
2014        op_list_remove(my_op);
2015        my_op->error_code = bmi_gm_errno_to_pvfs(-ETIMEDOUT);
2016        op_list_add(completion_array[my_op->context_id], my_op);
2017        gm_op_data->complete = 1;
2018        return;
2019    }
2020
2021    /* look for other errors */
2022    if (status != GM_SUCCESS)
2023    {
2024       
2025        gossip_lerr("Error: GM send failure, detected in callback.\n");
2026        gossip_err("Error: %s (%d)\n", gm_strerror(status), (int)status);
2027        gossip_err("Sending from %s to %s\n", gm_host_name,
2028            gm_node_id_to_host_name(local_port, gm_addr_data->node_id));
2029        op_list_remove(my_op);
2030        /* TODO: need generic solution to map GM codes to pvfs2 codes */
2031        if(status == GM_SEND_TARGET_NODE_UNREACHABLE)
2032            my_op->error_code = -PVFS_EHOSTUNREACH;
2033        else
2034            my_op->error_code = -PVFS_EPROTO;
2035        op_list_add(completion_array[my_op->context_id], my_op);
2036        gm_op_data->complete = 1;
2037        return;
2038    }
2039
2040    /* operation is now complete */
2041    op_list_remove(my_op);
2042    my_op->error_code = 0;
2043    op_list_add(completion_array[my_op->context_id], my_op);
2044    gm_op_data->complete = 1;
2045
2046    gossip_ldebug(GOSSIP_BMI_DEBUG_GM, "Finished ctrl_put_callback().\n");
2047
2048    return;
2049}
2050
2051
2052/* immed_send_callback()
2053 *
2054 * callback function triggered on completion of an immediate send
2055 * operation.
2056 *
2057 * no return value
2058 */
2059static void immed_send_callback(struct gm_port *port,
2060                                void *context,
2061                                gm_status_t status)
2062{
2063    /* the context is our operation */
2064    method_op_p my_op = context;
2065    struct gm_op *gm_op_data = my_op->method_data;
2066    struct gm_addr *gm_addr_data = my_op->addr->method_data;
2067
2068    gossip_ldebug(GOSSIP_BMI_DEBUG_GM, "immed_send_callback() called.\n");
2069
2070    /* give back a send token */
2071    gm_free_send_token(local_port, GM_HIGH_PRIORITY);
2072
2073    if (gm_op_data->buffer_status == GM_BUF_METH_ALLOC)
2074    {
2075        /* this was an internally allocated buffer */
2076        gm_dma_free(local_port, my_op->buffer);
2077    }
2078
2079    /* see if the receiver couldn't keep up */
2080    if (status == GM_SEND_TIMED_OUT)
2081    {
2082        gossip_lerr("Error: GM TIMEOUT!  Not handled...\n");
2083        op_list_remove(my_op);
2084        my_op->error_code = bmi_gm_errno_to_pvfs(-ETIMEDOUT);
2085        op_list_add(completion_array[my_op->context_id], my_op);
2086        gm_op_data->complete = 1;
2087        return;
2088    }
2089
2090    /* look for other errors */
2091    if (status != GM_SUCCESS)
2092    {
2093        gossip_lerr("Error: GM send failure, detected in callback.\n");
2094        gossip_err("Error: %s (%d)\n", gm_strerror(status), (int)status);
2095        gossip_err("Sending from %s to %s\n", gm_host_name,
2096            gm_node_id_to_host_name(local_port, gm_addr_data->node_id));
2097        op_list_remove(my_op);
2098        /* TODO: need generic solution to map GM codes to pvfs2 codes */
2099        if(status == GM_SEND_TARGET_NODE_UNREACHABLE)
2100            my_op->error_code = -PVFS_EHOSTUNREACH;
2101        else
2102            my_op->error_code = -PVFS_EPROTO;
2103        op_list_add(completion_array[my_op->context_id], my_op);
2104        gm_op_data->complete = 1;
2105        return;
2106    }
2107
2108    /* golden! */
2109
2110    /* we need to drive the state of this operation to the next phase, in
2111     * which we wait for a ctrl msg from the reciever before proceeding
2112     */
2113    op_list_remove(my_op);
2114    my_op->error_code = 0;
2115    /* let it hang around in the receiving queue until we get a control
2116     * message back from the target that indicates that we can continue.
2117     */
2118    op_list_add(completion_array[my_op->context_id], my_op);
2119    gm_op_data->complete = 1;
2120
2121    gossip_ldebug(GOSSIP_BMI_DEBUG_GM, "Finished immed_send_callback().\n");
2122
2123    return;
2124}
2125
2126
2127/* ctrl_req_callback()
2128 *
2129 * callback function triggered on the completion of a request control
2130 * message send.
2131 *
2132 * no return value
2133 */
2134static void ctrl_req_callback(struct gm_port *port,
2135                              void *context,
2136                              gm_status_t status)
2137{
2138    /* the context is our operation */
2139    method_op_p my_op = context;
2140    struct gm_op *gm_op_data = NULL;
2141    struct gm_addr *gm_addr_data = my_op->addr->method_data;
2142
2143    gossip_ldebug(GOSSIP_BMI_DEBUG_GM, "ctrl_req_callback() called.\n");
2144
2145    /* give back a send token */
2146    gm_free_send_token(local_port, GM_HIGH_PRIORITY);
2147
2148    /* free up ctrl message buffer */
2149    gm_op_data = my_op->method_data;
2150    bmi_gm_bufferpool_put(ctrl_send_pool, gm_op_data->freeable_ctrl_buffer);
2151    gm_op_data->freeable_ctrl_buffer = NULL;
2152
2153    /* see if the receiver couldn't keep up */
2154    if (status == GM_SEND_TIMED_OUT)
2155    {
2156        gossip_lerr("Error: GM TIMEOUT!  Not handled...\n");
2157        op_list_remove(my_op);
2158        my_op->error_code = bmi_gm_errno_to_pvfs(-ETIMEDOUT);
2159        op_list_add(completion_array[my_op->context_id], my_op);
2160        gm_op_data->complete = 1;
2161        return;
2162    }
2163
2164    /* look for other errors */
2165    if (status != GM_SUCCESS)
2166    {
2167        gossip_lerr("Error: GM send failure, detected in callback.\n");
2168        gossip_err("Error: %s (%d)\n", gm_strerror(status), (int)status);
2169        gossip_err("Sending from %s to %s\n", gm_host_name,
2170            gm_node_id_to_host_name(local_port, gm_addr_data->node_id));
2171        op_list_remove(my_op);
2172        /* TODO: need generic solution to map GM codes to pvfs2 codes */
2173        if(status == GM_SEND_TARGET_NODE_UNREACHABLE)
2174            my_op->error_code = -PVFS_EHOSTUNREACH;
2175        else
2176            my_op->error_code = -PVFS_EPROTO;
2177        op_list_add(completion_array[my_op->context_id], my_op);
2178        gm_op_data->complete = 1;
2179        return;
2180    }
2181
2182    if(gm_op_data->cancelled)
2183    {
2184        /* this operation has been cancelled; don't do any further work */
2185        op_list_remove(my_op);
2186        my_op->error_code = -PVFS_ECANCEL;
2187        op_list_add(completion_array[my_op->context_id], my_op);
2188        gm_op_data->complete = 1;
2189        return;
2190    }
2191
2192    /* golden! */
2193
2194    /* don't touch the operation.  The ack receive handler will drive it
2195     * to the next state.
2196     */
2197
2198    gossip_ldebug(GOSSIP_BMI_DEBUG_GM, "Finished ctrl_req_callback.\n");
2199
2200    /* etc. etc. */
2201    return;
2202}
2203
2204/* ctrl_recv_pool_init
2205 *
2206 * sets up a pool of receive buffers for the purpose of handling control
2207 * messages.  These buffers will be automatically re-posted as control
2208 * messages are handled.
2209 * NOTE: the caller is trusted to not exceed our token limit ...
2210 *
2211 * returns 0 on success, -errno on failure
2212 */
2213static int ctrl_recv_pool_init(int pool_size)
2214{
2215
2216    struct ctrl_msg *tmp_ctrl = NULL;
2217    int i = 0;
2218
2219    for (i = 0; i < pool_size; i++)
2220    {
2221        tmp_ctrl = gm_dma_malloc(local_port,
2222                                 gm_max_length_for_size(GM_IMMED_SIZE));
2223        if (!tmp_ctrl)
2224        {
2225            return (bmi_gm_errno_to_pvfs(-ENOMEM));
2226        }
2227        gm_provide_receive_buffer(local_port, tmp_ctrl,
2228                                  GM_IMMED_SIZE, GM_HIGH_PRIORITY);
2229    }
2230
2231    recv_token_count_high -= pool_size;
2232
2233    return (0);
2234}
2235
2236
2237/* gm_do_work()
2238 *
2239 * this is a generic function that forces work to be done within the gm
2240 * method when test functions are called.  It does not target any
2241 * particular operation.
2242 *
2243 * returns 0 on success, -errno on failure
2244 */
2245static int gm_do_work(int wait_time)
2246{
2247    int events = 0;
2248    int ret = -1;
2249
2250    /* we have a lot of responsibilities here
2251     * 1) peek to see if there are any events pending
2252     * 2) if not, see how many tokens we have and sweep op lists to do
2253     * work on ops waiting on tokens
2254     * 3) if there were events pending, do recv cycle
2255     * 4) if there were no events, set alarm and do cycle
2256     * 5) see how many tokens we have and sweep op lists again.
2257     */
2258
2259    /* see if there is any work ready */
2260    events = gm_receive_pending(local_port);
2261    if (events == 0)
2262    {
2263        /* since there isn't any GM work to do right this second, let's
2264         * try to service operations waiting on tokens first.
2265         */
2266        delayed_token_sweep();
2267
2268        /* See if there is anything to do now; specify timeout so we
2269         * don't busy spin if idle
2270         */
2271        if (gm_receive_pending(local_port) || wait_time > 0)
2272        {
2273            ret = receive_cycle(wait_time);
2274            if (ret < 0)
2275            {
2276                return (ret);
2277            }
2278        }
2279    }
2280    else
2281    {
2282        /* we know there is work to do- call receive with no timeout */
2283        ret = receive_cycle(0);
2284        if (ret < 0)
2285        {
2286            return (ret);
2287        }
2288        /* maybe we have tokens available now */
2289        delayed_token_sweep();
2290    }
2291
2292    return (0);
2293}
2294
2295
2296/* delayed_token_sweep()
2297 *
2298 * Attempts to service any operations that are waiting on tokens to
2299 * become available
2300 *
2301 * returns 0 on success, -errno on failure
2302 */
2303static void delayed_token_sweep(void)
2304{
2305    method_op_p query_op = NULL;
2306    struct gm_op* gm_op_data = NULL;
2307
2308    /* NOTE: the order is important here.  We try to push the
2309     * last steps of stalled messages first in order to preserve
2310     * ordering.
2311     */
2312
2313    /* waiting for one last high priority send token to complete a
2314     * rendezvous communication
2315     */
2316    if (!op_list_empty(op_list_array[IND_NEED_SEND_TOK_HI_PUT]))
2317    {
2318        /* see if we have any high priority send tokens */
2319        if (gm_alloc_send_token(local_port, GM_HIGH_PRIORITY))
2320        {
2321            query_op =
2322                op_list_shownext(op_list_array[IND_NEED_SEND_TOK_HI_PUT]);
2323            op_list_remove(query_op);
2324            initiate_put_announcement(query_op);
2325            return;
2326        }
2327    }
2328
2329    /* look for stuff waiting on low send tokens */
2330    if (!op_list_empty(op_list_array[IND_NEED_SEND_TOK_LOW]))
2331    {
2332        /* we need a low priority send token */
2333        if (gm_alloc_send_token(local_port, GM_LOW_PRIORITY))
2334        {
2335#ifdef ENABLE_GM_BUFPOOL
2336            if (!io_buffers_exhausted())
2337            {
2338#endif /* ENABLE_GM_BUFPOOL */
2339                query_op =
2340                    op_list_shownext(op_list_array[IND_NEED_SEND_TOK_LOW]);
2341               
2342                op_list_remove(query_op);
2343                gm_op_data = query_op->method_data;
2344                if(gm_op_data->list_flag)
2345                    setup_send_data_buffer_list(query_op);
2346                else
2347                    setup_send_data_buffer(query_op);
2348                send_data_buffer(query_op);
2349                return;
2350#ifdef ENABLE_GM_BUFPOOL
2351            }
2352            else
2353            {
2354                gm_free_send_token(local_port, GM_LOW_PRIORITY);
2355            }
2356#endif /* ENABLE_GM_BUFPOOL */
2357        }
2358    }
2359
2360    /* look for stuff waiting for tokens to acknowledge send request */
2361    if (!op_list_empty(op_list_array[IND_NEED_SEND_TOK_HI_CTRLACK]))
2362    {
2363        if (gm_alloc_send_token(local_port, GM_HIGH_PRIORITY))
2364        {
2365#ifdef ENABLE_GM_BUFPOOL
2366            if (!io_buffers_exhausted())
2367            {
2368#endif /* ENABLE_GM_BUFPOOL */
2369                query_op =
2370                    op_list_shownext(op_list_array
2371                                     [IND_NEED_SEND_TOK_HI_CTRLACK]);
2372                op_list_remove(query_op);
2373                prepare_for_recv(query_op);
2374                return;
2375#ifdef ENABLE_GM_BUFPOOL
2376            }
2377            else
2378            {
2379                gm_free_send_token(local_port, GM_HIGH_PRIORITY);
2380            }
2381#endif /* ENABLE_GM_BUFPOOL */
2382        }
2383    }
2384
2385    /* look for stuff that is waiting for a token to send a ctrl requeset */
2386    if (!op_list_empty(op_list_array[IND_NEED_SEND_TOK_HI_CTRL]))
2387    {
2388        /* see if we have any high priority send tokens */
2389        if (gm_alloc_send_token(local_port, GM_HIGH_PRIORITY))
2390        {
2391            query_op =
2392                op_list_shownext(op_list_array[IND_NEED_SEND_TOK_HI_CTRL]);
2393            op_list_remove(query_op);
2394            if (query_op->mode == GM_MODE_REND)
2395            {
2396                initiate_send_rend(query_op);
2397                return;
2398            }
2399            else
2400            {
2401                initiate_send_immed(query_op);
2402                return;
2403            }
2404        }
2405    }
2406
2407    return;
2408}
2409
2410
2411/* receive_cycle()
2412 *
2413 * attempts to do a GM receive.  The argument specifies how long of a
2414 * timeout to use.
2415 *
2416 * returns 0 on success, -errno on failure
2417 */
2418static int receive_cycle(int timeout)
2419{
2420    gm_alarm_t poll_timeout_alarm;
2421    int ret = -1;
2422    gm_recv_event_t *poll_event = NULL;
2423    int handled_events = 0;
2424
2425    if (timeout > 0)
2426    {
2427        gm_initialize_alarm(&poll_timeout_alarm);
2428        gm_set_alarm(local_port, &poll_timeout_alarm, timeout,
2429                     alarm_callback, NULL);
2430    }
2431    global_timeout_flag = 0;
2432    do
2433    {
2434        if(timeout > 0)
2435        {
2436            poll_event = gm_blocking_receive_no_spin(local_port);
2437        }
2438        else
2439        {
2440            poll_event = gm_receive(local_port);
2441        }
2442
2443        switch (gm_ntohc(poll_event->recv.type))
2444        {
2445            case GM_FAST_HIGH_PEER_RECV_EVENT:
2446            case GM_FAST_HIGH_RECV_EVENT:
2447                handled_events++;
2448                ret = recv_event_handler(poll_event, 1);
2449                break;
2450            case GM_HIGH_PEER_RECV_EVENT:
2451            case GM_HIGH_RECV_EVENT:
2452                handled_events++;
2453                ret = recv_event_handler(poll_event, 0);
2454                break;
2455            case _GM_SLEEP_EVENT:
2456                handled_events++;
2457                gen_mutex_unlock(&interface_mutex);
2458                gm_unknown(local_port, poll_event);
2459                gen_mutex_lock(&interface_mutex);
2460                ret = 0;
2461                break;
2462            default:
2463                handled_events++;
2464                gm_unknown(local_port, poll_event);
2465                ret = 0;
2466                break;
2467        }
2468    } while (((timeout > 0 && !global_timeout_flag && ret == 0) ||
2469              (timeout == 0 && gm_receive_pending(local_port) && ret == 0)) &&
2470             handled_events < GM_MAX_EVENT_CYCLES);
2471
2472    if (timeout > 0 && !global_timeout_flag)
2473    {
2474        gm_cancel_alarm(&poll_timeout_alarm);
2475    }
2476
2477    return (ret);
2478}
2479
2480
2481/* alarm_callback()
2482 *
2483 * function callback that occurs if our receive cycle times out.
2484 * Doesn't actually do anything but keep us from blocking.
2485 *
2486 * no return value
2487 */
2488void alarm_callback(void *context)
2489{
2490    global_timeout_flag = 1;
2491    gossip_debug(GOSSIP_BMI_DEBUG_GM, "Timer expired.\n");
2492    return;
2493}
2494
2495
2496/* immed_unexp_recv_handler()
2497 *
2498 * handles immediate receive events for which the user has not yet
2499 * provided a buffer.
2500 *
2501 * returns 0 on success, -errno on failure
2502 */
2503static int immed_unexp_recv_handler(bmi_size_t size,
2504                                    bmi_msg_tag_t msg_tag,
2505                                    bmi_method_addr_p map,
2506                                    void *buffer,
2507                                    uint8_t class)
2508{
2509    method_op_p new_method_op = NULL;
2510    struct gm_op *gm_op_data = NULL;
2511
2512    /* we need an op structure to keep up with this */
2513    new_method_op = alloc_gm_method_op();
2514    if (!new_method_op)
2515    {
2516        return (bmi_gm_errno_to_pvfs(-ENOMEM));
2517    }
2518    new_method_op->send_recv = BMI_RECV;
2519    new_method_op->addr = map;
2520    new_method_op->actual_size = size;
2521    new_method_op->expected_size = 0;
2522    new_method_op->msg_tag = msg_tag;
2523    new_method_op->error_code = 0;
2524    new_method_op->mode = GM_MODE_UNEXP;
2525    new_method_op->buffer = buffer;
2526    new_method_op->class = class;
2527    gm_op_data = new_method_op->method_data;
2528
2529    op_list_add(op_list_array[IND_COMPLETE_RECV_UNEXP], new_method_op);
2530    gm_op_data->complete = 1;
2531
2532    return (0);
2533}
2534
2535/* immed_recv_handler()
2536 *
2537 * handles immediate receive events for which the user has not yet
2538 * provided a buffer.
2539 *
2540 * returns 0 on success, -errno on failure
2541 */
2542static int immed_recv_handler(bmi_size_t actual_size,
2543                              bmi_msg_tag_t msg_tag,
2544                              bmi_method_addr_p map,
2545                              void *buffer)
2546{
2547    method_op_p new_method_op = NULL;
2548
2549    /* we need an op structure to keep up with this */
2550    new_method_op = alloc_gm_method_op();
2551    if (!new_method_op)
2552    {
2553        return (bmi_gm_errno_to_pvfs(-ENOMEM));
2554    }
2555    new_method_op->send_recv = BMI_RECV;
2556    new_method_op->addr = map;
2557    new_method_op->actual_size = actual_size;
2558    /* TODO: is this the right thing to do here? */
2559    new_method_op->expected_size = 0;
2560    new_method_op->msg_tag = msg_tag;
2561    new_method_op->mode = GM_MODE_IMMED;
2562    new_method_op->buffer = buffer;
2563    new_method_op->context_id = -1;
2564
2565    /* queue up until user posts matching receive */
2566    op_list_add(op_list_array[IND_NEED_RECV_POST], new_method_op);
2567
2568    return (0);
2569}
2570
2571
2572/* recv_event_handler()
2573 *
2574 * handles low priority receive events as detected by gm_do_work()
2575 *
2576 * returns 0 on success, -errno on failure
2577 */
2578static int recv_event_handler(gm_recv_event_t * poll_event,
2579                             int fast)
2580{
2581    struct ctrl_msg ctrl_copy;
2582    int ret = bmi_gm_errno_to_pvfs(-ENOSYS);
2583    bmi_method_addr_p map = NULL;
2584    struct op_list_search_key key;
2585    void *tmp_buffer = NULL;
2586    method_op_p query_op = NULL;
2587    struct gm_addr *gm_addr_data = NULL;
2588    struct gm_op *gm_op_data = NULL;
2589    void* copy_buffer;
2590    bmi_size_t copy_size, total_copied;
2591    int i;
2592
2593    gossip_ldebug(GOSSIP_BMI_DEBUG_GM, "recv_event_handler() called.\n");
2594    /* what are the possibilities here?
2595     * 1) recv ctrl_ack for a send that we initiated
2596     * 2) recv ctrl_req from someone who wishes to send to us
2597     *    a) unexpected
2598     *    b) immediate
2599     *    c) rendezvous
2600     */
2601
2602    /* NOTE: we *must* return ctrl buffers as quickly as possible.  They
2603     * must be available for accepting new messages at any time and we
2604     * cannot let too many remain out of service.
2605     */
2606
2607    /* grab a copy of the control message out of the event */
2608    if (fast)
2609    {
2610        ctrl_copy = *(struct ctrl_msg *) (gm_ntohp(poll_event->recv.message) +
2611                                       gm_ntohl(poll_event->recv.length) -
2612                                       sizeof(struct ctrl_msg));
2613    }
2614    else
2615    {
2616        ctrl_copy = *(struct ctrl_msg *) (gm_ntohp(poll_event->recv.buffer) +
2617                                       gm_ntohl(poll_event->recv.length) -
2618                                       sizeof(struct ctrl_msg));
2619    }
2620
2621    /* check magic */
2622    if(ctrl_copy.magic_nr != BMI_MAGIC_NR)
2623    {
2624        gossip_err("Error: bad magic in bmi_gm message.\n");
2625        gm_provide_receive_buffer(local_port,
2626                                  gm_ntohp(poll_event->recv.buffer),
2627                                  GM_IMMED_SIZE, GM_HIGH_PRIORITY);
2628        return(0);
2629    }
2630
2631    /* repost buffer ASAP unless we need to copy data out of it */
2632    if(ctrl_copy.ctrl_type != CTRL_IMMED_TYPE &&
2633        ctrl_copy.ctrl_type != CTRL_UNEXP_TYPE)
2634    {
2635        gm_provide_receive_buffer(local_port,
2636                                  gm_ntohp(poll_event->recv.buffer),
2637                                  GM_IMMED_SIZE, GM_HIGH_PRIORITY);
2638    }
2639
2640    gossip_ldebug(GOSSIP_BMI_DEBUG_GM, "Ctrl_type: %d.\n", ctrl_copy.ctrl_type);
2641    switch(ctrl_copy.ctrl_type)
2642    {
2643        case CTRL_ACK_TYPE:
2644            /* this is a response to one of our control requests */
2645            ctrl_ack_handler(ctrl_copy.u.ack.sender_op_id,
2646                             gm_ntohs(poll_event->recv.sender_node_id),
2647                             ctrl_copy.u.ack.remote_ptr,
2648                             ctrl_copy.u.ack.receiver_op_id);
2649            ret = 0;
2650            break;
2651        case CTRL_REQ_TYPE:
2652            /* this is a new control request from someone */
2653            ret = ctrl_req_handler_rend(ctrl_copy.u.req.sender_op_id,
2654                                            ctrl_copy.u.req.actual_size,
2655                                            ctrl_copy.u.req.msg_tag,
2656                                            gm_ntohs(poll_event->recv.
2657                                                     sender_node_id),
2658                                            gm_ntoh_u8(poll_event->recv.
2659                                                     sender_port_id));
2660            break;
2661        case CTRL_PUT_TYPE:
2662            put_recv_handler(ctrl_copy.u.put.receiver_op_id);
2663            ret = 0;
2664            break;
2665        case CTRL_IMMED_TYPE:
2666            /* try to find a matching post from the receiver so that we don't
2667             * have to buffer this yet again */
2668            map = gm_addr_search(&gm_addr_list,
2669                                 gm_ntohs(poll_event->recv.sender_node_id),
2670                                 gm_ntoh_u8(poll_event->recv.sender_port_id));
2671            if (!map)
2672            {
2673                /* TODO: handle this error better */
2674                gossip_lerr("Error: unknown sender!\n");
2675                return (bmi_gm_errno_to_pvfs(-EPROTO));
2676            }
2677
2678            memset(&key, 0, sizeof(struct op_list_search_key));
2679            key.method_addr = map;
2680            key.method_addr_yes = 1;
2681            key.msg_tag = ctrl_copy.u.immed.msg_tag;
2682            key.msg_tag_yes = 1;
2683
2684            query_op = op_list_search(op_list_array[IND_NEED_CTRL_MATCH], &key);
2685            if (!query_op)
2686            {
2687                gossip_ldebug(GOSSIP_BMI_DEBUG_GM, "Doh! Using extra buffer.\n");
2688                tmp_buffer = malloc(ctrl_copy.u.immed.actual_size);
2689                if (!tmp_buffer)
2690                {
2691                    /* TODO: handle error */
2692                    return (bmi_gm_errno_to_pvfs(-ENOMEM));
2693                }
2694                if (fast)
2695                {
2696                    memcpy(tmp_buffer, gm_ntohp(poll_event->recv.message),
2697                           ctrl_copy.u.immed.actual_size);
2698                }
2699                else
2700                {
2701                    memcpy(tmp_buffer, gm_ntohp(poll_event->recv.buffer),
2702                           ctrl_copy.u.immed.actual_size);
2703                }
2704                gm_provide_receive_buffer(local_port,
2705                                          gm_ntohp(poll_event->recv.buffer),
2706                                          GM_IMMED_SIZE, GM_HIGH_PRIORITY);
2707                ret = immed_recv_handler(ctrl_copy.u.immed.actual_size,
2708                                         ctrl_copy.u.immed.msg_tag, map,
2709                                         tmp_buffer);
2710            }
2711            else
2712            {
2713                /* found a match */
2714                gm_op_data = query_op->method_data;
2715                if(fast)
2716                    copy_buffer = gm_ntohp(poll_event->recv.message);
2717                else
2718                    copy_buffer = gm_ntohp(poll_event->recv.buffer);
2719
2720                if(gm_op_data->list_flag)
2721                {
2722                    total_copied = 0;
2723                    for(i=0; i<query_op->list_count; i++)
2724                    {
2725                        if(total_copied == ctrl_copy.u.immed.actual_size)
2726                            break;
2727
2728                        copy_size = ctrl_copy.u.immed.actual_size - total_copied;
2729                        if(copy_size > query_op->size_list[i])
2730                            copy_size = query_op->size_list[i];
2731                        memcpy(query_op->buffer_list[i], copy_buffer, copy_size);
2732                        copy_buffer = (void*)((long)copy_buffer + (long)copy_size);
2733                        total_copied += copy_size;
2734                    }
2735                }
2736                else
2737                {
2738                    memcpy(query_op->buffer, copy_buffer,
2739                           ctrl_copy.u.immed.actual_size);
2740                }
2741
2742                gm_provide_receive_buffer(local_port,
2743                                          gm_ntohp(poll_event->recv.buffer),
2744                                          GM_IMMED_SIZE, GM_HIGH_PRIORITY);
2745                op_list_remove(query_op);
2746                query_op->actual_size = ctrl_copy.u.immed.actual_size;
2747                query_op->error_code = 0;
2748                op_list_add(completion_array[query_op->context_id], query_op);
2749                gm_op_data->complete = 1;
2750                ret = 0;
2751            }
2752            break;
2753        case CTRL_UNEXP_TYPE:
2754            map = gm_addr_search(&gm_addr_list,
2755                                 gm_ntohs(poll_event->recv.sender_node_id),
2756                                 gm_ntoh_u8(poll_event->recv.sender_port_id));
2757            if (!map)
2758            {
2759                /* new address! */
2760                map = alloc_gm_method_addr();
2761                gm_addr_data = map->method_data;
2762                gm_addr_data->node_id = gm_ntohs(poll_event->recv.sender_node_id);
2763                gm_addr_data->port_id = gm_ntohc(poll_event->recv.sender_port_id);
2764                /* let the bmi layer know about it */
2765                gm_addr_data->bmi_addr = bmi_method_addr_reg_callback(map);
2766                if (!gm_addr_data->bmi_addr)
2767                {
2768                    dealloc_gm_method_addr(map);
2769                    return (-BMI_ENOMEM);
2770                }
2771                /* keep up with it ourselves also */
2772                gm_addr_add(&gm_addr_list, map);
2773            }
2774
2775            tmp_buffer = malloc(ctrl_copy.u.immed.actual_size);
2776            if (!tmp_buffer)
2777            {
2778                /* TODO: handle error */
2779                return (bmi_gm_errno_to_pvfs(-ENOMEM));
2780            }
2781            if (fast)
2782            {
2783                memcpy(tmp_buffer, gm_ntohp(poll_event->recv.message),
2784                       ctrl_copy.u.immed.actual_size);
2785            }
2786            else
2787            {
2788                memcpy(tmp_buffer, gm_ntohp(poll_event->recv.buffer),
2789                       ctrl_copy.u.immed.actual_size);
2790            }
2791            gm_provide_receive_buffer(local_port,
2792                                      gm_ntohp(poll_event->recv.buffer),
2793                                      GM_IMMED_SIZE, GM_HIGH_PRIORITY);
2794            ret =
2795                immed_unexp_recv_handler(ctrl_copy.u.immed.actual_size,
2796                    ctrl_copy.u.immed.msg_tag, map, tmp_buffer,
2797                    ctrl_copy.u.immed.class);
2798            break;
2799        default:
2800            /* TODO: handle this better */
2801            assert(0);
2802            break;
2803    }
2804
2805    return (ret);
2806}
2807
2808/* put_recv_handler()
2809 *
2810 * handles announcements from sender that a directed send has completed
2811 *
2812 * returns 0 on success, -errno on failure
2813 */
2814static void put_recv_handler(bmi_op_id_t ctrl_op_id)
2815{
2816    method_op_p query_op = NULL;
2817    struct gm_op *gm_op_data = NULL;
2818    void* copy_buffer;
2819    bmi_size_t copy_size, total_copied;
2820    int i;
2821
2822    /* find the matching operation */
2823    query_op = id_gen_fast_lookup(ctrl_op_id);
2824    if(!query_op)
2825    {
2826        /* operation must have been cancelled; just return */
2827        return;
2828    }
2829
2830    op_list_remove(query_op);
2831    gm_op_data = query_op->method_data;
2832
2833    /* let go of the buffer if we need to */
2834    if (gm_op_data->buffer_status == GM_BUF_METH_REG)
2835    {
2836        if(gm_op_data->list_flag)
2837        {
2838#ifdef ENABLE_GM_REGCACHE
2839            /* we don't handle this yet */
2840            assert(0);
2841
2842#endif /* ENABLE_GM_REGCACHE */
2843#ifdef ENABLE_GM_REGISTER
2844            /* we don't handle this yet */
2845            assert(0);
2846
2847#endif /* ENABLE_GM_REGISTER */
2848#ifdef ENABLE_GM_BUFCOPY
2849            /* we don't handle this yet */
2850            assert(0);
2851
2852#endif /* ENABLE_GM_BUFCOPY */
2853#ifdef ENABLE_GM_BUFPOOL
2854            if(!gm_op_data->cancelled)
2855            {
2856                copy_buffer = gm_op_data->tmp_xfer_buffer;
2857                total_copied = 0;
2858                for(i=0; i<query_op->list_count; i++)
2859                {
2860                    if(total_copied == query_op->actual_size)
2861                        break;
2862
2863                    copy_size = query_op->actual_size - total_copied;
2864                    if(copy_size > query_op->size_list[i])
2865                        copy_size = query_op->size_list[i];
2866
2867                    memcpy(query_op->buffer_list[i], copy_buffer, copy_size);
2868                    copy_buffer = (void*)((long)copy_buffer + (long)copy_size);
2869                    total_copied += copy_size;
2870                }
2871            }
2872            bmi_gm_bufferpool_put(io_pool, gm_op_data->tmp_xfer_buffer);
2873#endif /* ENABLE_GM_BUFPOOL */
2874        }
2875        else
2876        {
2877#ifdef ENABLE_GM_REGCACHE
2878            bmi_gm_unuse_interval((unsigned long) query_op->buffer,
2879                                  query_op->actual_size);
2880#endif /* ENABLE_GM_REGCACHE */
2881#ifdef ENABLE_GM_REGISTER
2882            gm_deregister_memory(local_port, query_op->buffer,
2883                                 query_op->actual_size);
2884#endif /* ENABLE_GM_REGISTER */
2885#ifdef ENABLE_GM_BUFCOPY
2886            memcpy(query_op->buffer, gm_op_data->tmp_xfer_buffer,
2887                   query_op->actual_size);
2888            gm_dma_free(local_port, gm_op_data->tmp_xfer_buffer);
2889#endif /* ENABLE_GM_BUFCOPY */
2890#ifdef ENABLE_GM_BUFPOOL
2891            if(!gm_op_data->cancelled)
2892            {
2893                memcpy(query_op->buffer, gm_op_data->tmp_xfer_buffer,
2894                       query_op->actual_size);
2895            }
2896            bmi_gm_bufferpool_put(io_pool, gm_op_data->tmp_xfer_buffer);
2897#endif /* ENABLE_GM_BUFPOOL */
2898        }
2899    }
2900
2901    /* if this is an operation that has been cancelled, then don't put
2902     * it in the completion queue (it should have already been announced)
2903     * Instead, just quietly gid rid of the operation
2904     */
2905    if(gm_op_data->cancelled)
2906    {
2907        cancelled_rend_count--;
2908        dealloc_gm_method_op(query_op);
2909        return;
2910    }
2911
2912    /* done */
2913    query_op->error_code = 0;
2914    op_list_add(completion_array[query_op->context_id], query_op);
2915    gm_op_data->complete = 1;
2916    return;
2917}
2918
2919
2920/* ctrl_ack_handler()
2921 *
2922 * handles control message acknowledgements (for sender)
2923 *
2924 * returns 0 on success, -errno on failure
2925 */
2926static void ctrl_ack_handler(bmi_op_id_t ctrl_op_id,
2927                             unsigned int node_id,
2928                             gm_remote_ptr_t remote_ptr,
2929                             bmi_op_id_t peer_op_id)
2930{
2931    method_op_p query_op = NULL;
2932    struct gm_op *gm_op_data = NULL;
2933    /* we got a control ack- what now?
2934     * 1) figure out which op this matches
2935     * 2) see if we have a send token
2936     *    a) if not, queue up
2937     * 3) send data
2938     */
2939
2940    /* find the matching operation */
2941    query_op = id_gen_fast_lookup(ctrl_op_id);
2942
2943    if(!query_op)
2944    {
2945        /* the op is gone; probably canceled.  Just return. */
2946        return;
2947    }
2948
2949    op_list_remove(query_op);
2950    gm_op_data = query_op->method_data;
2951    gm_op_data->remote_ptr = remote_ptr;
2952    gm_op_data->peer_op_id = peer_op_id;
2953
2954    /* make sure that we are not bypassing any operations that
2955     * are stalled waiting on tokens
2956     */
2957    if (!op_list_empty(op_list_array[IND_NEED_SEND_TOK_LOW]))
2958    {
2959        gossip_ldebug(GOSSIP_BMI_DEBUG_GM, "Stalling behind stalled message.\n");
2960        op_list_add(op_list_array[IND_NEED_SEND_TOK_LOW], query_op);
2961        return;
2962    }
2963
2964    if (!gm_alloc_send_token(local_port, GM_LOW_PRIORITY))
2965    {
2966        /* we don't have a send token */
2967        op_list_add(op_list_array[IND_NEED_SEND_TOK_LOW], query_op);
2968        return;
2969    }
2970
2971#ifdef ENABLE_GM_BUFPOOL
2972    if (io_buffers_exhausted())
2973    {
2974        gm_free_send_token(local_port, GM_LOW_PRIORITY);
2975        op_list_add(op_list_array[IND_NEED_SEND_TOK_LOW], query_op);
2976        return;
2977    }
2978#endif /* ENABLE_GM_BUFPOOL */
2979
2980    if(gm_op_data->list_flag)
2981        setup_send_data_buffer_list(query_op);
2982    else
2983        setup_send_data_buffer(query_op);
2984    send_data_buffer(query_op);
2985    return;
2986}
2987
2988
2989/* setup_send_data_buffer()
2990 *
2991 * prepares a buffer to be sent
2992 *
2993 * no return value
2994 */
2995static void setup_send_data_buffer(method_op_p mop)
2996{
2997    struct gm_op *gm_op_data = mop->method_data;
2998    bmi_size_t pinned_size = 0;
2999
3000    if (gm_op_data->buffer_status == GM_BUF_METH_REG)
3001    {
3002#ifdef ENABLE_GM_REGCACHE
3003        pinned_size = bmi_gm_use_interval((unsigned long) mop->buffer,
3004                                          mop->actual_size);
3005        if (pinned_size != mop->actual_size)
3006        {
3007            gossip_lerr
3008                ("Error: could not register memory, wanted: %d, got: %d\n",
3009                 (int) mop->actual_size, (int) pinned_size);
3010            bmi_gm_unuse_interval((unsigned long) mop->buffer, pinned_size);
3011            /* TODO: handle this better */
3012            mop->error_code = bmi_gm_errno_to_pvfs(-ENOMEM);
3013            op_list_add(completion_array[mop->context_id], mop);
3014            gm_op_data->complete = 1;
3015            return;
3016        }
3017#endif /* ENABLE_GM_REGCACHE */
3018#ifdef ENABLE_GM_REGISTER
3019        pinned_size = mop->actual_size;
3020        if (gm_register_memory(local_port, mop->buffer,
3021                               mop->actual_size) != GM_SUCCESS)
3022        {
3023            gossip_lerr("Error: could not register memory.\n");
3024            /* TODO: handle this better */
3025            mop->error_code = bmi_gm_errno_to_pvfs(-ENOMEM);
3026            op_list_add(completion_array[mop->context_id], mop);
3027            gm_op_data->complete = 1;
3028            return;
3029        }
3030#endif /* ENABLE_GM_REGISTER */
3031#ifdef ENABLE_GM_BUFCOPY
3032        pinned_size = mop->actual_size;
3033        gm_op_data->tmp_xfer_buffer = gm_dma_malloc(local_port,
3034                                                    (unsigned long) mop->
3035                                                    actual_size);
3036        if (!gm_op_data->tmp_xfer_buffer)
3037        {
3038            gossip_lerr("Error: gm_dma_malloc().\n");
3039            mop->error_code = bmi_gm_errno_to_pvfs(-ENOMEM);
3040            op_list_add(completion_array[mop->context_id], mop);
3041            gm_op_data->complete = 1;
3042            return;
3043        }
3044        memcpy(gm_op_data->tmp_xfer_buffer, mop->buffer, mop->actual_size);
3045        mop->buffer = gm_op_data->tmp_xfer_buffer;
3046#endif /* ENABLE_GM_BUFCOPY */
3047#ifdef ENABLE_GM_BUFPOOL
3048        pinned_size = mop->actual_size;
3049        gm_op_data->tmp_xfer_buffer = bmi_gm_bufferpool_get(io_pool);
3050        memcpy(gm_op_data->tmp_xfer_buffer, mop->buffer, mop->actual_size);
3051        mop->buffer = gm_op_data->tmp_xfer_buffer;
3052#endif /* ENABLE_GM_BUFPOOL */
3053
3054    }
3055
3056    return;
3057}
3058
3059
3060/* setup_send_data_buffer_list()
3061 *
3062 * prepares a buffer to be sent (for list operations)
3063 *
3064 * no return value
3065 */
3066static void setup_send_data_buffer_list(method_op_p mop)
3067{
3068    struct gm_op *gm_op_data = mop->method_data;
3069    int i;
3070    void* copy_buffer;
3071
3072#ifdef ENABLE_GM_REGCACHE
3073    /* we don't handle this yet */
3074    assert(0);
3075#endif /* ENABLE_GM_REGCACHE */
3076#ifdef ENABLE_GM_REGISTER
3077    /* we don't handle this yet */
3078    assert(0);
3079#endif /* ENABLE_GM_REGISTER */
3080#ifdef ENABLE_GM_BUFCOPY
3081    /* we don't handle this yet */
3082    assert(0);
3083#endif /* ENABLE_GM_BUFCOPY */
3084#ifdef ENABLE_GM_BUFPOOL
3085    gm_op_data->tmp_xfer_buffer = bmi_gm_bufferpool_get(io_pool);
3086    copy_buffer = gm_op_data->tmp_xfer_buffer;
3087    for(i=0; i<mop->list_count; i++)
3088    {
3089        memcpy(copy_buffer, mop->buffer_list[i], mop->size_list[i]);
3090        copy_buffer = (void*)((long)copy_buffer + (long)mop->size_list[i]);
3091    }
3092    mop->buffer = gm_op_data->tmp_xfer_buffer;
3093#endif /* ENABLE_GM_BUFPOOL */
3094
3095    return;
3096}
3097
3098/* send_data_buffer()
3099 *
3100 * sends the actual message data for an operation.  Assumes that a low
3101 * priority send token has already been acquired, and that the
3102 * send buffer has already been prepared.
3103 *
3104 * no return value
3105 */
3106static void send_data_buffer(method_op_p mop)
3107{
3108    struct gm_addr *gm_addr_data = mop->addr->method_data;
3109    struct gm_op *gm_op_data = mop->method_data;
3110
3111    /* send actual buffer */
3112    /* NOTE: the ctrl message communication leading up to this send allows
3113     * us to use the "actual size" field here rather than the expected
3114     * size when transfering data
3115     */
3116
3117    gm_directed_send_with_callback(local_port, mop->buffer,
3118                                   gm_op_data->remote_ptr,
3119                                   mop->actual_size, GM_LOW_PRIORITY,
3120                                   gm_addr_data->node_id, gm_addr_data->port_id,
3121                                   data_send_callback, mop);
3122
3123    op_list_add(op_list_array[IND_SENDING], mop);
3124
3125    return;
3126}
3127
3128
3129/* prepare_for_recv()
3130 *
3131 * provides a receive buffer and sends a control ack to allow the sender
3132 * to continue.  Assumes that the send and recv tokens have already been
3133 * acquired.
3134 *
3135 * returns 0 on success, -errno on failure
3136 */
3137static void prepare_for_recv(method_op_p mop)
3138{
3139    struct gm_addr *gm_addr_data = mop->addr->method_data;
3140    struct gm_op *gm_op_data = mop->method_data;
3141    struct ctrl_msg *new_ctrl = NULL;
3142    bmi_size_t pinned_size = 0;
3143
3144    /* prepare a ctrl message response */
3145    new_ctrl = bmi_gm_bufferpool_get(ctrl_send_pool);
3146
3147    new_ctrl->ctrl_type = CTRL_ACK_TYPE;
3148    new_ctrl->magic_nr = BMI_MAGIC_NR;
3149    new_ctrl->u.ack.sender_op_id = gm_op_data->peer_op_id;
3150    new_ctrl->u.ack.receiver_op_id = mop->op_id;
3151    /* doing this to avoid a warning about type size mismatch */
3152    new_ctrl->u.ack.remote_ptr = 0;
3153    new_ctrl->u.ack.remote_ptr += (unsigned long) mop->buffer;
3154
3155    /* keep up with this buffer in the op structure */
3156    gm_op_data->freeable_ctrl_buffer = new_ctrl;
3157
3158    /* pinn the buffer to be ready for the data transfer */
3159    if (gm_op_data->buffer_status == GM_BUF_METH_REG)
3160    {
3161#ifdef ENABLE_GM_REGCACHE
3162        pinned_size = bmi_gm_use_interval((unsigned long) mop->buffer,
3163                                          mop->actual_size);
3164        if (pinned_size != mop->actual_size)
3165        {
3166            gossip_lerr
3167                ("Error: could not register memory, wanted: %d, got: %d\n",
3168                 (int) mop->actual_size, (int) pinned_size);
3169            /* TODO: handle this error better */
3170            mop->error_code = bmi_gm_errno_to_pvfs(-ENOMEM);
3171            op_list_add(completion_array[mop->context_id], mop);
3172            gm_op_data->complete = 1;
3173            return;
3174        }
3175#endif /* ENABLE_GM_REGCACHE */
3176#ifdef ENABLE_GM_REGISTER
3177        pinned_size = mop->actual_size;
3178        if (gm_register_memory(local_port, mop->buffer,
3179                               mop->actual_size) != GM_SUCCESS)
3180        {
3181            gossip_lerr("Error: could not register memory.\n");
3182            /* TODO: handle this better */
3183            mop->error_code = bmi_gm_errno_to_pvfs(-ENOMEM);
3184            op_list_add(completion_array[mop->context_id], mop);
3185            gm_op_data->complete = 1;
3186            return;
3187        }
3188#endif /* ENABLE_GM_REGISTER */
3189#ifdef ENABLE_GM_BUFCOPY
3190        pinned_size = mop->actual_size;
3191        gm_op_data->tmp_xfer_buffer = gm_dma_malloc(local_port,
3192                                                    (unsigned long) mop->
3193                                                    actual_size);
3194        if (!gm_op_data->tmp_xfer_buffer)
3195        {
3196            gossip_lerr("Error: gm_dma_malloc().\n");
3197            /* TODO: handle this better */
3198            mop->error_code = bmi_gm_errno_to_pvfs(-ENOMEM);
3199            op_list_add(completion_array[mop->context_id], mop);
3200            gm_op_data->complete = 1;
3201            return;
3202        }
3203        new_ctrl->u.ack.remote_ptr = 0;
3204        new_ctrl->u.ack.remote_ptr +=
3205            (unsigned long) gm_op_data->tmp_xfer_buffer;
3206#endif /* ENABLE_GM_BUFCOPY */
3207#ifdef ENABLE_GM_BUFPOOL
3208        pinned_size = mop->actual_size;
3209        gm_op_data->tmp_xfer_buffer = bmi_gm_bufferpool_get(io_pool);
3210        new_ctrl->u.ack.remote_ptr = 0;
3211        new_ctrl->u.ack.remote_ptr +=
3212            (unsigned long) gm_op_data->tmp_xfer_buffer;
3213#endif /* ENABLE_GM_BUFPOOL */
3214
3215    }
3216
3217    /* send ctrl message */
3218    gossip_ldebug(GOSSIP_BMI_DEBUG_GM, "Sending ctrl ack.\n");
3219    gm_send_with_callback(local_port, new_ctrl,
3220                                  GM_IMMED_SIZE, GM_CTRL_LENGTH,
3221                                  GM_HIGH_PRIORITY, gm_addr_data->node_id,
3222                                  gm_addr_data->port_id,
3223                                  ctrl_ack_callback, mop);
3224
3225    /* queue up op */
3226    op_list_add(op_list_array[IND_SENDING], mop);
3227    return;
3228
3229}
3230
3231
3232/* prepare_for_recv_list()
3233 *
3234 * provides a receive buffer and sends a control ack to allow the sender
3235 * to continue.  Assumes that the send and recv tokens have already been
3236 * acquired.
3237 *
3238 * returns 0 on success, -errno on failure
3239 */
3240static void prepare_for_recv_list(method_op_p mop)
3241{
3242    struct gm_addr *gm_addr_data = mop->addr->method_data;
3243    struct gm_op *gm_op_data = mop->method_data;
3244    struct ctrl_msg *new_ctrl = NULL;
3245    bmi_size_t pinned_size = 0;
3246
3247    /* prepare a ctrl message response */
3248    new_ctrl = bmi_gm_bufferpool_get(ctrl_send_pool);
3249
3250    new_ctrl->ctrl_type = CTRL_ACK_TYPE;
3251    new_ctrl->magic_nr = BMI_MAGIC_NR;
3252    new_ctrl->u.ack.sender_op_id = gm_op_data->peer_op_id;
3253    new_ctrl->u.ack.receiver_op_id = mop->op_id;
3254    /* doing this to avoid a warning about type size mismatch */
3255    new_ctrl->u.ack.remote_ptr = 0;
3256    new_ctrl->u.ack.remote_ptr += (unsigned long) mop->buffer;
3257
3258    /* keep up with this buffer in the op structure */
3259    gm_op_data->freeable_ctrl_buffer = new_ctrl;
3260
3261    /* pinn the buffer to be ready for the data transfer */
3262    if (gm_op_data->buffer_status == GM_BUF_METH_REG)
3263    {
3264#ifdef ENABLE_GM_REGCACHE
3265        /* we don't handle this yet */
3266        assert(0);
3267#endif /* ENABLE_GM_REGCACHE */
3268#ifdef ENABLE_GM_REGISTER
3269        /* we don't handle this yet */
3270        assert(0);
3271#endif /* ENABLE_GM_REGISTER */
3272#ifdef ENABLE_GM_BUFCOPY
3273        /* we don't handle this yet */
3274        assert(0);
3275#endif /* ENABLE_GM_BUFCOPY */
3276#ifdef ENABLE_GM_BUFPOOL
3277        pinned_size = mop->actual_size;
3278        gm_op_data->tmp_xfer_buffer = bmi_gm_bufferpool_get(io_pool);
3279        new_ctrl->u.ack.remote_ptr = 0;
3280        new_ctrl->u.ack.remote_ptr +=
3281            (unsigned long) gm_op_data->tmp_xfer_buffer;
3282#endif /* ENABLE_GM_BUFPOOL */
3283
3284    }
3285
3286    /* send ctrl message */
3287    gossip_ldebug(GOSSIP_BMI_DEBUG_GM, "Sending ctrl ack.\n");
3288    gm_send_with_callback(local_port, new_ctrl,
3289                                  GM_IMMED_SIZE, GM_CTRL_LENGTH,
3290                                  GM_HIGH_PRIORITY, gm_addr_data->node_id,
3291                                  gm_addr_data->port_id,
3292                                  ctrl_ack_callback, mop);
3293
3294    /* queue up op */
3295    op_list_add(op_list_array[IND_SENDING], mop);
3296    return;
3297}
3298
3299
3300/* ctrl_ack_callback()
3301 *
3302 * callback function triggered on the completion of a control ack
3303 * message send.
3304 *
3305 * no return value
3306 */
3307static void ctrl_ack_callback(struct gm_port *port,
3308                              void *context,
3309                              gm_status_t status)
3310{
3311    /* the context is our operation */
3312    method_op_p my_op = context;
3313    struct gm_op *gm_op_data = NULL;
3314    struct gm_addr *gm_addr_data = my_op->addr->method_data;
3315
3316    gossip_ldebug(GOSSIP_BMI_DEBUG_GM, "ctrl_ack_callback() called.\n");
3317
3318    /* give back a send token */
3319    gm_free_send_token(local_port, GM_HIGH_PRIORITY);
3320
3321    /* free up ctrl message buffer */
3322    gm_op_data = my_op->method_data;
3323    bmi_gm_bufferpool_put(ctrl_send_pool, gm_op_data->freeable_ctrl_buffer);
3324    gm_op_data->freeable_ctrl_buffer = NULL;
3325
3326    /* see if the receiver couldn't keep up */
3327    if (status == GM_SEND_TIMED_OUT)
3328    {
3329        gossip_lerr("Error: GM TIMEOUT!  Not handled.\n");
3330        op_list_remove(my_op);
3331        my_op->error_code = bmi_gm_errno_to_pvfs(-ETIMEDOUT);
3332        op_list_add(completion_array[my_op->context_id], my_op);
3333        gm_op_data->complete = 1;
3334        return;
3335    }
3336
3337    /* look for other errors */
3338    if (status != GM_SUCCESS)
3339    {
3340        gossip_lerr("Error: GM send failure, detected in callback.\n");
3341        gossip_err("Error: %s (%d)\n", gm_strerror(status), (int)status);
3342        gossip_err("Sending from %s to %s\n", gm_host_name,
3343            gm_node_id_to_host_name(local_port, gm_addr_data->node_id));
3344        op_list_remove(my_op);
3345        my_op->error_code = bmi_gm_errno_to_pvfs(-ETIMEDOUT);
3346        op_list_add(completion_array[my_op->context_id], my_op);
3347        gm_op_data->complete = 1;
3348        return;
3349    }
3350
3351    /* golden! */
3352
3353    /* we need to drive the state of this operation to the next phase, in
3354     * which we actually receive the message data
3355     */
3356    op_list_remove(my_op);
3357    op_list_add(op_list_array[IND_RECVING], my_op);
3358
3359    /* etc. etc. */
3360    return;
3361}
3362
3363
3364/* data_send_callback()
3365 *
3366 * callback function triggered on the completion of a data
3367 * message send.
3368 *
3369 * no return value
3370 */
3371static void data_send_callback(struct gm_port *port,
3372                               void *context,
3373                               gm_status_t status)
3374{
3375    /* the context is our operation */
3376    method_op_p my_op = context;
3377    struct gm_op *gm_op_data = my_op->method_data;
3378    struct gm_addr *gm_addr_data = my_op->addr->method_data;
3379
3380    /* give back a send token */
3381    gm_free_send_token(local_port, GM_LOW_PRIORITY);
3382
3383    if (gm_op_data->buffer_status == GM_BUF_METH_REG)
3384    {
3385#ifdef ENABLE_GM_REGCACHE
3386        bmi_gm_unuse_interval((unsigned long) my_op->buffer,
3387                              my_op->actual_size);
3388#endif /* ENABLE_GM_REGCACHE */
3389#ifdef ENABLE_GM_REGISTER
3390        gm_deregister_memory(local_port, my_op->buffer, my_op->actual_size);
3391#endif /* ENABLE_GM_REGISTER */
3392#ifdef ENABLE_GM_BUFCOPY
3393        gm_dma_free(local_port, my_op->buffer);
3394#endif /* ENABLE_GM_BUFCOPY */
3395#ifdef ENABLE_GM_BUFPOOL
3396        bmi_gm_bufferpool_put(io_pool, my_op->buffer);
3397#endif /* ENABLE_GM_BUFPOOL */
3398    }
3399
3400    /* see if the receiver couldn't keep up */
3401    if (status == GM_SEND_TIMED_OUT)
3402    {
3403        gossip_lerr("Error: GM TIMEOUT!  Not handled.\n");
3404        op_list_remove(my_op);
3405        my_op->error_code = bmi_gm_errno_to_pvfs(-ETIMEDOUT);
3406        op_list_add(completion_array[my_op->context_id], my_op);
3407        gm_op_data->complete = 1;
3408        return;
3409    }
3410
3411    /* look for other errors */
3412    if (status != GM_SUCCESS)
3413    {
3414        gossip_lerr("Error: GM send failure, detected in callback.\n");
3415        gossip_err("Error: GM return value: %d.\n", (int) status);
3416        gossip_err("Sending from %s to %s\n", gm_host_name,
3417            gm_node_id_to_host_name(local_port, gm_addr_data->node_id));
3418        op_list_remove(my_op);
3419        /* TODO: need generic solution to map GM codes to pvfs2 codes */
3420        if(status == GM_SEND_TARGET_NODE_UNREACHABLE)
3421            my_op->error_code = -PVFS_EHOSTUNREACH;
3422        else
3423            my_op->error_code = -PVFS_EPROTO;
3424        op_list_add(completion_array[my_op->context_id], my_op);
3425        gm_op_data->complete = 1;
3426        return;
3427    }
3428
3429    op_list_remove(my_op);
3430    /* we must now let the other side know that the data has arrived */
3431    if (gm_alloc_send_token(local_port, GM_HIGH_PRIORITY))
3432    {
3433        initiate_put_announcement(my_op);
3434    }
3435    else
3436    {
3437        /* we have to wait for a token to become available */
3438        op_list_add(op_list_array[IND_NEED_SEND_TOK_HI_PUT], my_op);
3439    }
3440    return;
3441}
3442
3443/* ctrl_req_handler_rend()
3444 *
3445 * handles incoming control message requests for rendezvous messages
3446 *
3447 * returns 0 on success, -errno on failure
3448 */
3449static int ctrl_req_handler_rend(bmi_op_id_t ctrl_op_id,
3450                                 bmi_size_t ctrl_actual_size,
3451                                 bmi_msg_tag_t ctrl_tag,
3452                                 unsigned int node_id,
3453                                 unsigned int port_id)
3454{
3455    bmi_method_addr_p map = NULL;
3456    struct gm_addr *gm_addr_data = NULL;
3457    struct gm_op *gm_op_data = NULL;
3458    method_op_p active_method_op = NULL;
3459    int ret = -1;
3460    struct op_list_search_key key;
3461
3462    gossip_ldebug(GOSSIP_BMI_DEBUG_GM, "ctrl_req_handler_rend() called.\n");
3463
3464    /* someone wants to send a buffer in rendezvous mode
3465     * - look to see if the matching receive has been posted
3466     *      1)      - if so, provide receive buffer from posted op
3467     *    - send an ack with buffer pointer
3468     *              - queue up
3469     * 2) - if not, create a method op
3470     *      - fill in info
3471     *      - queue up as need receive post
3472     */
3473
3474    map = gm_addr_search(&gm_addr_list, node_id, port_id);
3475    if (!map)
3476    {
3477        /* where did this come from?! */
3478        /* TODO: handle this error condition */
3479        gossip_lerr("Error: ctrl message from unknown host!!!\n");
3480        return (bmi_gm_errno_to_pvfs(-EPROTO));
3481    }
3482    gm_addr_data = map->method_data;
3483
3484    /* see if this operation has already begun... */
3485    memset(&key, 0, sizeof(struct op_list_search_key));
3486    key.method_addr = map;
3487    key.method_addr_yes = 1;
3488    key.msg_tag = ctrl_tag;
3489    key.msg_tag_yes = 1;
3490
3491    active_method_op = op_list_search(op_list_array[IND_NEED_CTRL_MATCH], &key);
3492    if (active_method_op)
3493    {
3494        op_list_remove(active_method_op);
3495
3496        if(ctrl_actual_size > active_method_op->expected_size)
3497        {
3498            gossip_lerr("Error: message ordering violation;\n");
3499            gossip_lerr("Error: message too large for next buffer.\n");
3500            /* TODO: handle this better */
3501            return(bmi_gm_errno_to_pvfs(-EPROTO));
3502        }
3503
3504        /* store remote side's op_id */
3505        gm_op_data = active_method_op->method_data;
3506        gm_op_data->peer_op_id = ctrl_op_id;
3507
3508        /* store actual size of transfer */
3509        active_method_op->actual_size = ctrl_actual_size;
3510
3511        /* now we need one send token to send ctrl ack */
3512        if (gm_alloc_send_token(local_port, GM_HIGH_PRIORITY))
3513        {
3514            /* got it! */
3515#ifdef ENABLE_GM_BUFPOOL
3516            if (io_buffers_exhausted())
3517            {
3518                gm_free_send_token(local_port, GM_HIGH_PRIORITY);
3519                op_list_add(op_list_array[IND_NEED_SEND_TOK_HI_CTRLACK],
3520                            active_method_op);
3521                ret = 0;
3522            }
3523            else
3524            {
3525#endif /* ENABLE_GM_BUFPOOL */
3526                if(gm_op_data->list_flag)
3527                    prepare_for_recv_list(active_method_op);
3528                else
3529                    prepare_for_recv(active_method_op);
3530                ret = 0;
3531#ifdef ENABLE_GM_BUFPOOL
3532            }
3533#endif /* ENABLE_GM_BUFPOOL */
3534        }
3535        else
3536        {
3537            /* we don't have enough tokens */
3538            op_list_add(op_list_array[IND_NEED_SEND_TOK_HI_CTRLACK],
3539                        active_method_op);
3540            ret = 0;
3541        }
3542    }
3543    else
3544    {
3545        /* post has _not_ occurred, queue up and wait for post */
3546        /* we need an op structure to keep up with this */
3547        active_method_op = alloc_gm_method_op();
3548        if (!active_method_op)
3549        {
3550            return (bmi_gm_errno_to_pvfs(-ENOMEM));
3551        }
3552        active_method_op->send_recv = BMI_RECV;
3553        active_method_op->addr = map;
3554        active_method_op->actual_size = ctrl_actual_size;
3555        /* TODO: is this the right thing to do here? */
3556        active_method_op->expected_size = 0;
3557        active_method_op->msg_tag = ctrl_tag;
3558        active_method_op->mode = GM_MODE_REND;
3559        active_method_op->context_id = -1;
3560        gm_op_data = active_method_op->method_data;
3561        gm_op_data->peer_op_id = ctrl_op_id;
3562
3563        op_list_add(op_list_array[IND_NEED_RECV_POST], active_method_op);
3564        ret = 0;
3565    }
3566    return (ret);
3567
3568}
3569
3570/* BMI_gm_cancel()
3571 *
3572 * attempt to cancel a pending bmi gm operation
3573 *
3574 * returns 0 on success, -errno on failure
3575 */
3576int BMI_gm_cancel(bmi_op_id_t id, bmi_context_id context_id)
3577{
3578    method_op_p query_op = (method_op_p)id_gen_fast_lookup(id);
3579    method_op_p tmp_op;
3580    struct gm_op *gm_op_data = query_op->method_data;
3581    struct op_list_search_key key;
3582
3583    assert(query_op);
3584
3585#ifndef ENABLE_GM_BUFPOOL
3586    /* we don't implement cancel for any other buffering mechanisms */
3587    assert(0);
3588#endif
3589
3590    gen_mutex_lock(&interface_mutex);
3591
3592    gossip_debug(GOSSIP_BMI_DEBUG_GM,
3593        "BMI_gm_cancel: send_recv: %d, complete: %d, mode: %d\n",
3594        query_op->send_recv, gm_op_data->complete, query_op->mode);
3595
3596    /* easy case: is the operation already completed? */
3597    if(gm_op_data->complete)
3598    {
3599        gossip_debug(GOSSIP_BMI_DEBUG_GM, "BMI_gm_cancel: complete case.\n");
3600        /* do nothing */
3601        gen_mutex_unlock(&interface_mutex);
3602        return(0);
3603    }
3604
3605    /* send case */
3606    if(query_op->send_recv == BMI_SEND)
3607    {
3608        /* find out if we are blocking on resource usage and haven't sent
3609         * anything yet
3610         */
3611        memset(&key, 0, sizeof(struct op_list_search_key));
3612        key.op_id = query_op->op_id;
3613        key.op_id_yes = 1;
3614        tmp_op = op_list_search(op_list_array[IND_NEED_SEND_TOK_HI_CTRL], &key);
3615        if(tmp_op)
3616        {
3617            gossip_debug(GOSSIP_BMI_DEBUG_GM,
3618                "BMI_gm_cancel: nothing sent yet.\n");
3619            /* nothing sent yet; clean up and move to comp. queue */
3620            assert(tmp_op == query_op);
3621            op_list_remove(query_op);
3622            if (gm_op_data->buffer_status == GM_BUF_METH_ALLOC)
3623            {
3624                /* this was an internally allocated buffer */
3625                gm_dma_free(local_port, query_op->buffer);
3626            }
3627            query_op->error_code = -PVFS_ECANCEL;
3628            op_list_add(completion_array[query_op->context_id], query_op);
3629            gm_op_data->complete = 1;
3630            gen_mutex_unlock(&interface_mutex);
3631            return(0);
3632        }
3633
3634        /* see if the send is literally in flight (waiting on a gm callback) */
3635        /* may be immediate mode, unexp mode, or rend mode during req or
3636         * done message
3637         */
3638        tmp_op = op_list_search(op_list_array[IND_SENDING], &key);
3639        if(tmp_op)
3640        {
3641            assert(tmp_op == query_op);
3642            if(query_op->mode == GM_MODE_IMMED ||
3643                query_op->mode == GM_MODE_UNEXP)
3644            {
3645                gossip_debug(GOSSIP_BMI_DEBUG_GM,
3646                    "BMI_gm_cancel: sending immed msg.\n");
3647                /* op is practically done; just wait for GM callback to
3648                 * trigger and transition it to completion queue */
3649                gen_mutex_unlock(&interface_mutex);
3650                return(0);
3651            }
3652            else if(gm_op_data->freeable_ctrl_buffer &&
3653                (gm_op_data->freeable_ctrl_buffer->ctrl_type == CTRL_PUT_TYPE))
3654            {
3655                /* op is practically done; just wait for GM callback to
3656                 * trigger and transition it to completion queue */
3657                gossip_debug(GOSSIP_BMI_DEBUG_GM,
3658                    "BMI_gm_cancel: sending put msg.\n");
3659                gen_mutex_unlock(&interface_mutex);
3660                return(0);
3661            }
3662            else if(gm_op_data->freeable_ctrl_buffer &&
3663                (gm_op_data->freeable_ctrl_buffer->ctrl_type == CTRL_REQ_TYPE))
3664            {
3665                /* waiting on ctrl req callback */
3666                /* mark op as being in a cancelled state and handle it in
3667                 * req callback and ack handler
3668                 */
3669                gossip_debug(GOSSIP_BMI_DEBUG_GM,
3670                    "BMI_gm_cancel: sending req msg.\n");
3671                gm_op_data->cancelled = 1;
3672                gen_mutex_unlock(&interface_mutex);
3673                return(0);
3674            }
3675            else
3676            {
3677                gossip_debug(GOSSIP_BMI_DEBUG_GM,
3678                    "BMI_gm_cancel: sending payload.\n");
3679                /* waiting on data send callback */
3680                /* nothing sane to do here except let it try to finish.  If
3681                 * the data send callback triggers with successful status,
3682                 * then we should try to announce the put so the other end
3683                 * can release the buffer.  If it errors, then we are done.
3684                 */
3685                gen_mutex_unlock(&interface_mutex);
3686                return(0);
3687            }
3688        }
3689       
3690        /* see if we are waiting on a token for a put msg */
3691        tmp_op = op_list_search(op_list_array[IND_NEED_SEND_TOK_HI_PUT], &key);
3692        if(tmp_op)
3693        {
3694            gossip_debug(GOSSIP_BMI_DEBUG_GM,
3695                "BMI_gm_cancel: waiting on put token.\n");
3696            assert(tmp_op == query_op);
3697            /* nothing to do here; operation is practically finished and we
3698             * should make a best effort to tell the receiver it can
3699             * release the buffer
3700             */
3701            gen_mutex_unlock(&interface_mutex);
3702            return(0);
3703        }
3704       
3705        /* see if we are waiting on a token for the data payload */
3706        tmp_op = op_list_search(op_list_array[IND_NEED_SEND_TOK_LOW], &key);
3707        if(tmp_op)
3708        {
3709            gossip_debug(GOSSIP_BMI_DEBUG_GM,
3710                "BMI_gm_cancel: waiting on data payload token.\n");
3711            assert(tmp_op == query_op);
3712
3713            /* no payload sent yet, clean up and get out */
3714            op_list_remove(query_op);
3715            query_op->error_code = -PVFS_ECANCEL;
3716            op_list_add(completion_array[query_op->context_id], query_op);
3717            gm_op_data->complete = 1;
3718            gen_mutex_unlock(&interface_mutex);
3719            return(0);
3720        }
3721    }
3722
3723    /* recv case */
3724    if(query_op->send_recv == BMI_RECV)
3725    {
3726        memset(&key, 0, sizeof(struct op_list_search_key));
3727        key.op_id = query_op->op_id;
3728        key.op_id_yes = 1;
3729
3730        /* easy case for recv: have we even been contacted yet? */
3731        tmp_op = op_list_search(op_list_array[IND_NEED_CTRL_MATCH], &key);
3732        if(tmp_op)
3733        {
3734            gossip_debug(GOSSIP_BMI_DEBUG_GM,
3735                "BMI_gm_cancel: nothing received yet.\n");
3736            /* nothing to do, no resources consumed */
3737            assert(tmp_op == query_op);
3738            op_list_remove(query_op);
3739            query_op->error_code = -PVFS_ECANCEL;
3740            op_list_add(completion_array[query_op->context_id], query_op);
3741            gm_op_data->complete = 1;
3742            gen_mutex_unlock(&interface_mutex);
3743            return(0);
3744        }
3745
3746        /* are we waiting on resources to send a ctrl ack? */
3747        tmp_op = op_list_search(op_list_array[IND_NEED_SEND_TOK_HI_CTRLACK], &key);
3748        if(tmp_op)
3749        {
3750            gossip_debug(GOSSIP_BMI_DEBUG_GM,
3751                "BMI_gm_cancel: nothing received yet.\n");
3752            assert(tmp_op == query_op);
3753            /* luckily no resources are consumed at this stage, just clean
3754             * up and get out
3755             */
3756            op_list_remove(query_op);
3757            query_op->error_code = -PVFS_ECANCEL;
3758            op_list_add(completion_array[query_op->context_id], query_op);
3759            gm_op_data->complete = 1;
3760            gen_mutex_unlock(&interface_mutex);
3761            return(0);
3762        }
3763
3764        /* are we actually in the process of receiving data? */
3765        tmp_op = op_list_search(op_list_array[IND_RECVING], &key);
3766        if(tmp_op)
3767        {
3768            struct timeval tv;
3769            gossip_debug(GOSSIP_BMI_DEBUG_GM,
3770                "BMI_gm_cancel: receiving data.\n");
3771            assert(tmp_op == query_op);
3772
3773            /* resources have been consumed (big recv buffer),
3774             * and we can't risk reusing it.  The sender may wake up and
3775             * perform a remote put operation
3776             */
3777            /* mark operation as cancelled, set timestamp, and move to
3778             * special queue; we may reclaim it much later
3779             */
3780            op_list_remove(query_op);
3781            gm_op_data->cancelled = 1;
3782            gettimeofday(&tv, NULL);
3783            gm_op_data->cancelled_tv_sec = tv.tv_sec;
3784            op_list_add(op_list_array[IND_CANCELLED_REND], query_op);
3785            cancelled_rend_count++;
3786
3787            /* NOTE: test functions will find this special case operation
3788             * and report it as completed with PVFS_ECANCEL error code so
3789             * that the caller doesn't get hung.  The caller can continue
3790             * without any risk at this point. */
3791           
3792            gen_mutex_unlock(&interface_mutex);
3793            return(0);
3794        }
3795    }
3796
3797    /* if we fall through to here then something has gone terribly wrong,
3798     * we lost track of the operation or something
3799     */
3800    assert(0);
3801
3802    gen_mutex_unlock(&interface_mutex);
3803
3804    return(0);
3805}
3806
3807static void reclaim_cancelled_io_buffers(void)
3808{
3809    struct timeval tv;
3810    method_op_p query_op = NULL;
3811    struct gm_op *gm_op_data = NULL;
3812
3813    gettimeofday(&tv, NULL);
3814
3815    if(cancelled_rend_count)
3816    {
3817        while((query_op = op_list_shownext(op_list_array[IND_CANCELLED_REND])))
3818        {
3819            gm_op_data = query_op->method_data;
3820            if((tv.tv_sec - gm_op_data->cancelled_tv_sec) <
3821                PINT_CANCELLED_REND_RECLAIM_TIMEOUT)
3822            {
3823                /* these are too recent; move on */
3824                break;
3825            }
3826
3827            /* we want to put this one back into service */
3828            /* let put recv handler do the work as if we had gotten msg
3829             * from sender
3830             */
3831            put_recv_handler(query_op->op_id);
3832            cancelled_rend_count--;
3833        }
3834
3835        /* if after doing this we still have too many exhausted buffers, then
3836         * just give up - we did the best we could...
3837         */
3838        if((io_pool->num_buffers - cancelled_rend_count) < 4)
3839        {
3840            gossip_lerr(
3841                "Error: BMI_GM: exhausted memory buffers due to cancellation.\n");
3842            assert(0);
3843        }
3844    }
3845
3846    return;
3847}
3848
3849static int io_buffers_exhausted(void)
3850{
3851    if(!bmi_gm_bufferpool_empty(io_pool))
3852    {
3853        return(0);
3854    }
3855
3856    /* try to get some cancelled buffers back */
3857    reclaim_cancelled_io_buffers();
3858
3859    if(!bmi_gm_bufferpool_empty(io_pool))
3860    {
3861        return(0);
3862    }
3863    else
3864    {
3865        return(1);
3866    }
3867}
3868
3869/*
3870 * Local variables:
3871 *  c-indent-level: 4
3872 *  c-basic-offset: 4
3873 * End:
3874 *
3875 * vim: ts=8 sts=4 sw=4 expandtab
3876 */
Note: See TracBrowser for help on using the browser.