Show
Ignore:
Timestamp:
06/18/10 20:02:50 (3 years ago)
Author:
nlmills
Message:

initial merge with Orange-Branch. much will be broken

Location:
branches/cu-security-branch
Files:
23 modified

Legend:

Unmodified
Added
Removed
  • branches/cu-security-branch

    • Property svn:ignore
      •  

        old new  
        33aclocal.m4 
        44autom4te.cache 
        5 config.status 
        6 Makefile 
        7 pvfs2-config.h 
        8 module.mk 
  • branches/cu-security-branch/src/io/bmi

    • Property svn:ignore deleted
  • branches/cu-security-branch/src/io/bmi/bmi-method-callback.h

    r8330 r8397  
    1212BMI_addr_t bmi_method_addr_reg_callback(bmi_method_addr_p map); 
    1313int bmi_method_addr_forget_callback(BMI_addr_t addr); 
     14void bmi_method_addr_drop_callback(char *method_name); 
    1415 
    1516#endif /* __BMI_METHOD_CALLBACK_H */ 
  • branches/cu-security-branch/src/io/bmi/bmi-method-support.h

    r7941 r8397  
    5959}; 
    6060 
     61/* flags that can be set per method to affect behavior */ 
     62#define BMI_METHOD_FLAG_NO_POLLING 1 
     63 
    6164/* This is the table of interface functions that must be provided by BMI 
    6265 * methods. 
     
    6568{ 
    6669    const char *method_name; 
     70    int flags; 
    6771    int (*initialize) (bmi_method_addr_p, int, int); 
    6872    int (*finalize) (void); 
  • branches/cu-security-branch/src/io/bmi/bmi.c

    r8330 r8397  
    5050    struct qlist_head link; 
    5151    BMI_addr_t addr; 
     52}; 
     53 
     54/* 
     55 * BMI trigger to reap all method resources for inactive addresses. 
     56 */ 
     57static QLIST_HEAD(bmi_addr_force_drop_list); 
     58static gen_mutex_t bmi_addr_force_drop_list_mutex = GEN_MUTEX_INITIALIZER; 
     59struct drop_item 
     60{ 
     61    struct qlist_head link; 
     62    char  *method_name; 
    5263}; 
    5364 
     
    7081#ifdef __STATIC_METHOD_BMI_PORTALS__ 
    7182extern struct bmi_method_ops bmi_portals_ops; 
     83#endif 
     84#ifdef __STATIC_METHOD_BMI_ZOID__ 
     85extern struct bmi_method_ops bmi_zoid_ops; 
    7286#endif 
    7387 
     
    88102    &bmi_portals_ops, 
    89103#endif 
     104#ifdef __STATIC_METHOD_BMI_ZOID__ 
     105    &bmi_zoid_ops, 
     106#endif 
    90107    NULL 
    91108}; 
     
    110127 
    111128static struct bmi_method_ops **active_method_table = NULL; 
    112 static struct { 
     129 
     130struct method_usage_t { 
    113131    int iters_polled;  /* how many iterations since this method was polled */ 
    114132    int iters_active;  /* how many iterations since this method had action */ 
    115133    int plan; 
    116 } *method_usage = NULL; 
     134    int flags; 
     135}; 
     136 
     137static struct method_usage_t * expected_method_usage = NULL; 
     138static struct method_usage_t * unexpected_method_usage = NULL; 
     139 
    117140static const int usage_iters_starvation = 100000; 
    118141static const int usage_iters_active = 10000; 
     
    122145    int flags); 
    123146static void bmi_addr_drop(ref_st_p tmp_ref); 
     147static void bmi_addr_force_drop(ref_st_p ref, ref_list_p ref_list); 
    124148static void bmi_check_forget_list(void); 
     149static void bmi_check_addr_force_drop (void); 
    125150 
    126151/** Initializes the BMI layer.  Must be called before any other BMI 
     
    518543    known_method_count = 0; 
    519544 
    520     if (method_usage) 
    521         free(method_usage); 
     545    if (expected_method_usage) 
     546        free(expected_method_usage); 
     547 
     548    if (unexpected_method_usage) 
     549       free(unexpected_method_usage); 
    522550 
    523551    /* destroy the reference list */ 
     
    901929 */ 
    902930static void 
    903 construct_poll_plan(int nmeth, int *idle_time_ms) 
     931construct_poll_plan(struct method_usage_t * method_usage, 
     932      int nmeth, int *idle_time_ms) 
    904933{ 
    905934    int i, numplan; 
     
    910939        ++method_usage[i].iters_active; 
    911940        method_usage[i].plan = 0; 
    912         if (method_usage[i].iters_active <= usage_iters_active) { 
     941        if ((method_usage[i].iters_active <= usage_iters_active) && 
     942            (!(method_usage[i].flags & BMI_METHOD_FLAG_NO_POLLING))){ 
    913943            /* recently busy, poll */ 
    914944            if (0) gossip_debug(GOSSIP_BMI_DEBUG_CONTROL, 
     
    967997    /* figure out if we need to drop any stale addresses */ 
    968998    bmi_check_forget_list(); 
     999    bmi_check_addr_force_drop(); 
    9691000 
    9701001    gen_mutex_lock(&active_method_count_mutex); 
     
    9771008    *outcount = 0; 
    9781009 
    979     construct_poll_plan(tmp_active_method_count, &max_idle_time_ms); 
     1010    construct_poll_plan(unexpected_method_usage, 
     1011          tmp_active_method_count, &max_idle_time_ms); 
    9801012 
    9811013    while (position < incount && i < tmp_active_method_count) 
    9821014    { 
    983         if (method_usage[i].plan) { 
     1015        if (unexpected_method_usage[i].plan) { 
    9841016            ret = active_method_table[i]->testunexpected( 
    9851017                (incount - position), &tmp_outcount, 
     
    9931025            position += tmp_outcount; 
    9941026            (*outcount) += tmp_outcount; 
    995             method_usage[i].iters_polled = 0; 
     1027            unexpected_method_usage[i].iters_polled = 0; 
    9961028            if (ret) 
    997                 method_usage[i].iters_active = 0; 
     1029                unexpected_method_usage[i].iters_active = 0; 
    9981030        } 
    9991031        i++; 
     
    10741106    } 
    10751107 
    1076     construct_poll_plan(tmp_active_method_count, &max_idle_time_ms); 
     1108    construct_poll_plan(expected_method_usage, 
     1109          tmp_active_method_count, &max_idle_time_ms); 
    10771110 
    10781111    while (position < incount && i < tmp_active_method_count) 
    10791112    { 
    1080         if (method_usage[i].plan) { 
     1113        if (expected_method_usage[i].plan) { 
    10811114            ret = active_method_table[i]->testcontext( 
    10821115                incount - position,  
     
    10961129            position += tmp_outcount; 
    10971130            (*outcount) += tmp_outcount; 
    1098             method_usage[i].iters_polled = 0; 
     1131            expected_method_usage[i].iters_polled = 0; 
    10991132            if (ret) 
    1100                 method_usage[i].iters_active = 0; 
     1133                expected_method_usage[i].iters_active = 0; 
    11011134        } 
    11021135        i++; 
     
    12021235    new_buffer = tmp_ref->interface->memalloc(size, send_recv); 
    12031236 
     1237    /* initialize buffer, if not NULL. */ 
     1238    if (new_buffer) 
     1239    { 
     1240       memset(new_buffer,0,size); 
     1241    } 
    12041242    return (new_buffer); 
    12051243} 
     
    19812019 
    19822020/* 
     2021 * Signal BMI to drop inactive connections for this method. 
     2022 */ 
     2023void bmi_method_addr_drop_callback (char* method_name) 
     2024{ 
     2025    struct drop_item *item = 
     2026        (struct drop_item *) malloc(sizeof(struct drop_item)); 
     2027 
     2028    /* 
     2029     * If we can't allocate, just return. 
     2030     * Maybe this will succeed next time. 
     2031     */ 
     2032    if (!item) return; 
     2033 
     2034    item->method_name = method_name; 
     2035     
     2036    gen_mutex_lock(&bmi_addr_force_drop_list_mutex); 
     2037    qlist_add(&item->link, &bmi_addr_force_drop_list); 
     2038    gen_mutex_unlock(&bmi_addr_force_drop_list_mutex); 
     2039 
     2040    return; 
     2041} 
     2042 
     2043 
     2044/** 
     2045 * Try to increase method_usage_t struct to include room for a new method. 
     2046 */ 
     2047static int grow_method_usage (struct method_usage_t ** p, int newflags) 
     2048{ 
     2049    struct method_usage_t * x = *p; 
     2050    *p = malloc((active_method_count + 1) * sizeof(**p)); 
     2051    if (!*p) { 
     2052        *p = x; 
     2053        return 0; 
     2054    } 
     2055    if (active_method_count) { 
     2056        memcpy(*p, x, active_method_count * sizeof(**p)); 
     2057        free(x); 
     2058    } 
     2059    memset(&((*p)[active_method_count]), 0, sizeof(**p)); 
     2060    (*p)[active_method_count].flags = newflags; 
     2061 
     2062    return 1; 
     2063 } 
     2064 
     2065/* 
    19832066 * Attempt to insert this name into the list of active methods, 
    19842067 * and bring it up. 
     
    20302113    active_method_table[active_method_count] = meth; 
    20312114 
    2032     x = method_usage; 
    2033     method_usage = malloc((active_method_count + 1) * sizeof(*method_usage)); 
    2034     if (!method_usage) { 
    2035         method_usage = x; 
    2036         return -ENOMEM; 
    2037     } 
    2038     if (active_method_count) { 
    2039         memcpy(method_usage, x, active_method_count * sizeof(*method_usage)); 
    2040         free(x); 
    2041     } 
    2042     memset(&method_usage[active_method_count], 0, sizeof(*method_usage)); 
     2115    if (!grow_method_usage (&unexpected_method_usage, meth->flags)) 
     2116       return -ENOMEM; 
     2117 
     2118    /** 
     2119     * If we run out of memory here, the unexpected_method_usage will be 
     2120     * larger than strictly required but there is no memory leak. 
     2121     */ 
     2122 
     2123    if (!grow_method_usage (&expected_method_usage, meth->flags)) 
     2124       return -ENOMEM; 
    20432125 
    20442126    ++active_method_count; 
     
    22202302} 
    22212303 
     2304 
     2305/* bmi_addr_force_drop 
     2306 * 
     2307 * Destroys a complete BMI address, including forcing the method to clean up  
     2308 * its portion. 
     2309 * 
     2310 * NOTE: must be called with ref list mutex held  
     2311 */ 
     2312static void bmi_addr_force_drop(ref_st_p ref, ref_list_p ref_list) 
     2313{ 
     2314    gossip_debug(GOSSIP_BMI_DEBUG_CONTROL, 
     2315                 "[BMI CONTROL]: %s: bmi discarding address: %llu\n", 
     2316                 __func__, llu(ref->bmi_addr)); 
     2317 
     2318    ref_list_rem(ref_list, ref->bmi_addr); 
     2319    dealloc_ref_st(ref); 
     2320 
     2321    return; 
     2322} 
     2323 
     2324/* 
     2325 * bmi_check_addr_force_drop 
     2326 * 
     2327 * Checks to see if any method has requested freeing resources. 
     2328 */ 
     2329static void bmi_check_addr_force_drop (void) 
     2330{ 
     2331    struct drop_item *drop_item = NULL; 
     2332    ref_st_p          ref_item = NULL; 
     2333 
     2334    gen_mutex_lock(&bmi_addr_force_drop_list_mutex); 
     2335    while (!qlist_empty(&bmi_addr_force_drop_list)) 
     2336    { 
     2337        drop_item = qlist_entry(qlist_pop(&bmi_addr_force_drop_list), 
     2338                                struct drop_item, 
     2339                                link); 
     2340        gen_mutex_unlock(&bmi_addr_force_drop_list_mutex); 
     2341        gen_mutex_lock(&ref_mutex); 
     2342        qlist_for_each_entry(ref_item, cur_ref_list, list_link) 
     2343        { 
     2344             if ((ref_item->ref_count == 0) && 
     2345                 (ref_item->interface->method_name == drop_item->method_name)) 
     2346             { 
     2347                 bmi_addr_force_drop(ref_item, cur_ref_list); 
     2348             } 
     2349        } 
     2350        gen_mutex_unlock(&ref_mutex); 
     2351        gen_mutex_lock(&bmi_addr_force_drop_list_mutex); 
     2352    } 
     2353    gen_mutex_unlock(&bmi_addr_force_drop_list_mutex); 
     2354 
     2355    return; 
     2356} 
     2357 
    22222358/* 
    22232359 * Local variables: 
  • branches/cu-security-branch/src/io/bmi/bmi_gm

    • Property svn:ignore deleted
  • branches/cu-security-branch/src/io/bmi/bmi_gm/bmi-gm.c

    r7941 r8397  
    158158const struct bmi_method_ops bmi_gm_ops = { 
    159159    .method_name = BMI_gm_method_name, 
     160    .flags = 0, 
    160161    .initialize = BMI_gm_initialize, 
    161162    .finalize = BMI_gm_finalize, 
  • branches/cu-security-branch/src/io/bmi/bmi_ib

    • Property svn:ignore deleted
  • branches/cu-security-branch/src/io/bmi/bmi_ib/ib.c

    r7941 r8397  
    2323#include <src/common/gen-locks/gen-locks.h>  /* gen_mutex_t ... */ 
    2424#include <src/common/misc/pvfs2-internal.h> 
     25#include "pint-hint.h" 
    2526 
    2627#ifdef HAVE_VALGRIND_H 
     
    895896                 const void *buffer, bmi_size_t total_size, 
    896897                 enum bmi_buffer_type buffer_flag __unused, 
    897                  bmi_msg_tag_t tag, void *user_ptr, bmi_context_id context_id) 
     898                 bmi_msg_tag_t tag, void *user_ptr, bmi_context_id 
     899                 context_id, PVFS_hint hints __unused) 
    898900{ 
    899901    return post_send(id, remote_map, 0, &buffer, &total_size, 
     
    905907  const void *const *buffers, const bmi_size_t *sizes, int list_count, 
    906908  bmi_size_t total_size, enum bmi_buffer_type buffer_flag __unused, 
    907   bmi_msg_tag_t tag, void *user_ptr, bmi_context_id context_id) 
     909  bmi_msg_tag_t tag, void *user_ptr, bmi_context_id context_id, PVFS_hint 
     910  hints __unused) 
    908911{ 
    909912    return post_send(id, remote_map, list_count, buffers, sizes, 
     
    916919                           enum bmi_buffer_type buffer_flag __unused, 
    917920                           bmi_msg_tag_t tag, void *user_ptr, 
    918                            bmi_context_id context_id) 
     921                           bmi_context_id context_id, PVFS_hint hints 
     922                           __unused) 
    919923{ 
    920924    return post_send(id, remote_map, 0, &buffer, &total_size, 
     
    929933                                enum bmi_buffer_type buffer_flag __unused, 
    930934                                bmi_msg_tag_t tag, void *user_ptr, 
    931                                 bmi_context_id context_id) 
     935                                bmi_context_id context_id, PVFS_hint hints 
     936                                __unused) 
    932937{ 
    933938    return post_send(id, remote_map, list_count, buffers, sizes, 
     
    10661071  void *buffer, bmi_size_t expected_len, bmi_size_t *actual_len __unused, 
    10671072  enum bmi_buffer_type buffer_flag __unused, bmi_msg_tag_t tag, void *user_ptr, 
    1068   bmi_context_id context_id) 
     1073  bmi_context_id context_id, PVFS_hint hints __unused) 
    10691074{ 
    10701075    return post_recv(id, remote_map, 0, &buffer, &expected_len, 
     
    10771082  bmi_size_t tot_expected_len, bmi_size_t *tot_actual_len __unused, 
    10781083  enum bmi_buffer_type buffer_flag __unused, bmi_msg_tag_t tag, void *user_ptr, 
    1079   bmi_context_id context_id) 
     1084  bmi_context_id context_id, PVFS_hint hints __unused) 
    10801085{ 
    10811086    return post_recv(id, remote_map, list_count, buffers, sizes, 
     
    21092114{ 
    21102115    .method_name = "bmi_ib", 
     2116    .flags = 0, 
    21112117    .initialize = BMI_ib_initialize, 
    21122118    .finalize = BMI_ib_finalize, 
  • branches/cu-security-branch/src/io/bmi/bmi_mx

    • Property svn:ignore deleted
  • branches/cu-security-branch/src/io/bmi/bmi_mx/README

    r7024 r8397  
    6868    BMX_DEBUG                   Turn on gossip messages 
    6969    BMX_MEM_ACCT                Track memory usage 
    70     BMX_LOGGING                 Turn on MPE logging 
    7170    BMX_SERVER_RXS              Additional rxs for servers 
    7271    BMX_TIMEOUT                 Timeout for all MX messages 
     
    117116BMI_mx_method_addr_lookup(). 
    118117 
    119 BMX_LOGGING 
    120 This is not generally recommended. It turns on support for MPE logging but 
    121 it requires modifying the Makefile.in script and re-generating configure. 
    122 Contact help <at> myri.com for assistance. 
    123  
    124118BMX_SERVER_RXS 
    125119The server will receive messages from unknown peers. This value determines how 
     
    156150mx://hostname:ep_id 
    157151 
    158 Use the first option if mx_info lists hostname:board and use the second option if mx_info simply shows a hostname. 
     152Use the first option if mx_info lists hostname:board and use the second option 
     153if mx_info simply shows a hostname. 
    159154 
    160155====================== 
  • branches/cu-security-branch/src/io/bmi/bmi_mx/mx.c

    r7941 r8397  
    1717struct bmx_data *bmi_mx = NULL; /* global state for bmi_mx */ 
    1818 
     19mx_status_t     BMX_NO_STATUS; 
     20 
    1921#if BMX_MEM_ACCT 
    2022uint64_t        mem_used = 0;   /* bytes used */ 
     
    2224#endif 
    2325 
    24 #if BMX_LOGGING 
    25 int     send_start; 
    26 int     send_finish; 
    27 int     recv_start; 
    28 int     recv_finish; 
    29 int     sendunex_start; 
    30 int     sendunex_finish; 
    31 int     recvunex_start; 
    32 int     recvunex_finish; 
    33 #endif 
    34  
    3526/* statics for event logging */ 
    3627static PINT_event_type bmi_mx_send_event_id; 
     
    4839static void 
    4940bmx_create_peername(void); 
     41 
     42/**** Completion function token handling ****************************/ 
     43/* We should not hold any locks when calling mx_test[_any](), 
     44 * mx_wait_any() or mx_cancel(). We want to avoid races between them, 
     45 * however. So, before calling any completion function, the caller 
     46 * must hold this token.  These functions implement a token system (i.e. 
     47 * semaphore) that will wake up mx_wait_any() to reduce blocking times 
     48 * for the calling function. 
     49 */ 
     50 
     51static void 
     52bmx_get_completion_token(void) 
     53{ 
     54        int     done    = 0; 
     55 
     56        do { 
     57                gen_mutex_lock(&bmi_mx->bmx_completion_lock); 
     58                if (bmi_mx->bmx_refcount == 1) { 
     59                        bmi_mx->bmx_refcount--; 
     60                        done = 1; 
     61                        gen_mutex_unlock(&bmi_mx->bmx_completion_lock); 
     62                } else { 
     63                        assert(bmi_mx->bmx_refcount == 0); 
     64                        /* someone has the lock, wake the MX endpoint in 
     65                         * case they are blocking in mx_wait_any() */ 
     66                        gen_mutex_unlock(&bmi_mx->bmx_completion_lock); 
     67                        mx_wakeup(bmi_mx->bmx_ep); 
     68                } 
     69        } while (!done); 
     70 
     71        return; 
     72} 
     73 
     74static void 
     75bmx_release_completion_token(void) 
     76{ 
     77        gen_mutex_lock(&bmi_mx->bmx_completion_lock); 
     78        bmi_mx->bmx_refcount++; 
     79        assert(bmi_mx->bmx_refcount == 1); 
     80        gen_mutex_unlock(&bmi_mx->bmx_completion_lock); 
     81        return; 
     82} 
    5083 
    5184/**** TX/RX handling functions **************************************/ 
     
    224257} 
    225258 
    226 /* remove from peer's queued txs/rxs list */ 
    227 static void 
    228 bmx_deq_ctx(struct bmx_ctx *ctx) 
    229 { 
    230         struct bmx_peer *peer = ctx->mxc_peer; 
    231  
    232         BMX_ENTER; 
    233         if (!qlist_empty(&ctx->mxc_list)) { 
    234                 gen_mutex_lock(&peer->mxp_lock); 
    235                 qlist_del_init(&ctx->mxc_list); 
    236                 gen_mutex_unlock(&peer->mxp_lock); 
    237         } 
    238         BMX_EXIT; 
    239         return; 
    240 } 
    241  
    242259/* add to peer's pending rxs list */ 
    243260static void 
     
    280297} 
    281298 
    282 /* queue on unexpected rx or tx list */ 
     299/* dequeue from unexpected rx list */ 
    283300static void 
    284 bmx_q_unex_ctx(struct bmx_ctx *ctx) 
    285 { 
     301bmx_deq_unex_rx(struct bmx_ctx **rxp) 
     302{ 
     303        struct bmx_ctx  *rx     = NULL; 
     304        list_t          *list   = &bmi_mx->bmx_unex_rxs; 
     305 
    286306        BMX_ENTER; 
    287         if (ctx->mxc_type == BMX_REQ_RX) { 
    288                 gen_mutex_lock(&bmi_mx->bmx_unex_rxs_lock); 
    289                 qlist_add_tail(&ctx->mxc_list, &bmi_mx->bmx_unex_rxs); 
    290                 gen_mutex_unlock(&bmi_mx->bmx_unex_rxs_lock); 
    291         } else { 
    292                 gen_mutex_lock(&bmi_mx->bmx_unex_txs_lock); 
    293                 qlist_add_tail(&ctx->mxc_list, &bmi_mx->bmx_unex_txs); 
    294                 gen_mutex_unlock(&bmi_mx->bmx_unex_txs_lock); 
    295         } 
     307        gen_mutex_lock(&bmi_mx->bmx_unex_rxs_lock); 
     308        if (!qlist_empty(list)) { 
     309                rx = qlist_entry(list->next, struct bmx_ctx, mxc_list); 
     310                qlist_del_init(&rx->mxc_list); 
     311        } 
     312        gen_mutex_unlock(&bmi_mx->bmx_unex_rxs_lock); 
     313        *rxp = rx; 
     314 
    296315        BMX_EXIT; 
    297316        return; 
    298317} 
    299318 
    300 /* add to the global canceled list */ 
     319/* add to the completion queue for the appropriate context */ 
    301320static void 
    302 bmx_q_canceled_ctx(struct bmx_ctx *ctx, bmi_error_code_t error) 
    303 { 
     321bmx_q_completed(struct bmx_ctx *ctx, enum bmx_ctx_state state, 
     322                mx_status_t status, bmi_error_code_t error) 
     323{ 
     324        int             id      = 0; 
     325        gen_mutex_t     *lock   = NULL; 
     326        list_t          *list   = NULL; 
     327 
    304328        BMX_ENTER; 
    305         ctx->mxc_state = BMX_CTX_CANCELED; 
    306         if (error < 0) 
    307                 ctx->mxc_mxstat.code = error; 
    308         else 
    309                 ctx->mxc_mxstat.code = -error; 
    310         gen_mutex_lock(&bmi_mx->bmx_canceled_lock); 
    311         qlist_add_tail(&ctx->mxc_list, &bmi_mx->bmx_canceled); 
    312         gen_mutex_unlock(&bmi_mx->bmx_canceled_lock); 
     329 
     330        ctx->mxc_state = state; 
     331        ctx->mxc_mxstat = status; 
     332        ctx->mxc_error = error < 0 ? error : -error; 
     333 
     334        if (ctx->mxc_type == BMX_REQ_RX && 
     335            ctx->mxc_msg_type == BMX_MSG_UNEXPECTED) { 
     336                list = &bmi_mx->bmx_unex_rxs; 
     337                lock = &bmi_mx->bmx_unex_rxs_lock; 
     338        } else { 
     339                id = (int) ctx->mxc_mop->context_id; 
     340                lock = &bmi_mx->bmx_done_q_lock[id]; 
     341                list = &bmi_mx->bmx_done_q[id]; 
     342        } 
     343 
     344 
     345        gen_mutex_lock(lock); 
     346        qlist_add_tail(&ctx->mxc_list, list); 
     347        gen_mutex_unlock(lock); 
     348        BMX_EXIT; 
     349        return; 
     350} 
     351 
     352static void 
     353bmx_deq_completed(struct bmx_ctx **ctxp, bmi_context_id context_id) 
     354{ 
     355        int             id      = (int) context_id; 
     356        list_t          *list   = &bmi_mx->bmx_done_q[id]; 
     357        gen_mutex_t     *lock   = &bmi_mx->bmx_done_q_lock[id]; 
     358        struct bmx_ctx  *ctx    = NULL; 
     359 
     360        BMX_ENTER; 
     361 
     362        gen_mutex_lock(lock); 
     363        if (!qlist_empty(list)) { 
     364                ctx = qlist_entry(list->next, struct bmx_ctx, mxc_list); 
     365                qlist_del_init(&ctx->mxc_list); 
     366        } 
     367        gen_mutex_unlock(lock); 
     368        *ctxp = ctx; 
     369 
    313370        BMX_EXIT; 
    314371        return; 
     
    347404 
    348405static void 
    349 bmx_put_idle_rx(struct bmx_ctx *rx) 
    350 { 
    351         if (rx == NULL) { 
    352                 debug(BMX_DB_WARN, "put_idle_rx() called with NULL"); 
     406bmx_put_idle_ctx(struct bmx_ctx *ctx) 
     407{ 
     408        list_t          *list   = &bmi_mx->bmx_idle_txs; 
     409        gen_mutex_t     *lock   = &bmi_mx->bmx_idle_txs_lock; 
     410 
     411        if (ctx == NULL) { 
     412                debug(BMX_DB_WARN, "put_idle_ctx() called with NULL"); 
    353413                return; 
    354414        } 
    355         if (rx->mxc_type != BMX_REQ_RX) { 
    356                 debug(BMX_DB_WARN, "put_idle_rx() called with a TX"); 
    357                 return; 
    358         } 
    359         if (rx->mxc_get != rx->mxc_put + 1) { 
    360                 debug(BMX_DB_ERR, "put_idle_rx() get (%llu) != put (%llu) + 1", 
    361                          (unsigned long long) rx->mxc_get, 
    362                          (unsigned long long) rx->mxc_put); 
     415        ctx->mxc_put++; 
     416        if (ctx->mxc_get != ctx->mxc_put) { 
     417                debug(BMX_DB_ERR, "put_idle_ctx() get (%llu) != put (%llu)", 
     418                         (unsigned long long) ctx->mxc_get, 
     419                         (unsigned long long) ctx->mxc_put); 
    363420                exit(1); 
    364421        } 
    365         bmx_ctx_init(rx); 
    366         rx->mxc_put++; 
    367         gen_mutex_lock(&bmi_mx->bmx_idle_rxs_lock); 
    368         qlist_add(&rx->mxc_list, &bmi_mx->bmx_idle_rxs); 
    369         gen_mutex_unlock(&bmi_mx->bmx_idle_rxs_lock); 
     422        bmx_ctx_init(ctx); 
     423 
     424        if (ctx->mxc_type == BMX_REQ_RX) { 
     425                list   = &bmi_mx->bmx_idle_rxs; 
     426                lock   = &bmi_mx->bmx_idle_rxs_lock; 
     427        } 
     428 
     429        gen_mutex_lock(lock); 
     430        qlist_add(&ctx->mxc_list, list); 
     431        gen_mutex_unlock(lock); 
    370432        return; 
    371433} 
     
    416478 
    417479        return tx; 
    418 } 
    419  
    420 static void 
    421 bmx_put_idle_tx(struct bmx_ctx *tx) 
    422 { 
    423         if (tx == NULL) { 
    424                 debug(BMX_DB_WARN, "put_idle_tx() called with NULL"); 
    425                 return; 
    426         } 
    427         if (tx->mxc_type != BMX_REQ_TX) { 
    428                 debug(BMX_DB_WARN, "put_idle_tx() called with a TX"); 
    429                 return; 
    430         } 
    431         if (tx->mxc_get != tx->mxc_put + 1) { 
    432                 debug(BMX_DB_ERR, "put_idle_tx() get (%llu) != put (%llu) + 1", 
    433                          (unsigned long long) tx->mxc_get, 
    434                          (unsigned long long) tx->mxc_put); 
    435                 exit(1); 
    436         } 
    437         bmx_ctx_init(tx); 
    438         tx->mxc_put++; 
    439         gen_mutex_lock(&bmi_mx->bmx_idle_txs_lock); 
    440         qlist_add(&tx->mxc_list, &bmi_mx->bmx_idle_txs); 
    441         gen_mutex_unlock(&bmi_mx->bmx_idle_txs_lock); 
    442         return; 
    443480} 
    444481 
     
    759796                        return ret; 
    760797                } 
    761                 bmx_put_idle_rx(rx); 
     798                bmx_put_idle_ctx(rx); 
    762799        } 
    763800 
     
    810847bmx_globals_init(int method_id) 
    811848{ 
     849        int     i       = 0; 
     850 
    812851#if BMX_MEM_ACCT 
    813852        mem_used = 0; 
     
    840879        gen_mutex_init(&bmi_mx->bmx_idle_rxs_lock); 
    841880 
    842         INIT_QLIST_HEAD(&bmi_mx->bmx_canceled); 
    843         gen_mutex_init(&bmi_mx->bmx_canceled_lock); 
    844  
    845         INIT_QLIST_HEAD(&bmi_mx->bmx_unex_txs); 
    846         gen_mutex_init(&bmi_mx->bmx_unex_txs_lock); 
     881        gen_mutex_init(&bmi_mx->bmx_completion_lock); 
     882        /* set to 1 to allow testing to start */ 
     883        bmi_mx->bmx_refcount = 1; 
     884 
     885        for (i = 0; i < BMI_MAX_CONTEXTS; i++) { 
     886                INIT_QLIST_HEAD(&bmi_mx->bmx_done_q[i]); 
     887                gen_mutex_init(&bmi_mx->bmx_done_q_lock[i]); 
     888        } 
     889 
    847890        INIT_QLIST_HEAD(&bmi_mx->bmx_unex_rxs); 
    848891        gen_mutex_init(&bmi_mx->bmx_unex_rxs_lock); 
     
    883926        param.val.context_id.shift = BMX_MSG_SHIFT; 
    884927 
    885         if (board == -1) board = 0; 
    886928        mxret = mx_open_endpoint(board, ep_id, BMX_MAGIC, 
    887929                                 &param, 1, ep); 
     
    922964 
    923965        BMX_ENTER; 
    924  
    925 #if BMX_LOGGING 
    926         MPE_Init_log(); 
    927 #define BMX_LOG_STATE 1 
    928 #if BMX_LOG_STATE 
    929         send_start              = MPE_Log_get_event_number(); 
    930         send_finish             = MPE_Log_get_event_number(); 
    931         recv_start              = MPE_Log_get_event_number(); 
    932         recv_finish             = MPE_Log_get_event_number(); 
    933         sendunex_start          = MPE_Log_get_event_number(); 
    934         sendunex_finish         = MPE_Log_get_event_number(); 
    935         recvunex_start          = MPE_Log_get_event_number(); 
    936         recvunex_finish         = MPE_Log_get_event_number(); 
    937         MPE_Describe_state(send_start, send_finish, "Send", "red"); 
    938         MPE_Describe_state(recv_start, recv_finish, "Recv", "blue"); 
    939         MPE_Describe_state(sendunex_start, sendunex_finish, "SendUnex", "orange"); 
    940         MPE_Describe_state(recvunex_start, recvunex_finish, "RecvUnex", "green"); 
    941 #else 
    942         MPE_Log_get_solo_eventID(&send_start); 
    943         MPE_Log_get_solo_eventID(&send_finish); 
    944         MPE_Log_get_solo_eventID(&recv_start); 
    945         MPE_Log_get_solo_eventID(&recv_finish); 
    946         MPE_Log_get_solo_eventID(&sendunex_start); 
    947         MPE_Log_get_solo_eventID(&sendunex_finish); 
    948         MPE_Log_get_solo_eventID(&recvunex_start); 
    949         MPE_Log_get_solo_eventID(&recvunex_finish); 
    950         MPE_Describe_info_event(send_start, "Send_start", "red1", "tag:%d"); 
    951         MPE_Describe_info_event(send_finish, "Send_finish", "red3", "tag:%d"); 
    952         MPE_Describe_info_event(recv_start, "Recv_start", "blue1", "tag:%d"); 
    953         MPE_Describe_info_event(recv_finish, "Recv_finish", "blue3", "tag:%d"); 
    954         MPE_Describe_info_event(sendunex_start, "SendUnex_start", "orange1", "tag:%d"); 
    955         MPE_Describe_info_event(sendunex_finish, "SendUnex_finish", "orange3", "tag:%d"); 
    956         MPE_Describe_info_event(recvunex_start, "RecvUnex_start", "green1", "tag:%d"); 
    957         MPE_Describe_info_event(recvunex_finish, "RecvUnex_finish", "green3", "tag:%d"); 
    958 #endif /* state or event */ 
    959 #endif /* BMX_LOGGING */ 
    960966 
    961967         /* check params */ 
     
    10241030                        ret = bmx_ctx_alloc(&rx, BMX_REQ_RX); 
    10251031                        if (ret == 0) { 
    1026                                 bmx_put_idle_rx(rx); 
     1032                                bmx_put_idle_ctx(rx); 
    10271033                        } 
    10281034                } 
     
    11371143        debug(BMX_DB_MEM, "memory leaked at shutdown %lld", llu(mem_used)); 
    11381144#endif 
    1139  
    1140 #if BMX_LOGGING 
    1141         MPE_Finish_log("/tmp/bmi_mx.log"); 
    1142 #endif 
    11431145        BMX_EXIT; 
    1144  
    11451146        return 0; 
    11461147} 
     
    11801181                tx = qlist_entry(queued_txs->next, struct bmx_ctx, mxc_list); 
    11811182                qlist_del_init(&tx->mxc_list); 
    1182                 bmx_q_canceled_ctx(tx, err); 
     1183                bmx_q_completed(tx, BMX_CTX_CANCELED, BMX_NO_STATUS, err); 
    11831184        } 
    11841185        /* cancel queued rxs */ 
     
    11871188                rx = qlist_entry(queued_rxs->next, struct bmx_ctx, mxc_list); 
    11881189                qlist_del_init(&rx->mxc_list); 
    1189                 bmx_q_canceled_ctx(rx, err); 
     1190                bmx_q_completed(rx, BMX_CTX_CANCELED, BMX_NO_STATUS, err); 
    11901191        } 
    11911192        /* try to cancel pending rxs */ 
     
    11951196                if (result) { 
    11961197                        qlist_del_init(&rx->mxc_list); 
    1197                         bmx_q_canceled_ctx(rx, err); 
     1198                        bmx_q_completed(rx, BMX_CTX_CANCELED, BMX_NO_STATUS, err); 
    11981199                } 
    11991200        } 
     
    14241425        uint64_t        type    = (uint64_t) ctx->mxc_msg_type; 
    14251426        uint64_t        id      = 0ULL; 
    1426         uint64_t        tag     = (uint64_t) ctx->mxc_tag; 
     1427        uint64_t        tag     = (uint64_t) ((uint32_t) ctx->mxc_tag); 
    14271428 
    14281429        if (ctx->mxc_msg_type == BMX_MSG_CONN_REQ ||  
     
    15171518                        ret = -BMI_ENOMEM; 
    15181519                        bmx_deq_pending_ctx(tx);        /* uses peer lock */ 
    1519                         bmx_q_canceled_ctx(tx, BMI_ENOMEM); 
     1520                        bmx_q_completed(tx, BMX_CTX_CANCELED, BMX_NO_STATUS, BMI_ENOMEM); 
    15201521                } 
    15211522        } else { /* peer is not ready */ 
     
    15721573        PINT_event_id            eid    = 0; 
    15731574 
    1574 #if BMX_LOGGING 
    1575         if (!is_unexpected) { 
    1576                 MPE_Log_event(send_start, (int) tag, NULL); 
    1577         } else { 
    1578                 MPE_Log_event(sendunex_start, (int) tag, NULL); 
    1579         } 
    1580 #endif 
    1581  
    15821575        PINT_EVENT_START( 
    15831576            bmi_mx_send_event_id, bmi_mx_pid, NULL, &eid, 
     
    16211614                BMX_MALLOC(segs, (numbufs * sizeof(*segs))); 
    16221615                if (segs == NULL) { 
    1623                         bmx_put_idle_tx(tx); 
     1616                        bmx_put_idle_ctx(tx); 
    16241617                        bmx_peer_decref(peer); 
    16251618                        ret = -BMI_ENOMEM; 
     
    16421635 
    16431636        if (is_unexpected && tx->mxc_nob > (long long) BMX_UNEXPECTED_SIZE) { 
    1644                 bmx_put_idle_tx(tx); 
     1637                bmx_put_idle_ctx(tx); 
    16451638                bmx_peer_decref(peer); 
    16461639                ret = -BMI_EINVAL; 
     
    16581651        BMX_MALLOC(mop, sizeof(*mop)); 
    16591652        if (mop == NULL) { 
    1660                 bmx_put_idle_tx(tx); 
     1653                bmx_put_idle_ctx(tx); 
    16611654                bmx_peer_decref(peer); 
    16621655                ret = -BMI_ENOMEM; 
     
    16731666        tx->mxc_mop = mop; 
    16741667 
     1668        assert(context_id == mop->context_id); 
     1669        assert(context_id == tx->mxc_mop->context_id); 
     1670 
    16751671        bmx_create_match(tx); 
    16761672 
    1677         debug(BMX_DB_CTX, "%s tag= %d length= %d %s op_id= %llu", __func__, tag, 
    1678                         (int) total_size, is_unexpected ? "UNEXPECTED" : "EXPECTED", 
    1679                         llu(mop->op_id)); 
     1673        debug(BMX_DB_CTX, "%s tag= %d length= %d %s op_id= %llu context_id= %lld", 
     1674                        __func__, tag, (int) total_size, 
     1675                        is_unexpected ? "UNEXPECTED" : "EXPECTED", 
     1676                        llu(mop->op_id), lld(context_id)); 
    16801677 
    16811678        ret = bmx_post_tx(tx); 
     
    17921789                        ret = -BMI_ENOMEM; 
    17931790                        bmx_deq_pending_ctx(rx);        /* uses peer lock */ 
    1794                         bmx_q_canceled_ctx(rx, BMI_ENOMEM); 
     1791                        bmx_q_completed(rx, BMX_CTX_CANCELED, BMX_NO_STATUS, BMI_ENOMEM); 
    17951792                } 
    17961793        } else { /* peer is not ready */ 
     
    18171814        struct bmx_peer         *peer   = NULL; 
    18181815        PINT_event_id            eid    = 0; 
    1819  
    1820 #if BMX_LOGGING 
    1821         MPE_Log_event(recv_start, (int) tag, NULL); 
    1822 #endif 
    18231816 
    18241817        PINT_EVENT_START( 
     
    18651858                BMX_MALLOC(segs, (numbufs * sizeof(*segs))); 
    18661859                if (segs == NULL) { 
    1867                         bmx_put_idle_rx(rx); 
     1860                        bmx_put_idle_ctx(rx); 
    18681861                        bmx_peer_decref(peer); 
    18691862                        ret = -BMI_ENOMEM; 
     
    18871880        BMX_MALLOC(mop, sizeof(*mop)); 
    18881881        if (mop == NULL) { 
    1889                 bmx_put_idle_rx(rx); 
     1882                bmx_put_idle_ctx(rx); 
    18901883                bmx_peer_decref(peer); 
    18911884                ret = -BMI_ENOMEM; 
     
    20262019        } 
    20272020 
    2028 #if BMX_LOGGING 
    2029         MPE_Log_event(recvunex_start, (int) tag, NULL); 
    2030 #endif 
    2031  
    20322021        rx = bmx_get_idle_rx(); 
    20332022        if (rx != NULL) { 
     
    20622051                                        "unexpected recv with tag %d length %d", 
    20632052                                        mx_strerror(mxret), tag, length); 
    2064                         bmx_put_idle_rx(rx); 
     2053                        bmx_put_idle_ctx(rx); 
    20652054                        ret = -1; 
    20662055                } 
     
    21212110                                                "unexpected recv with %s",  
    21222111                                                mx_strerror(mxret)); 
    2123                                 bmx_put_idle_rx(rx); 
     2112                                bmx_put_idle_ctx(rx); 
    21242113                                ret = MX_RECV_FINISHED; 
    21252114                        } 
     
    23412330                                /* drop ref taken before mx_iconnect() */ 
    23422331                                bmx_peer_decref(tx->mxc_peer); 
    2343                                 bmx_put_idle_tx(tx); 
     2332                                bmx_put_idle_ctx(tx); 
    23442333                                continue; 
    23452334                        } else if (status.code != MX_STATUS_SUCCESS) { 
    23462335                                bmx_peer_decref(rx->mxc_peer); 
    2347                                 bmx_put_idle_rx(rx); 
     2336                                bmx_put_idle_ctx(rx); 
    23482337                                continue; 
    23492338                        } 
     
    23562345                                                BMX_VERSION, version); 
    23572346                                bmx_peer_decref(rx->mxc_peer); 
    2358                                 bmx_put_idle_rx(rx); 
     2347                                bmx_put_idle_ctx(rx); 
    23592348                                continue; 
    23602349                        } 
     
    23622351                                debug(BMX_DB_WARN, "received CONN_REQ on a client."); 
    23632352                                bmx_peer_decref(rx->mxc_peer); 
    2364                                 bmx_put_idle_rx(rx); 
     2353                                bmx_put_idle_ctx(rx); 
    23652354                                continue; 
    23662355                        } 
     
    23902379                                                        (char *) rx->mxc_buffer); 
    23912380                                        bmx_peer_decref(rx->mxc_peer); 
    2392                                         bmx_put_idle_rx(rx); 
     2381                                        bmx_put_idle_ctx(rx); 
    23932382                                        continue; 
    23942383                                } 
     
    23992388                                                        "method addr for %s", peername); 
    24002389                                        bmx_peer_decref(rx->mxc_peer); 
    2401                                         bmx_put_idle_rx(rx); 
     2390                                        bmx_put_idle_ctx(rx); 
    24022391                                        continue; 
    24032392                                } 
     
    24092398                                                        "peer for %s", peername); 
    24102399                                        bmx_peer_decref(rx->mxc_peer); 
    2411                                         bmx_put_idle_rx(rx); 
     2400                                        bmx_put_idle_ctx(rx); 
    24122401                                        continue; 
    24132402                                } 
     
    24362425                        mx_iconnect(bmi_mx->bmx_ep, peer->mxp_nic_id, mxmap->mxm_ep_id, 
    24372426                                    BMX_MAGIC, ack, peer, &request); 
    2438                         bmx_put_idle_rx(rx); 
     2427                        bmx_put_idle_ctx(rx); 
    24392428                } 
    24402429        } while (result); 
     
    25512540                                        llu(tx->mxc_match), mx_strstatus(status.code)); 
    25522541                        bmx_peer_decref(tx->mxc_peer); 
    2553                         bmx_put_idle_tx(tx); 
     2542                        bmx_put_idle_ctx(tx); 
    25542543                } 
    25552544        } while (result); 
     
    25782567} 
    25792568 
     2569static void 
     2570bmx_complete_ctx(struct bmx_ctx *ctx, bmi_op_id_t *outid, bmi_error_code_t *err, 
     2571                 bmi_size_t *size, void **user_ptr) 
     2572{ 
     2573        struct bmx_peer *peer   = ctx->mxc_peer; 
     2574 
     2575        *outid = ctx->mxc_mop->op_id; 
     2576        *err = ctx->mxc_error; 
     2577        *size = ctx->mxc_mxstat.xfer_length; 
     2578        if (user_ptr) 
     2579                *user_ptr = ctx->mxc_mop->user_ptr; 
     2580        PINT_EVENT_END( 
     2581            (ctx->mxc_type == BMX_REQ_TX ? 
     2582             bmi_mx_send_event_id : bmi_mx_recv_event_id), 
     2583            bmi_mx_pid, NULL, ctx->mxc_mop->event_id, 
     2584            *outid, *size); 
     2585 
     2586        id_gen_fast_unregister(ctx->mxc_mop->op_id); 
     2587        BMX_FREE(ctx->mxc_mop, sizeof(*ctx->mxc_mop)); 
     2588        bmx_put_idle_ctx(ctx); 
     2589        bmx_peer_decref(peer); /* drop the ref taken in [send|recv]_common */ 
     2590 
     2591        return; 
     2592} 
     2593 
    25802594static int 
    25812595BMI_mx_test(bmi_op_id_t id, int *outcount, bmi_error_code_t *err, 
    25822596            bmi_size_t *size, void **user_ptr, int max_idle_time __unused, 
    2583             bmi_context_id context_id __unused) 
     2597            bmi_context_id context_id) 
    25842598{ 
    25852599        uint32_t         result = 0; 
     
    25922606        bmx_connection_handlers(); 
    25932607 
     2608        bmx_get_completion_token(); 
     2609 
    25942610        mop = id_gen_fast_lookup(id); 
    25952611        ctx = mop->method_data; 
    25962612        peer = ctx->mxc_peer; 
    25972613 
     2614        assert(context_id == mop->context_id); 
     2615        if (ctx->mxc_type == BMX_REQ_RX) 
     2616                assert(ctx->mxc_msg_type != BMX_MSG_UNEXPECTED); 
     2617        assert(context_id == ctx->mxc_mop->context_id); 
     2618 
    25982619        switch (ctx->mxc_state) { 
     2620        case BMX_CTX_COMPLETED: 
    25992621        case BMX_CTX_CANCELED: 
    2600                 /* we are racing with testcontext */ 
    2601                 gen_mutex_lock(&bmi_mx->bmx_canceled_lock); 
    2602                 if (ctx->mxc_state != BMX_CTX_CANCELED) { 
    2603                         gen_mutex_unlock(&bmi_mx->bmx_canceled_lock); 
    2604                         return 0; 
    2605                 } 
     2622                gen_mutex_lock(&bmi_mx->bmx_done_q_lock[(int) context_id]); 
    26062623                qlist_del_init(&ctx->mxc_list); 
    2607                 gen_mutex_unlock(&bmi_mx->bmx_canceled_lock); 
     2624                gen_mutex_unlock(&bmi_mx->bmx_done_q_lock[(int) context_id]); 
     2625                bmx_complete_ctx(ctx, &id, err, size, user_ptr); 
    26082626                *outcount = 1; 
    2609                 *err = ctx->mxc_mxstat.code; 
    2610                 if (ctx->mxc_mop) { 
    2611                         if (user_ptr) { 
    2612                                 *user_ptr = ctx->mxc_mop->user_ptr; 
    2613                         } 
    2614                         id_gen_fast_unregister(ctx->mxc_mop->op_id); 
    2615                         BMX_FREE(ctx->mxc_mop, sizeof(*ctx->mxc_mop)); 
    2616                 } 
    2617                 bmx_peer_decref(peer); 
    2618                 if (ctx->mxc_type == BMX_REQ_TX) { 
    2619                         bmx_put_idle_tx(ctx); 
    2620                 } else { 
    2621                         bmx_put_idle_rx(ctx); 
    2622                 } 
    26232627                break; 
    26242628        case BMX_CTX_PENDING: 
    2625                 /* racing with mx_test_any() in textcontext? */ 
    26262629                mx_test(bmi_mx->bmx_ep, &ctx->mxc_mxreq, &ctx->mxc_mxstat, &result); 
    26272630                if (result) { 
    2628                         PINT_EVENT_END( 
    2629                             (ctx->mxc_type == BMX_REQ_TX ? 
    2630                              bmi_mx_send_event_id : bmi_mx_recv_event_id), 
    2631                             bmi_mx_pid, NULL, ctx->mxc_mop->event_id, 
    2632                             ctx->mxc_mop->op_id, *size); 
    2633  
     2631                        bmx_deq_pending_ctx(ctx); 
     2632                        bmx_complete_ctx(ctx, &id, err, size, user_ptr); 
    26342633                        *outcount = 1; 
    2635                         if (ctx->mxc_mxstat.code == MX_STATUS_SUCCESS) { 
    2636                                 *err = 0; 
    2637                                 *size = ctx->mxc_mxstat.xfer_length; 
    2638                         } else { 
    2639                                 *err = bmx_mx_to_bmi_errno(ctx->mxc_mxstat.code); 
    2640                         } 
    2641                         if (ctx->mxc_mop) { 
    2642                                 if (user_ptr) { 
    2643                                         *user_ptr = ctx->mxc_mop->user_ptr; 
    2644                                 } 
    2645                                 id_gen_fast_unregister(ctx->mxc_mop->op_id); 
    2646                                 BMX_FREE(ctx->mxc_mop, sizeof(*ctx->mxc_mop)); 
    2647                         } 
    2648                         bmx_deq_pending_ctx(ctx); 
    2649                         if (ctx->mxc_type == BMX_REQ_TX) { 
    2650                                 bmx_put_idle_tx(ctx); 
    2651                         } else { 
    2652                                 bmx_put_idle_rx(ctx); 
    2653                         } 
    2654                         bmx_peer_decref(peer); 
    26552634                } 
    26562635                break; 
     
    26592638                        ctx->mxc_type == BMX_REQ_TX ? "TX" : "RX", ctx->mxc_state); 
    26602639        } 
     2640        bmx_release_completion_token(); 
    26612641        BMX_EXIT; 
    26622642 
     
    26772657        struct bmx_ctx  *ctx            = NULL; 
    26782658        struct bmx_peer *peer           = NULL; 
    2679         list_t          *canceled       = &bmi_mx->bmx_canceled; 
    26802659        int             wait            = 0; 
    26812660        static int      count           = 0; 
     
    26892668        bmx_connection_handlers(); 
    26902669 
    2691         /* always return canceled messages first */ 
    2692         while (completed < incount && !qlist_empty(canceled)) { 
    2693                 gen_mutex_lock(&bmi_mx->bmx_canceled_lock); 
    2694                 ctx = qlist_entry(canceled->next, struct bmx_ctx, mxc_list); 
    2695                 qlist_del_init(&ctx->mxc_list); 
    2696                 /* change state in case test is trying to reap it as well */ 
    2697                 ctx->mxc_state = BMX_CTX_COMPLETED; 
    2698                 gen_mutex_unlock(&bmi_mx->bmx_canceled_lock); 
    2699                 peer = ctx->mxc_peer; 
    2700                 outids[completed] = ctx->mxc_mop->op_id; 
    2701                 errs[completed] = ctx->mxc_mxstat.code; 
    2702                 if (user_ptrs) 
    2703                         user_ptrs[completed] = ctx->mxc_mop->user_ptr; 
    2704                 id_gen_fast_unregister(ctx->mxc_mop->op_id); 
    2705                 BMX_FREE(ctx->mxc_mop, sizeof(*ctx->mxc_mop)); 
    2706                 completed++; 
    2707                 if (ctx->mxc_type == BMX_REQ_TX) { 
    2708                         bmx_put_idle_tx(ctx); 
    2709                 } else { 
    2710                         bmx_put_idle_rx(ctx); 
    2711                 } 
    2712                 bmx_peer_decref(peer); /* drop the ref taken in [send|recv]_common */ 
    2713                 if (completed > 0) { 
    2714                         debug(BMX_DB_CTX, "%s found %d canceled messages", 
    2715                               __func__, completed); 
    2716                 } 
    2717         } 
    2718  
    2719         /* return completed messages 
     2670        bmx_get_completion_token(); 
     2671 
     2672        /* always return queued, completed messages first */ 
     2673        do { 
     2674                bmx_deq_completed(&ctx, context_id); 
     2675                if (ctx) { 
     2676                        bmx_complete_ctx(ctx, &outids[completed], &errs[completed], 
     2677                                         &sizes[completed], &user_ptrs[completed]); 
     2678                        completed++; 
     2679                } 
     2680        } while (completed < incount && ctx != NULL); 
     2681 
     2682        if (completed > 0) 
     2683                debug(BMX_DB_CTX, "%s found %d completed messages", __func__, completed); 
     2684 
     2685        /* try to complete expected messages 
    27202686         * we will always try (incount - completed) times even 
    27212687         *     if some iterations have no result */ 
     
    27362702                        wait = 2; 
    27372703                } 
     2704 
    27382705                if (result) { 
    27392706                        ctx = (struct bmx_ctx *) status.context; 
    27402707                        bmx_deq_pending_ctx(ctx); 
     2708                        if (ctx->mxc_mop->context_id != context_id) { 
     2709                                bmx_q_completed(ctx, BMX_CTX_COMPLETED, status, 
     2710                                                bmx_mx_to_bmi_errno(status.code)); 
     2711                                continue; 
     2712                        } 
     2713                        ctx->mxc_mxstat = status; 
    27412714                        peer = ctx->mxc_peer; 
    27422715                        debug(BMX_DB_CTX, "%s completing expected %s with match 0x%llx " 
    2743                                         "for %s with op_id %llu length %d %s", __func__,  
    2744                                         ctx->mxc_type == BMX_REQ_TX ? "TX" : "RX", 
     2716                                        "for %s with op_id %llu length %d %s " 
     2717                                        "context_id= %d mop->context_id= %d", 
     2718                                        __func__, ctx->mxc_type == BMX_REQ_TX ? "TX" : "RX", 
    27452719                                        llu(ctx->mxc_match), peer->mxp_mxmap->mxm_peername, 
    27462720                                        llu(ctx->mxc_mop->op_id), status.xfer_length, 
    2747                                         mx_strstatus(status.code)); 
    2748  
    2749                         outids[completed] = ctx->mxc_mop->op_id; 
    2750                         if (status.code == MX_SUCCESS) { 
    2751                                 errs[completed] = 0; 
    2752                                 sizes[completed] = status.xfer_length; 
    2753                         } else { 
    2754                                 errs[completed] = bmx_mx_to_bmi_errno(status.code); 
    2755                         } 
    2756                         if (user_ptrs) 
    2757                                 user_ptrs[completed] = ctx->mxc_mop->user_ptr; 
    2758  
    2759                         PINT_EVENT_END( 
    2760                             (ctx->mxc_type == BMX_REQ_TX ? 
    2761                              bmi_mx_send_event_id : bmi_mx_recv_event_id), 
    2762                             bmi_mx_pid, NULL, ctx->mxc_mop->event_id, 
    2763                             ctx->mxc_mop->op_id, status.xfer_length); 
    2764  
    2765                         id_gen_fast_unregister(ctx->mxc_mop->op_id); 
    2766                         BMX_FREE(ctx->mxc_mop, sizeof(*ctx->mxc_mop)); 
     2721                                        mx_strstatus(status.code), (int) context_id, 
     2722                                        (int) ctx->mxc_mop->context_id); 
     2723 
     2724                        bmx_complete_ctx(ctx, &outids[completed], &errs[completed], 
     2725                                         &sizes[completed], &user_ptrs[completed]); 
    27672726                        completed++; 
    2768 #if BMX_LOGGING 
    2769                         if (ctx->mxc_type == BMX_REQ_TX) { 
    2770                                 MPE_Log_event(send_finish, (int) ctx->mxc_tag, NULL); 
    2771                         } else { 
    2772                                 MPE_Log_event(recv_finish, (int) ctx->mxc_tag, NULL); 
    2773                         } 
    2774 #endif 
    2775                         if (ctx->mxc_type == BMX_REQ_TX) { 
    2776                                 bmx_put_idle_tx(ctx); 
    2777                         } else { 
    2778                                 bmx_put_idle_rx(ctx); 
    2779                         } 
    2780                         bmx_peer_decref(peer); /* drop the ref taken in [send|recv]_common */ 
    2781                 } 
    2782                 if (completed - old > 0) { 
    2783                         debug(BMX_DB_CTX, "%s found %d expected messages", 
    2784                               __func__, completed - old); 
    2785                 } 
    2786         } 
    2787  
    2788         /* check for completed unexpected sends */ 
     2727                } 
     2728        } 
     2729 
     2730        if (completed - old > 0) 
     2731                debug(BMX_DB_CTX, "%s found %d expected messages", __func__, completed - old); 
     2732 
     2733        /* try to complete unexpected sends */ 
    27892734 
    27902735        match = (uint64_t) BMX_MSG_UNEXPECTED << BMX_MSG_SHIFT; 
     
    27952740                uint32_t        result          = 0; 
    27962741                mx_status_t     status; 
    2797                 list_t          *unex_txs       = &bmi_mx->bmx_unex_txs; 
    27982742                int             again           = 1; 
    27992743 
    28002744                ctx = NULL; 
    2801  
    2802                 gen_mutex_lock(&bmi_mx->bmx_unex_txs_lock); 
    2803                 if (!qlist_empty(unex_txs)) { 
    2804                         ctx = qlist_entry(unex_txs->next, struct bmx_ctx, mxc_list); 
    2805                         peer = ctx->mxc_peer; 
    2806                         qlist_del_init(&ctx->mxc_list); 
    2807                         result = 1; 
    2808                 } 
    2809                 gen_mutex_unlock(&bmi_mx->bmx_unex_txs_lock); 
    28102745 
    28112746                while (!ctx && again) { 
     
    28162751                                bmx_deq_pending_ctx(ctx); 
    28172752                                peer = ctx->mxc_peer; 
    2818                                 if (ctx->mxc_type == BMX_REQ_RX) { 
    2819                                         /* queue until testunexpected is called */ 
    2820                                         bmx_q_unex_ctx(ctx); 
     2753                                if (ctx->mxc_type == BMX_REQ_RX || 
     2754                                    ctx->mxc_mop->context_id != context_id) { 
     2755                                        /* queue until testunexpected or queue 
     2756                                         * until testcontext for the correct context */ 
     2757                                        bmx_q_completed(ctx, BMX_CTX_COMPLETED, status, 
     2758                                                        bmx_mx_to_bmi_errno(status.code)); 
    28212759                                        result = 0; 
    28222760                                        again = 1; 
     
    28342772                                        llu(ctx->mxc_mop->op_id)); 
    28352773 
    2836                         outids[completed] = ctx->mxc_mop->op_id; 
    2837                         if (status.code == MX_SUCCESS) { 
    2838                                 errs[completed] = 0; 
    2839                                 sizes[completed] = status.xfer_length; 
    2840                         } else { 
    2841                                 errs[completed] = bmx_mx_to_bmi_errno(status.code); 
     2774                        ctx->mxc_mxstat = status; 
     2775                        bmx_complete_ctx(ctx, &outids[completed], &errs[completed], 
     2776                                         &sizes[completed], &user_ptrs[completed]); 
     2777 
     2778                        if (status.code != MX_SUCCESS) { 
    28422779                                debug(BMX_DB_CTX, "%s unexpected send completed with " 
    28432780                                      "error %s", __func__, mx_strstatus(status.code)); 
    28442781                                bmx_peer_disconnect(peer, 0, BMI_ENETRESET); 
    28452782                        } 
    2846                         if (user_ptrs) 
    2847                                 user_ptrs[completed] = ctx->mxc_mop->user_ptr; 
    2848                         PINT_EVENT_END( 
    2849                             (ctx->mxc_type == BMX_REQ_TX ? 
    2850                              bmi_mx_send_event_id : bmi_mx_recv_event_id), 
    2851                             bmi_mx_pid, NULL, ctx->mxc_mop->event_id, 
    2852                             ctx->mxc_mop->op_id, status.xfer_length); 
    2853  
    2854                         id_gen_fast_unregister(ctx->mxc_mop->op_id); 
    2855                         BMX_FREE(ctx->mxc_mop, sizeof(*ctx->mxc_mop)); 
    28562783                        completed++; 
    2857 #if BMX_LOGGING 
    2858                         MPE_Log_event(sendunex_finish, (int) ctx->mxc_tag, NULL); 
    2859 #endif 
    2860  
    2861                         bmx_put_idle_tx(ctx); 
    2862                         bmx_peer_decref(peer); /* drop the ref taken in [send|recv]_common */ 
    2863                 } 
    2864         } 
     2784                } 
     2785        } 
     2786        bmx_release_completion_token(); 
     2787 
    28652788        if (completed - old > 0) { 
    28662789                debug(BMX_DB_CTX, "%s found %d unexpected tx messages",  
     
    28882811        struct bmx_ctx  *rx             = NULL; 
    28892812        struct bmx_peer *peer           = NULL; 
    2890         list_t          *unex_rxs       = &bmi_mx->bmx_unex_rxs; 
    28912813        int             again           = 1; 
    28922814 
     
    28972819 
    28982820        bmx_connection_handlers(); 
     2821 
     2822        bmx_get_completion_token(); 
    28992823 
    29002824        /* if the unexpected handler cannot get a rx, it does not post a receive. 
     
    29032827        if (result) { 
    29042828                int     ret     = 0; 
    2905                 ret = bmx_post_unexpected_recv(status.source, 0, 0, 0, status.match_info, status.xfer_length); 
     2829                ret = bmx_post_unexpected_recv(status.source, 0, 0, 0, 
     2830                                               status.match_info, 
     2831                                               status.xfer_length); 
    29062832                if (ret != 0) { 
    29072833                        debug(BMX_DB_CTX, "%s mx_iprobe() found rx with match 0x%llx " 
     
    29142840        *outcount = 0; 
    29152841 
    2916         gen_mutex_lock(&bmi_mx->bmx_unex_rxs_lock); 
    2917         if (!qlist_empty(unex_rxs)) { 
    2918                 rx = qlist_entry(unex_rxs->next, struct bmx_ctx, mxc_list); 
     2842        bmx_deq_unex_rx(&rx); 
     2843        if (rx) { 
     2844                result = 1; 
     2845                status = rx->mxc_mxstat; 
    29192846                peer = rx->mxc_peer; 
    2920                 qlist_del_init(&rx->mxc_list); 
    2921                 result = 1; 
    2922         } 
    2923         gen_mutex_unlock(&bmi_mx->bmx_unex_rxs_lock); 
     2847        } 
    29242848 
    29252849        while (!rx && again) { 
     
    29312855                        peer = rx->mxc_peer; 
    29322856                        if (rx->mxc_type == BMX_REQ_TX) { 
    2933                                 bmx_q_unex_ctx(rx); 
     2857                                bmx_q_completed(rx, BMX_CTX_COMPLETED, status, 
     2858                                                bmx_mx_to_bmi_errno(status.code)); 
    29342859                                result = 0; 
    29352860                                again = 1; 
     
    29532878                ui->tag = rx->mxc_tag; 
    29542879 
    2955 #if BMX_LOGGING 
    2956                 MPE_Log_event(recvunex_finish, (int) rx->mxc_tag, NULL); 
    2957 #endif 
    2958                 bmx_put_idle_rx(rx); 
     2880                bmx_put_idle_ctx(rx); 
    29592881                bmx_peer_decref(peer); /* drop the ref taken in unexpected_recv() */ 
    29602882                *outcount = 1; 
    29612883        } 
     2884        bmx_release_completion_token(); 
     2885 
    29622886        if (print) 
    29632887                BMX_EXIT; 
     
    30142938 
    30152939                ret = bmx_open_endpoint(&bmi_mx->bmx_ep, 
    3016                                         bmi_mx->bmx_board, 
     2940                                        MX_ANY_NIC, 
    30172941                                        MX_ANY_ENDPOINT); 
    30182942                if (ret != 0) { 
     
    30262950                mx_decompose_endpoint_addr2(epa, &nic_id, &bmi_mx->bmx_ep_id, 
    30272951                                            &bmi_mx->bmx_sid); 
     2952                /* get our board number */ 
     2953                mx_nic_id_to_board_number(nic_id, &bmi_mx->bmx_board); 
    30282954                /* get our hostname */ 
    30292955                mx_nic_id_to_hostname(nic_id, host); 
     
    31413067/* NOTE There may be a race between this and BMI_mx_testcontext(). */ 
    31423068static int 
    3143 BMI_mx_cancel(bmi_op_id_t id, bmi_context_id context_id __unused) 
     3069BMI_mx_cancel(bmi_op_id_t id, bmi_context_id context_id) 
    31443070{ 
    31453071        struct method_op        *mop; 
     
    31503076        BMX_ENTER; 
    31513077 
     3078        bmx_get_completion_token(); 
     3079 
    31523080        mop = id_gen_fast_lookup(id); 
    31533081        ctx = mop->method_data; 
    31543082        peer = ctx->mxc_peer; 
    31553083 
     3084        assert(context_id == ctx->mxc_mop->context_id); 
     3085 
    31563086        debug(BMX_DB_CTX, "%s %s op_id %llu mxc_state %d peer state %d", __func__,  
    31573087                        ctx->mxc_type == BMX_REQ_TX ? "TX" : "RX",  
    31583088                        llu(ctx->mxc_mop->op_id), ctx->mxc_state, peer->mxp_state); 
     3089 
     3090        /* avoid race with connection setup */ 
     3091        gen_mutex_lock(&peer->mxp_lock); 
     3092 
    31593093        switch (ctx->mxc_state) { 
    31603094        case BMX_CTX_QUEUED: 
    3161                 /* we are racing with the connection setup */ 
    3162                 bmx_deq_ctx(ctx); 
    3163                 bmx_q_canceled_ctx(ctx, BMI_ECANCEL); 
     3095                qlist_del_init(&ctx->mxc_list); 
     3096                gen_mutex_unlock(&peer->mxp_lock); 
     3097                bmx_q_completed(ctx, BMX_CTX_CANCELED, BMX_NO_STATUS, BMI_ECANCEL); 
    31643098                break; 
    31653099        case BMX_CTX_PENDING: 
     3100                gen_mutex_unlock(&peer->mxp_lock); 
    31663101                if (ctx->mxc_type == BMX_REQ_TX) { 
    31673102                        /* see if it completed first */ 
     
    31703105                                debug(BMX_DB_CTX, "%s completed TX op_id %llu " 
    31713106                                      "mxc_state %d peer state %d status.code %s", 
    3172                                       __func__, llu(ctx->mxc_mop->op_id), ctx->mxc_state,  
     3107                                      __func__, llu(ctx->mxc_mop->op_id), ctx->mxc_state, 
    31733108                                      peer->mxp_state, mx_strstatus(ctx->mxc_mxstat.code)); 
    31743109                                bmx_deq_pending_ctx(ctx); 
    3175                                 bmx_q_canceled_ctx(ctx,  BMI_ECANCEL); 
     3110                                bmx_q_completed(ctx, BMX_CTX_CANCELED, 
     3111                                                ctx->mxc_mxstat, BMI_ECANCEL); 
    31763112                        } else { 
    31773113                                /* and if not, then disconnect() */ 
     
    31823118                        if (result == 1) { 
    31833119                                bmx_deq_pending_ctx(ctx); 
    3184                                 bmx_q_canceled_ctx(ctx,  BMI_ECANCEL); 
     3120                                bmx_q_completed(ctx, BMX_CTX_CANCELED, 
     3121                                                BMX_NO_STATUS, BMI_ECANCEL); 
    31853122                        } 
    31863123                } 
     
    31903127                        ctx->mxc_type == BMX_REQ_TX ? "TX" : "RX", ctx->mxc_state); 
    31913128        } 
     3129        bmx_release_completion_token(); 
     3130 
    31923131        BMX_EXIT; 
    31933132 
     
    32133152{ 
    32143153    .method_name               = "bmi_mx", 
     3154    .flags = 0, 
    32153155    .initialize                = BMI_mx_initialize, 
    32163156    .finalize                  = BMI_mx_finalize, 
  • branches/cu-security-branch/src/io/bmi/bmi_mx/mx.h

    r7024 r8397  
    1616#include <unistd.h> 
    1717#include <errno.h> 
     18#include <assert.h> 
    1819 
    1920#include <mx_extensions.h>      /* needed for callback handler, etc. */ 
     
    5455#define BMX_DEBUG     1         /* enable debug (gossip) statements */ 
    5556#define BMX_MEM_ACCT  0         /* track number of bytes alloc's and freed */ 
    56 #define BMX_LOGGING   0         /* use MPE logging routines */ 
    57  
    58 #if BMX_LOGGING 
    59 #include "mpe.h" 
    60 #endif 
    6157 
    6258#if BMX_MEM_TWEAK 
     
    166162        gen_mutex_t         bmx_idle_rxs_lock;  /* idle_rxs lock */ 
    167163 
    168         list_t              bmx_canceled;       /* canceled reqs waiting for test */ 
    169         gen_mutex_t         bmx_canceled_lock;  /* canceled list lock */ 
    170  
    171         list_t              bmx_unex_txs;       /* completed unexpected sends */ 
    172         gen_mutex_t         bmx_unex_txs_lock;  /* completed unexpected sends lock */ 
     164        gen_mutex_t         bmx_completion_lock; /* lock for test* functions */ 
     165        int                 bmx_refcount;       /* try to avoid races between test* 
     166                                                   and cancel functions */ 
     167 
     168                                                /* completed expected msgs 
     169                                                 * including unexpected sends */ 
     170        list_t              bmx_done_q[BMI_MAX_CONTEXTS]; 
     171        gen_mutex_t         bmx_done_q_lock[BMI_MAX_CONTEXTS]; 
     172 
    173173        list_t              bmx_unex_rxs;       /* completed unexpected recvs */ 
    174174        gen_mutex_t         bmx_unex_rxs_lock;  /* completed unexpected recvs lock */ 
     
    278278        mx_request_t        mxc_mxreq;      /* MX request */ 
    279279        mx_status_t         mxc_mxstat;     /* MX status */ 
     280        bmi_error_code_t    mxc_error;      /* BMI error code */ 
    280281 
    281282        uint64_t            mxc_get;        /* # of times returned from idle list */ 
  • branches/cu-security-branch/src/io/bmi/bmi_portals

    • Property svn:ignore deleted
  • branches/cu-security-branch/src/io/bmi/bmi_portals/portals.c

    r7941 r8397  
    22942294{ 
    22952295    .method_name = "bmi_portals", 
     2296    .flags = 0, 
    22962297    .initialize = bmip_initialize, 
    22972298    .finalize = bmip_finalize, 
  • branches/cu-security-branch/src/io/bmi/bmi_tcp

    • Property svn:ignore deleted
  • branches/cu-security-branch/src/io/bmi/bmi_tcp/bmi-tcp.c

    r8330 r8397  
    36493649            (errno == ENONET) || 
    36503650            (errno == EHOSTUNREACH) || 
    3651             (errno == EOPNOTSUPP) || (errno == ENETUNREACH)) 
     3651            (errno == EOPNOTSUPP) || 
     3652            (errno == ENETUNREACH) || 
     3653            (errno == ENFILE) || 
     3654            (errno == EMFILE)) 
    36523655        { 
    36533656            /* try again later */ 
     3657            if ((errno == ENFILE) || (errno == EMFILE)) 
     3658            { 
     3659                gossip_err("Error: accept: %s (continuing)\n",strerror(errno)); 
     3660                bmi_method_addr_drop_callback(BMI_tcp_method_name); 
     3661            } 
    36543662            return (0); 
    36553663        } 
  • branches/cu-security-branch/src/io/bmi/bmi_tcp/socket-collection-epoll.c

    r8330 r8397  
    128128    memset(maps, 0, (sizeof(bmi_method_addr_p) * incount)); 
    129129    memset(status, 0, (sizeof(int) * incount)); 
     130 
     131    if(incount == 0) 
     132    { 
     133        return(0); 
     134    } 
    130135 
    131136    /* actually do the epoll_wait() here */ 
  • branches/cu-security-branch/src/io/bmi/bmi_tcp/socket-collection.c

    r7941 r8397  
    9090    if(new_server_socket > -1) 
    9191    { 
    92         tmp_scp->pollfd_array[0].fd = new_server_socket; 
    93         tmp_scp->pollfd_array[0].events = POLLIN; 
    94         tmp_scp->addr_array[0] = NULL; 
     92        tmp_scp->pollfd_array[tmp_scp->array_count].fd = new_server_socket; 
     93        tmp_scp->pollfd_array[tmp_scp->array_count].events = POLLIN; 
     94        tmp_scp->addr_array[tmp_scp->array_count] = NULL; 
    9595        tmp_scp->array_count++; 
    96         /* Add the pipe_fd[0] fd to the poll in set always */ 
    97         tmp_scp->pollfd_array[1].fd = tmp_scp->pipe_fd[0]; 
    98         tmp_scp->pollfd_array[1].events = POLLIN; 
    99         tmp_scp->addr_array[1] = NULL; 
    100         tmp_scp->array_count++; 
    101     } 
     96    } 
     97 
     98    /* Add the pipe_fd[0] fd to the poll in set always */ 
     99    tmp_scp->pollfd_array[tmp_scp->array_count].fd = tmp_scp->pipe_fd[0]; 
     100    tmp_scp->pollfd_array[tmp_scp->array_count].events = POLLIN; 
     101    tmp_scp->addr_array[tmp_scp->array_count] = NULL; 
     102    tmp_scp->array_count++; 
    102103 
    103104    return (tmp_scp); 
  • branches/cu-security-branch/src/io/bmi/bmi_zoid/README

    r8313 r8397  
    1717  needed, but is probably already larger than necessary, 
    1818 
    19 - ZBMI_SHM_SIZE_TOTAL (in init.c in ZeptoOS 
    20   packages/zoid/src/zbmi/implementation/ directory): total size of the shared 
    21   memory buffer used to exchange bulk data between the ZOID daemon and the BMI 
    22   server; defaults to 512M. 
    23  
    24 - ZBMI_SHM_SIZE_UNEXP (in init.c in ZeptoOS 
    25   packages/zoid/src/zbmi/implementation/ directory): part of the shared 
    26   memory buffer used for unexpected messages; defaults to 1M. 
     19- FIXME ZBMI limits 
    2720 
    2821 
     
    6457  invoked on the client side, as each such call will result in a 
    6558  communication with the I/O node (10ms is *way* too short, 1000ms is pretty 
    66   short too), 
    67  
    68 - to reduce the number of round-trip messages from the compute nodes, 
    69   client-side message post routines can optionally wait (with a timeout) 
    70   and thus increase a chance of an immediate completion.  Use 
    71   BMI_set_info(BMI_ZOID_POST_TIMEOUT, <timeout_ms>) to enable this feature. 
     59  short too). 
    7260 
    7361 
     
    11098 
    11199The zbmi plugin is mostly stateless so far as the compute node clients are 
    112 concerned.  Specifically, the information on posted expected message 
    113 sends/receives that were not immediately completed is stored exclusively on 
    114 the client side. 
     100concerned.  Specifically, the information on posted, but not immediately 
     101completed expected message sends/receives is stored exclusively on the 
     102client side. 
    115103 
    116104All BMI send routines end up in zoid_post_send_common.  That includes 
    117105unexpected messages and list I/O.  This routine attempts to forward the 
    118106message to the zbmi plugin on the I/O node, using zbmi_send.  For 
    119 unexpected messages, zbmi_send should normally succeed and result in an 
    120 immediate completion; however, if the zbmi plugin is out of memory, 
     107unexpected messages, zbmi_send is normally expected to succeed and result 
     108in an immediate completion; however, if the zbmi plugin is out of memory, 
    121109zbmi_send will fail with ENOMEM.  The same failure will occur with expected 
    122110messages if a matching receive has not been posted on the I/O node side by 
     
    124112request is put in the "zoid_ops" queue for another attempt later.  For 
    125113expected messages, if a matching receive has been posted, the call succeeds 
    126 resulting in an immediate completion.  zbmi_send normally does not block, 
    127 but if BMI_ZOID_POST_TIMEOUT has been enabled, it can, waiting for a 
    128 matching expected message post from the BMI server side or for memory to be 
    129 released on the BMI server side so that an unexpected message can be 
    130 stored. 
     114resulting in an immediate completion. 
    131115 
    132116The way zbmi_send is forwarded by ZOID, the data payload is only 
     
    140124is sent to the compute node and zbmi_recv returns 1, resulting in an 
    141125immediate completion.  Otherwise, the receive request is put in the 
    142 "zoid_ops" queue for another attempt later.  zbmi_recv normally does not 
    143 block, but if BMI_ZOID_POST_TIMEOUT has been enabled, it can, waiting for a 
    144 matching expected message post from the BMI server side. 
     126"zoid_ops" queue for another attempt later. 
    145127 
    146128BMI_cancel is very easy to implement thanks to a lack of multi-threading 
     
    156138sent to the server anymore.  For non-canceled requests, it extracts the 
    157139message tag, size, and send/recv indicator and forwards those to the server 
    158 using zbmi_test.  zbmi_test can block on the server for the specified time 
    159 if none of the specified requests is initially ready.  zbmi_test returns 
    160 the number of ready requests; if it is non-zero, then the server side must 
    161 have posted matching sends/receives, so zoid_test_common next attempts to 
    162 satisfy those "ready" requests by invoking zbmi_send/zbmi_recv.  Those 
    163 send/recv routines could still fail in spite of a successful test if there 
    164 is no memory or if the server side canceled its matching request; this is 
    165 recoverable. 
     140using zbmi_test.  zbmi_test is the only blocking call of the three; it can 
     141block on the server for the specified time if none of the specified 
     142requests is initially ready.  zbmi_test returns the number of ready 
     143requests; if it is non-zero, then zoid_test_common next attempts to satisfy 
     144those requests by invoking zbmi_send/zbmi_recv.  Those send/recv routines 
     145could still fail in spite of a successful test, if there is no memory, or 
     146if the server-side canceled its matching request; this is recoverable. 
    166147 
    167148SERVER 
     
    169150The communication between the I/O node BMI server and the zbmi plugin of 
    170151the ZOID daemon is carried across two channels.  Commands are sent via a 
    171 POSIX message queue (zbmi plugin is the server; multiple threads of the BMI 
    172 server can communicate simultaneously by opening multiple reply queues to 
    173 the server).  Payload is exchanged using a large shared memory segment, 
     152unix domain socket (zbmi plugin is the server; multiple threads of the BMI 
     153server can communicate simultaneously by opening multiple connections to 
     154the socket).  Payload is exchanged using a large shared memory segment, 
    174155allocated by the zbmi plugin.  We make efforts to avoid unnecessary copies 
    175156to/from that segment, so BMI_memalloc() on the BMI server side allocates 
     
    178159 
    179160The shared memory segment is split in two: a normally smaller region is 
    180 used for unexpected messages and is managed by the zbmi plugin, while a 
     161used for unexpected messages and is managed by the zbmi plugin, wile a 
    181162larger region is used for expected messages and is managed by the BMI 
    182163server. 
    183164 
    184165The communication with the zbmi plugin is established during 
    185 BMI_initialize, and terminated during BMI_finalize.  The message protocol is 
     166BMI_initialize, and terminated during BMI_finalize.  The stream protocol is 
    186167documented in zbmi's zbmi_protocol.h. 
    187168 
    188169For BMI testunexpected, we communicate the metadata on the pending received 
    189 messages via the queue, and the payload is in the shared memory buffer 
     170messages via the socket, and the payload is in the shared memory buffer 
    190171which is returned to the user.  unexpected_free just sends the buffer 
    191172address back to the zbmi plugin, since the unexpected messages memory pool 
     
    200181retry the allocation after every BMI_memfree. 
    201182 
    202 Expected server-side posts, be it sends or receives, can be completed 
    203 immediately if a matching client-side post (or test) is waiting when 
    204 server-side post is issued.  Note that "immediately" is used liberally 
    205 here; the server-side post will not return until the buffer has been 
    206 transferred to/from the compute node, which can take some time when the 
    207 ZOID server is under heavy load.  For posts not completed immediately we 
    208 send a message descriptor to the zbmi plugin which registers the message 
    209 and just sends back a confirmation.  When registering we exchange the 
    210 internal BMI id and the internal ZOID id, since that simplifies subsequent 
    211 testing/canceling. 
     183Expected server-side posts, be it sends or receives, are never completed 
     184immediately: we send a message descriptor to the zbmi plugin which 
     185registers it and just sends back a confirmation.  When registering we 
     186exchange the internal BMI id and the internal ZOID id, since that 
     187simplifies subsequent testing/canceling. 
    212188 
    213189Canceling messages is more complex than on the client side.  Generally, we 
     
    217193be ignored.  An exception is when the request has not been registered 
    218194because of the lack of memory for a temporary buffer as described earlier; 
    219 in that case we cancel it locally and put it in "error_ops" queue. 
     195in that case we cancel it locally and put in in "error_ops" queue. 
    220196 
    221197When testing (in zoid_server_test_common), we need to deal with locally 
     
    225201since they involve no communication with the zbmi plugin.  Unlike on the 
    226202client side, where the common test routines sort-of "emulated" testcontext 
    227 by first building an array of all pending request ids, on the server side 
     203but first building an array of all pending request ids, on the server side 
    228204we have a "native" implementation.  Testcontext is recognized by passing an 
    229205"incount" of 0, and we forward it to the zbmi plugin so that it knows to 
    230206return *any* completed request(s).  This is necessary because of 
    231207multi-threading constraints, and it is possible because the zbmi plugin 
    232 does maintain state for server-side requests.  The test is the only 
    233 server-side command that can block in the zbmi plugin for the specified 
     208does maintain state for server-side requests.  As with the client, the test 
     209is the only command that can block in the zbmi plugin for the specified 
    234210time period if no request is initially completed.  Completed requests 
    235211require no further handling, with the exceptions of those that used 
  • branches/cu-security-branch/src/io/bmi/bmi_zoid/server.c

    r8359 r8397  
    33#include <assert.h> 
    44#include <stdio.h> 
    5 #include <string.h> 
    65 
    76#include <fcntl.h> 
    8 #include <mqueue.h> 
     7#include <pthread.h> 
    98#include <sys/mman.h> 
     9#include <sys/socket.h> 
     10#include <sys/un.h> 
    1011#include <unistd.h> 
    1112 
    1213#include <bmi-method-support.h> 
    1314#include <bmi-method-callback.h> 
    14 #include <gen-locks.h> 
    1515#include <id-generator.h> 
    1616#include <op-list.h> 
     
    2121 
    2222/* method_op_p's method_data points to this structure on the server side.  */ 
    23 struct zoid_server_method_data 
     23struct ZoidServerMethodData 
    2424{ 
    2525    void* tmp_buffer; /* Used with BMI_EXT_ALLOC to store the address of the 
    2626                         temporary shared memory buffer, NULL otherwise.  */ 
    2727    int zoid_buf_id; /* 0 if the operation has not yet been sent to ZOID.  */ 
    28     gen_mutex_t post_mutex; /* Used to resolve the race between a post 
    29                                call and testcontext.  */ 
    3028}; 
    31  
    32 #define METHOD_DATA(op) ((struct zoid_server_method_data*)(op)->method_data) 
    3329 
    3430/* Describes a request with BMI_EXT_ALLOC pending for a temporary memory 
    3531   buffer.  */ 
    36 struct no_mem_descriptor 
    37 { 
    38     struct no_mem_descriptor* next; 
    39     struct no_mem_descriptor* prev; 
     32struct NoMemDescriptor 
     33{ 
     34    struct NoMemDescriptor* next; 
     35    struct NoMemDescriptor* prev; 
    4036 
    4137    bmi_size_t total_size; 
     
    4339}; 
    4440 
    45 /* Control queue to the zbmi plugin  in the ZOID daemon.  */ 
    46 static mqd_t zbmi_control_queue = -1; 
    47  
    48 /* Reply queues from the zbmi plugin.  */ 
    49 #define ZBMI_QUEUES_LEN_INIT 10 
    50 static gen_mutex_t zbmi_queues_mutex = GEN_MUTEX_INITIALIZER; 
    51 static mqd_t* zbmi_queues = NULL; 
    52 static char* zbmi_queues_inuse = NULL; /* Whether a particular zbmi_queues 
    53                                           entry is currently in use.  */ 
    54 static int zbmi_queues_len = 0; /* Length of zbmi_queues and 
    55                                    zbmi_queues_inuse.  */ 
    56 static int zbmi_queues_used = 0; /* Count of initialized zbmi_queues 
    57                                     entries.  */ 
     41/* Command streams to the zbmi plugin in the ZOID daemon.  */ 
     42#define ZBMI_SOCKETS_LEN_INIT 10 
     43static pthread_mutex_t zbmi_sockets_mutex = PTHREAD_MUTEX_INITIALIZER; 
     44static int* zbmi_sockets = NULL; 
     45static char* zbmi_sockets_inuse = NULL; /* Whether a particular zbmi_sockets 
     46                                           entry is currently in use.  */ 
     47static int zbmi_sockets_len = 0; /* Length of zbmi_sockets and 
     48                                    zbmi_sockets_inuse.  */ 
     49static int zbmi_sockets_used = 0; /* Count of initialized zbmi_sockets 
     50                                     entries.  */ 
    5851 
    5952/* An array of client addresses.  */ 
    6053#define CLIENTS_LEN_INC 10 
    61 static gen_mutex_t clients_mutex = GEN_MUTEX_INITIALIZER; 
     54static pthread_mutex_t clients_mutex = PTHREAD_MUTEX_INITIALIZER; 
    6255static bmi_method_addr_p* clients_addr = NULL; 
    6356static int clients_len = 0; 
    6457 
    6558/* Shared memory buffer between the ZBMI plugin an us.  */ 
    66 static char* zbmi_shm = NULL; 
    67 static char *zbmi_shm_eager, *zbmi_shm_rzv; 
     59static void* zbmi_shm = NULL; 
     60static void *zbmi_shm_unexp, *zbmi_shm_exp; 
    6861 
    6962static int zbmi_shm_size_total; 
    70 static int zbmi_shm_size_eager; 
    71 static struct ZBMIEagerDesc* eager_desc; 
     63static int zbmi_shm_size_unexp; 
    7264 
    7365/* Queue of operations with BMI_EXT_ALLOC buffers pending for a temporary 
    7466   memory buffer, sorted by descending total_size.  */ 
    75 static struct no_mem_descriptor *no_mem_queue_first = NULL; 
     67static struct NoMemDescriptor *no_mem_queue_first = NULL; 
    7668/* Only valid if no_mem_queue_first != NULL.  */ 
    77 static struct no_mem_descriptor *no_mem_queue_last; 
     69static struct NoMemDescriptor *no_mem_queue_last; 
    7870/* Protects access to the above queue.  */ 
    79 static gen_mutex_t no_mem_queue_mutex = GEN_MUTEX_INITIALIZER; 
     71static pthread_mutex_t no_mem_queue_mutex = PTHREAD_MUTEX_INITIALIZER; 
    8072 
    8173/* Queue of failed/canceled operations (that the ZOID server doesn't know 
    8274   about).  */ 
    8375static op_list_p error_ops; 
    84 static gen_mutex_t error_ops_mutex = GEN_MUTEX_INITIALIZER; 
    85  
    86  
    87 static mqd_t get_zoid_reply_queue(int* queue_id); 
    88 static void release_zoid_reply_queue(int queue_id); 
     76static pthread_mutex_t error_ops_mutex = PTHREAD_MUTEX_INITIALIZER; 
     77 
     78 
     79static ssize_t socket_read(int fd, void* buf, size_t count); 
     80static ssize_t socket_write(int fd, const void* buf, size_t count); 
     81static int get_zoid_socket(int* release_token); 
     82static void release_zoid_socket(int release_token); 
    8983static bmi_method_addr_p get_client_addr(int zoid_addr); 
    9084static int enqueue_no_mem(method_op_p op, bmi_size_t total_size); 
    91 static int send_post_cmd(method_op_p op, int not_immediate, int* length); 
     85static int send_post_cmd(method_op_p op); 
    9286 
    9387 
     
    10397BMI_zoid_server_initialize(void) 
    10498{ 
    105     struct ZBMIControlInitCmd cmd; 
    106     struct ZBMIControlInitResp resp; 
     99    int hdr; 
     100    struct ZBMIControlInitResp init_resp; 
    107101    int shm_fd; 
    108     mqd_t reply_queue; 
    109     int queue_id; 
    110  
    111     /* Connect to the ZBMI plugin in the ZOID daemon.  */ 
    112     while ((zbmi_control_queue = mq_open(ZBMI_CONTROL_QUEUE_NAME, O_WRONLY)) 
    113            < 0) 
    114     { 
    115         if (errno == ENOENT) 
    116         { 
    117             /* Probably the ZOID server is not running yet...  */ 
    118             sleep(1); 
    119         } 
    120         else 
    121         { 
    122             perror("connect to ZOID"); 
    123             return -BMI_EINVAL; 
    124         } 
    125     } 
    126  
    127     /* This will initialize all the queue structures first.  */ 
    128     if ((reply_queue = get_zoid_reply_queue(&queue_id)) < 0) 
    129         return reply_queue; 
     102    int zoid_fd, zoid_release; 
     103 
     104    /* Connect to the ZBMI plugin in the ZOID daemon.  This will initialize 
     105       all the socket structures first.  */ 
     106     
     107    if ((zoid_fd = get_zoid_socket(&zoid_release)) < 0) 
     108        return zoid_fd; 
    130109 
    131110    /* Initial handshake.  */ 
    132111 
    133     cmd.command_id = ZBMI_CONTROL_INIT; 
    134     cmd.queue_id = queue_id; 
    135  
    136     if (mq_send(zbmi_control_queue, (void*)&cmd, sizeof(cmd), 0) < 0) 
    137     { 
    138         perror("mq_send"); 
    139         release_zoid_reply_queue(queue_id); 
     112    hdr = ZBMI_CONTROL_INIT; 
     113 
     114    if (socket_write(zoid_fd, &hdr, sizeof(hdr)) != sizeof(hdr)) 
     115    { 
     116        perror("write"); 
     117        release_zoid_socket(zoid_release); 
    140118        return -BMI_EINVAL; 
    141119    } 
    142120 
    143     if (mq_receive(reply_queue, (void*)&resp, ZBMI_MAX_MSG_SIZE, NULL) 
    144         != sizeof(resp)) 
    145     { 
    146         perror("mq_receive"); 
    147         release_zoid_reply_queue(queue_id); 
     121    if (socket_read(zoid_fd, &init_resp, sizeof(init_resp)) != 
     122        sizeof(init_resp)) 
     123    { 
     124        perror("read"); 
     125        release_zoid_socket(zoid_release); 
    148126        return -BMI_EINVAL; 
    149127    } 
    150128 
    151     release_zoid_reply_queue(queue_id); 
    152  
    153     zbmi_shm_size_total = resp.shm_size_total; 
    154     zbmi_shm_size_eager = resp.shm_size_eager; 
     129    release_zoid_socket(zoid_release); 
     130 
     131    zbmi_shm_size_total = init_resp.shm_size_total; 
     132    zbmi_shm_size_unexp = init_resp.shm_size_unexp; 
    155133 
    156134    /* Open the shared memory area.  */ 
     
    172150    close(shm_fd); 
    173151 
    174     /* The shared memory buffer starts with an eager section, which is 
     152    /* The shared memory buffer starts with an unexpected section, which is 
    175153       managed by the ZBMI ZOID plugin.  */ 
    176     zbmi_shm_eager = zbmi_shm; 
    177     /* Eager section starts with a descriptor.  */ 
    178     eager_desc = (struct ZBMIEagerDesc*)zbmi_shm_eager; 
    179     /* The rendezvous buffers part is initialized and managed by us.  */ 
    180     zbmi_shm_rzv = zbmi_shm + zbmi_shm_size_eager; 
    181     zbmi_pool_init(zbmi_shm_rzv, zbmi_shm_size_total - zbmi_shm_size_eager); 
     154    zbmi_shm_unexp = zbmi_shm; 
     155    /* The expected buffers part is initialized and managed by us.  */ 
     156    zbmi_shm_exp = zbmi_shm + zbmi_shm_size_unexp; 
     157    zbmi_pool_init(zbmi_shm_exp, zbmi_shm_size_total - zbmi_shm_size_unexp); 
    182158 
    183159    if (!(error_ops = op_list_new())) 
     
    204180 
    205181    /* FIXME!  Send some sort of a FINI message first?  */ 
    206     for (i = 0; i < zbmi_queues_used; i++) 
    207     { 
    208         char queue_name[256]; 
    209         mq_close(zbmi_queues[i]); 
    210         sprintf(queue_name, ZBMI_REPLY_QUEUE_TEMPLATE, i); 
    211         mq_unlink(queue_name); 
    212     } 
    213     free(zbmi_queues_inuse); 
    214     free(zbmi_queues); 
    215     zbmi_queues_len = zbmi_queues_used = 0; 
    216     mq_close(zbmi_control_queue); 
     182    for (i = 0; i < zbmi_sockets_used; i++) 
     183        close(zbmi_sockets[i]); 
     184    free(zbmi_sockets_inuse); 
     185    free(zbmi_sockets); 
     186    zbmi_sockets_len = zbmi_sockets_used = 0; 
    217187 
    218188    return 0; 
     
    231201BMI_zoid_server_memfree(void* buffer) 
    232202{ 
    233     struct no_mem_descriptor* desc; 
     203    struct NoMemDescriptor* desc; 
    234204 
    235205    zbmi_pool_free(buffer); 
     
    237207    /* Once some memory has been freed, go over the queue of requests waiting 
    238208       for memory and try to satisfy any of them.  */ 
    239     gen_mutex_lock(&no_mem_queue_mutex); 
     209    pthread_mutex_lock(&no_mem_queue_mutex); 
    240210 
    241211    for (desc = no_mem_queue_first; desc;) 
    242212    { 
    243         struct no_mem_descriptor* desc_next = desc->next; 
     213        struct NoMemDescriptor* desc_next = desc->next; 
    244214        void* buf; 
    245215 
     
    248218            method_op_p op = desc->op; 
    249219 
    250             METHOD_DATA(op)->tmp_buffer = buf; 
     220            ((struct ZoidServerMethodData*)op->method_data)->tmp_buffer = buf; 
    251221 
    252222            if (op->send_recv == BMI_SEND) 
     
    275245            free(desc); 
    276246 
    277             /* Post it as "non-immediate" since we do not have facilities 
    278                here to manage immediate completions.  */ 
    279             if ((op->error_code = -send_post_cmd(op, 1, NULL))) 
    280             { 
    281                 gen_mutex_lock(&error_ops_mutex); 
     247            if ((op->error_code = -send_post_cmd(op))) 
     248            { 
     249                pthread_mutex_lock(&error_ops_mutex); 
    282250                op_list_add(error_ops, op); 
    283                 gen_mutex_unlock(&error_ops_mutex); 
     251                pthread_mutex_unlock(&error_ops_mutex); 
    284252            } 
    285253        } 
     
    288256    } 
    289257 
    290     gen_mutex_unlock(&no_mem_queue_mutex); 
     258    pthread_mutex_unlock(&no_mem_queue_mutex); 
    291259} 
    292260 
     
    295263BMI_zoid_server_unexpected_free(void* buffer) 
    296264{ 
    297     struct ZBMIControlEagerFreeCmd cmd; 
    298  
    299     if ((char*)buffer < zbmi_shm_eager || 
    300         (char*)buffer >= zbmi_shm_eager + zbmi_shm_size_eager) 
    301     { 
     265    int zoid_fd, zoid_release; 
     266    int hdr; 
     267    struct ZBMIControlUnexpFreeCmd cmd; 
     268 
     269    if (buffer < zbmi_shm_unexp || 
     270        buffer >= zbmi_shm_unexp + zbmi_shm_size_unexp) 
    302271        return -BMI_EINVAL; 
    303     } 
    304  
    305     cmd.command_id = ZBMI_CONTROL_EAGER_FREE; 
    306     cmd.unexpected = 1; 
    307     cmd.buffer = (char*)buffer - zbmi_shm_eager; 
    308  
    309     if (mq_send(zbmi_control_queue, (void*)&cmd, sizeof(cmd), 0) < 0) 
    310     { 
    311         perror("mq_send"); 
     272 
     273    if ((zoid_fd = get_zoid_socket(&zoid_release)) < 0) 
     274        return zoid_fd; 
     275 
     276    hdr = ZBMI_CONTROL_UNEXP_FREE; 
     277    cmd.buffer = buffer - zbmi_shm_unexp; 
     278 
     279    if (socket_write(zoid_fd, &hdr, sizeof(hdr)) != sizeof(hdr) || 
     280        socket_write(zoid_fd, &cmd, sizeof(cmd)) != sizeof(cmd)) 
     281    { 
     282        perror("write"); 
     283        release_zoid_socket(zoid_release); 
    312284        return -BMI_EINVAL; 
    313285    } 
    314286 
     287    release_zoid_socket(zoid_release); 
    315288    return 0; 
    316289} 
     
    320293BMI_zoid_server_testunexpected(int incount, int* outcount, 
    321294                               struct bmi_method_unexpected_info* info, 
    322                                uint8_t class, int max_idle_time_ms) 
    323 { 
    324     mqd_t reply_queue; 
    325     int queue_id; 
     295                               int max_idle_time_ms) 
     296{ 
     297    int zoid_fd, zoid_release; 
     298    int hdr; 
    326299    int i; 
    327300    struct ZBMIControlUnexpTestCmd cmd; 
    328     char resp_buffer[ZBMI_MAX_MSG_SIZE]; 
    329     struct ZBMIControlUnexpTestResp* resp; 
    330     struct SharedMessageDescriptor* msg_desc; 
    331  
    332     /* Check for ready messages in the shared memory region first.  */ 
    333     sem_wait(&eager_desc->unexp_queue_sem); 
    334  
    335     *outcount = 0; 
    336     for (msg_desc = (struct SharedMessageDescriptor*) 
    337              (eager_desc->unexp_queue_first + zbmi_shm_eager); 
    338          msg_desc != (struct SharedMessageDescriptor*)zbmi_shm_eager; 
    339          msg_desc = (struct SharedMessageDescriptor*) 
    340              (msg_desc->next + zbmi_shm_eager)) 
    341     { 
    342         if (*outcount >= incount) 
    343             break; 
    344  
    345         if (msg_desc->message_type == MSG_TYPE_SEND && 
    346             msg_desc->class == class) 
    347         { 
    348             info[*outcount].error_code = 0; 
    349             if (!(info[*outcount].addr = get_client_addr(msg_desc->addr))) 
    350                 return -BMI_ENOMEM; 
    351             info[*outcount].buffer = msg_desc->buffer; 
    352             info[*outcount].size = msg_desc->size; 
    353             info[*outcount].tag = msg_desc->tag; 
    354  
    355             /* Prevent any subsequent tests from matching it.  */ 
    356             msg_desc->message_type = MSG_TYPE_DONE; 
    357  
    358             (*outcount)++; 
    359         } 
    360     } 
    361  
    362     sem_post(&eager_desc->unexp_queue_sem); 
    363  
    364     if (*outcount > 0 || max_idle_time_ms == 0) 
    365         return 0; 
    366  
    367     if ((reply_queue = get_zoid_reply_queue(&queue_id)) < 0) 
    368         return reply_queue; 
    369  
    370     cmd.command_id = ZBMI_CONTROL_UNEXP_TEST; 
    371     cmd.queue_id = queue_id; 
     301    struct ZBMIControlUnexpTestResp resp; 
     302    struct ZBMIControlBufDesc* buf_descs = NULL; 
     303 
     304    if ((zoid_fd = get_zoid_socket(&zoid_release)) < 0) 
     305        return zoid_fd; 
     306 
     307    hdr = ZBMI_CONTROL_UNEXP_TEST; 
    372308    cmd.incount = incount; 
    373309    cmd.max_idle_time_ms = max_idle_time_ms; 
    374     cmd.class = class; 
    375  
    376     if (mq_send(zbmi_control_queue, (void*)&cmd, sizeof(cmd), 0) < 0) 
    377     { 
    378         perror("mq_send"); 
    379         release_zoid_reply_queue(queue_id); 
     310 
     311    if (socket_write(zoid_fd, &hdr, sizeof(hdr)) != sizeof(hdr) || 
     312        socket_write(zoid_fd, &cmd, sizeof(cmd)) != sizeof(cmd)) 
     313    { 
     314        perror("write"); 
     315        release_zoid_socket(zoid_release); 
    380316        return -BMI_EINVAL; 
    381317    } 
    382318 
    383     if (mq_receive(reply_queue, resp_buffer, sizeof(resp_buffer), NULL) < 0) 
    384     { 
    385         perror("mq_receive"); 
    386         release_zoid_reply_queue(queue_id); 
     319    if (socket_read(zoid_fd, &resp, offsetof(typeof(resp), buffers)) != 
     320        offsetof(typeof(resp), buffers)) 
     321    { 
     322        perror("read"); 
     323        release_zoid_socket(zoid_release); 
    387324        return -BMI_EINVAL; 
    388325    } 
    389  
    390     release_zoid_reply_queue(queue_id); 
    391  
    392     resp = (struct ZBMIControlUnexpTestResp*)resp_buffer; 
    393     *outcount = resp->outcount; 
    394     for (i = 0; i < resp->outcount; i++) 
    395     { 
    396         msg_desc = (struct SharedMessageDescriptor*) 
    397             (resp->descs[i] + zbmi_shm_eager); 
    398  
     326    if (resp.outcount_bytes > 0) 
     327    { 
     328        buf_descs = alloca(resp.outcount_bytes); 
     329 
     330        if (socket_read(zoid_fd, buf_descs, resp.outcount_bytes) != 
     331            resp.outcount_bytes) 
     332        { 
     333            perror("read"); 
     334            release_zoid_socket(zoid_release); 
     335            return -BMI_EINVAL; 
     336        } 
     337    } 
     338 
     339    release_zoid_socket(zoid_release); 
     340 
     341    *outcount = resp.outcount; 
     342    for (i = 0; i < resp.outcount; i++) 
     343    { 
    399344        info[i].error_code = 0; 
    400         if (!(info[i].addr = get_client_addr(msg_desc->addr))) 
     345 
     346        if (!(info[i].addr = get_client_addr(buf_descs->addr))) 
    401347            return -BMI_ENOMEM; 
    402         info[i].buffer = msg_desc->buffer; 
    403         info[i].size = msg_desc->size; 
    404         info[i].tag = msg_desc->tag; 
    405  
    406         /* msg_desc->message_type has already been set to MSG_TYPE_DONE 
    407            by the zbmi plugin.  */ 
     348 
     349        if (buf_descs->list_count != 1) 
     350            return -BMI_EINVAL; 
     351 
     352        info[i].buffer = zbmi_shm_unexp + buf_descs->list[0].buffer; 
     353        info[i].size = buf_descs->list[0].size; 
     354        info[i].tag = buf_descs->tag; 
     355 
     356        buf_descs = (struct ZBMIControlBufDesc*) 
     357            (((char*)buf_descs) + offsetof(typeof(*buf_descs), list) + 
     358             buf_descs->list_count * sizeof(buf_descs->list[0])); 
    408359    } 
    409360 
     
    421372{ 
    422373    method_op_p new_op; 
    423     int ret; 
    424  
    425     if (!(new_op = bmi_alloc_method_op(sizeof(struct zoid_server_method_data)))) 
     374 
     375    /* Server-side sends are never immediate, so we start by allocating a 
     376       method op.  */ 
     377 
     378    if (!(new_op = bmi_alloc_method_op(sizeof(struct ZoidServerMethodData)))) 
    426379        return -BMI_ENOMEM; 
    427380    *id = new_op->op_id; 
     
    445398    } 
    446399    new_op->error_code = 0; 
    447     METHOD_DATA(new_op)->tmp_buffer = NULL; 
    448     METHOD_DATA(new_op)->zoid_buf_id = 0; 
     400    ((struct ZoidServerMethodData*)new_op->method_data)->tmp_buffer = NULL; 
     401    ((struct ZoidServerMethodData*)new_op->method_data)->zoid_buf_id = 0; 
    449402 
    450403    if (buffer_type == BMI_EXT_ALLOC) 
     
    462415        } 
    463416 
    464         METHOD_DATA(new_op)->tmp_buffer = buf; 
     417        ((struct ZoidServerMethodData*)new_op->method_data)->tmp_buffer = buf; 
    465418 
    466419        for (i = 0; i < list_count; i++) 
     
    475428        int i; 
    476429        for (i = 0; i < list_count; i++) 
    477             if ((char*)buffer_list[i] < zbmi_shm_rzv || 
    478                 (char*)buffer_list[i] + size_list[i] > zbmi_shm_rzv + 
    479                 zbmi_shm_size_total - zbmi_shm_size_eager) 
     430            if (buffer_list[i] < zbmi_shm_exp || 
     431                buffer_list[i] + size_list[i] > zbmi_shm_exp + 
     432                zbmi_shm_size_total - zbmi_shm_size_unexp) 
    480433            { 
    481434                return -BMI_EINVAL; 
     
    483436    } 
    484437 
    485     if ((ret = send_post_cmd(new_op, 0, NULL)) == 1) 
    486     { 
    487         /* Immediate completion.  */ 
    488         if (buffer_type == BMI_EXT_ALLOC) 
    489             BMI_zoid_server_memfree(METHOD_DATA(new_op)->tmp_buffer); 
    490  
    491         bmi_dealloc_method_op(new_op); 
    492     } 
    493  
    494     return ret; 
     438    return send_post_cmd(new_op); 
    495439} 
    496440 
     
    506450{ 
    507451    method_op_p new_op; 
    508     int ret, length; 
    509     struct SharedMessageDescriptor* msg_desc; 
    510  
    511     /* Check for matching eager messages first.  */ 
    512  
    513     sem_wait(&eager_desc->eager_queue_sem); 
    514  
    515     for (msg_desc = (struct SharedMessageDescriptor*) 
    516              (eager_desc->eager_queue_first + zbmi_shm_eager); 
    517          msg_desc != (struct SharedMessageDescriptor*)zbmi_shm_eager; 
    518          msg_desc = (struct SharedMessageDescriptor*) 
    519              (msg_desc->next + zbmi_shm_eager)) 
    520     { 
    521         if (msg_desc->message_type == MSG_TYPE_SEND && 
    522             msg_desc->addr == ((struct zoid_addr*)src->method_data)->pid && 
    523             msg_desc->tag == tag) 
    524         { 
    525             break; 
    526         } 
    527     } 
    528  
    529     if (msg_desc != (struct SharedMessageDescriptor*)zbmi_shm_eager) 
    530     { 
    531         int i; 
    532         size_t copied; 
    533         struct ZBMIControlEagerFreeCmd cmd; 
    534  
    535         /* We found a matching eager message.  All we need to do is copy 
    536            the memory over.  */ 
    537  
    538         if (total_expected_size < msg_desc->size) 
    539         { 
    540             fprintf(stderr, "Message size mismatch, ION " 
    541                     "expected size %lld < CN total size %d\n", 
    542                     total_expected_size, msg_desc->size); 
    543  
    544             sem_post(&eager_desc->eager_queue_sem); 
    545             return -BMI_EINVAL; 
    546         } 
    547  
    548         copied = 0; 
    549         i = 0; 
    550         while (copied < msg_desc->size) 
    551         { 
    552             size_t tocopy = (size_list[i] < msg_desc->size - copied ? 
    553                              size_list[i] : msg_desc->size - copied); 
    554             memcpy(buffer_list[i], msg_desc->buffer + copied, tocopy); 
    555             copied += tocopy; 
    556             i++; 
    557         } 
    558         *total_actual_size = msg_desc->size; 
    559  
    560         /* Prevent any subsequent matches.  */ 
    561         msg_desc->message_type = MSG_TYPE_DONE; 
    562  
    563         sem_post(&eager_desc->eager_queue_sem); 
    564  
    565         /* Request its release (only the zbmi plugin can do it).  */ 
    566         cmd.command_id = ZBMI_CONTROL_EAGER_FREE; 
    567         cmd.unexpected = 0; 
    568         cmd.buffer = (char*)msg_desc - zbmi_shm_eager; 
    569  
    570         if (mq_send(zbmi_control_queue, (void*)&cmd, sizeof(cmd), 0) < 0) 
    571         { 
    572             perror("mq_send"); 
    573             return -BMI_EINVAL; 
    574         } 
    575  
    576         /* Immediate completion.  */ 
    577         return 1; 
    578     } 
    579  
    580     sem_post(&eager_desc->eager_queue_sem); 
    581  
    582     if (!(new_op = bmi_alloc_method_op(sizeof(struct zoid_server_method_data)))) 
     452 
     453    /* Server-side receives are never immediate, so we start by allocating a 
     454       method op.  */ 
     455 
     456    if (!(new_op = bmi_alloc_method_op(sizeof(struct ZoidServerMethodData)))) 
    583457        return -BMI_ENOMEM; 
    584458    *id = new_op->op_id; 
     
    602476    } 
    603477    new_op->error_code = 0; 
    604     METHOD_DATA(new_op)->tmp_buffer = NULL; 
    605     METHOD_DATA(new_op)->zoid_buf_id = 0; 
     478    ((struct ZoidServerMethodData*)new_op->method_data)->tmp_buffer = NULL; 
     479    ((struct ZoidServerMethodData*)new_op->method_data)->zoid_buf_id = 0; 
    606480 
    607481    if (buffer_type == BMI_EXT_ALLOC) 
     
    617491        } 
    618492 
    619         METHOD_DATA(new_op)->tmp_buffer = buf; 
     493        ((struct ZoidServerMethodData*)new_op->method_data)->tmp_buffer = buf; 
    620494    } 
    621495    else 
     
    624498        int i; 
    625499        for (i = 0; i < list_count; i++) 
    626             if ((char*)buffer_list[i] < zbmi_shm_rzv || 
    627                 (char*)buffer_list[i] + size_list[i] > zbmi_shm_rzv + 
    628                 zbmi_shm_size_total - zbmi_shm_size_eager) 
     500            if (buffer_list[i] < zbmi_shm_exp || 
     501                buffer_list[i] + size_list[i] > zbmi_shm_exp + 
     502                zbmi_shm_size_total - zbmi_shm_size_unexp) 
    629503            { 
    630504                return -BMI_EINVAL; 
     
    632506    } 
    633507 
    634     if ((ret = send_post_cmd(new_op, 0, &length)) == 1) 
    635     { 
    636         /* Immediate completion.  */ 
    637         *total_actual_size = length; 
    638  
    639         if (buffer_type == BMI_EXT_ALLOC) 
    640         { 
    641             /* Copy the memory back to the user buffer(s).  */ 
    642             int j, size_remaining = length; 
    643             void *buf_cur = METHOD_DATA(new_op)->tmp_buffer; 
    644             j = 0; 
    645             while (size_remaining > 0) 
    646             { 
    647                 int tocopy = (new_op->size_list[j] < size_remaining ? 
    648                               new_op->size_list[j] : size_remaining); 
    649  
    650                 memcpy(new_op->buffer_list[j], buf_cur, tocopy); 
    651                 buf_cur += tocopy; 
    652                 size_remaining -= tocopy; 
    653                 j++; 
    654             } 
    655  
    656             BMI_zoid_server_memfree(METHOD_DATA(new_op)->tmp_buffer); 
    657         } 
    658  
    659         bmi_dealloc_method_op(new_op); 
    660     } 
    661  
    662     return ret; 
     508    return send_post_cmd(new_op); 
    663509} 
    664510 
     
    673519                        bmi_context_id context_id) 
    674520{ 
    675     mqd_t reply_queue; 
    676     int queue_id; 
    677     char cmd_buffer[ZBMI_MAX_MSG_SIZE], resp_buffer[ZBMI_MAX_MSG_SIZE]; 
     521    int zoid_fd, zoid_release; 
     522    int hdr; 
    678523    struct ZBMIControlTestCmd* cmd; 
    679524    int cmd_len; 
    680     struct ZBMIControlTestResp* resp; 
    681     int i, index; 
     525    struct ZBMIControlTestResp resp; 
     526    int i; 
    682527    int outcount_used = 0; /* Counter of already used output entries.  */ 
    683528    int incount_fwd = incount; /* Counter of how many input entries to 
     
    686531    /* We start by checking if there are any local failed/canceled operations 
    687532       and taking care of those first.  */ 
    688     gen_mutex_lock(&error_ops_mutex); 
     533    pthread_mutex_lock(&error_ops_mutex); 
    689534    if (incount) 
    690535    { 
     
    709554 
    710555                op_list_remove(op); 
    711                 bmi_dealloc_method_op(op); 
    712             } 
     556                /* Note: we will dealloc a little later.  */ 
     557            } 
     558        } 
     559 
     560        if (outcount_used > 0) 
     561        { 
     562            incount_fwd = incount - outcount_used; 
     563            if (index_array) 
     564                index_array += outcount_used; 
    713565        } 
    714566    } 
     
    733585            bmi_dealloc_method_op(op); 
    734586        } 
    735     } 
    736     gen_mutex_unlock(&error_ops_mutex); 
    737  
    738     if (outcount_used > 0) 
    739     { 
    740         /* Return immediately if there is something to return.  */ 
    741         *outcount = outcount_used; 
    742         return 0; 
    743     } 
    744 #if 0 /* Commented out so that testcontext won't return 0 if there are ready 
    745          expected rendezvous messages.  */ 
    746     else if (!incount) 
    747     { 
    748         /* Testcontext.  Check in the shared memory region for ready 
    749            unexpected messages and return immediately if any.  */ 
    750         struct SharedMessageDescriptor* msg_desc; 
    751  
    752         sem_wait(&eager_desc->unexp_queue_sem); 
    753  
    754         for (msg_desc = (struct SharedMessageDescriptor*) 
    755                  (eager_desc->unexp_queue_first + zbmi_shm_eager); 
    756              msg_desc != (struct SharedMessageDescriptor*)zbmi_shm_eager; 
    757              msg_desc = (struct SharedMessageDescriptor*) 
    758                  (msg_desc->next + zbmi_shm_eager)) 
    759         { 
    760             if (msg_desc->message_type == MSG_TYPE_SEND) 
    761                 break; 
    762         } 
    763  
    764         sem_post(&eager_desc->unexp_queue_sem); 
    765  
    766         if (msg_desc != (struct SharedMessageDescriptor*)zbmi_shm_eager) 
    767         { 
    768             *outcount = 0; 
    769             return 0; 
    770         } 
    771     } 
    772 #endif 
    773  
    774     if ((reply_queue = get_zoid_reply_queue(&queue_id)) < 0) 
    775         return reply_queue; 
    776  
     587 
     588        if (outcount_used > 0) 
     589        { 
     590            id_array += outcount_used; 
     591            error_code_array += outcount_used; 
     592            actual_size_array += outcount_used; 
     593            user_ptr_array += outcount_used; 
     594        } 
     595    } 
     596    pthread_mutex_unlock(&error_ops_mutex); 
     597 
     598    hdr = ZBMI_CONTROL_TEST; 
    777599    cmd_len = offsetof(typeof(*cmd), zoid_ids) + 
    778600        incount_fwd * sizeof(cmd->zoid_ids[0]); 
    779     if (cmd_len > ZBMI_MAX_MSG_SIZE) 
    780         return -BMI_EINVAL; 
    781  
    782     cmd = (struct ZBMIControlTestCmd*)cmd_buffer; 
    783     cmd->command_id = ZBMI_CONTROL_TEST; 
    784     cmd->queue_id = queue_id; 
     601    cmd = alloca(cmd_len); 
     602 
    785603    cmd->timeout_ms = max_idle_time_ms; 
     604 
    786605    /* incount_fwd == 0 indicates "testcontext".  We still need to communicate 
    787606       the max. count of outputs we are prepared to handle.  */ 
     
    790609    { 
    791610        method_op_p op = (method_op_p)id_gen_fast_lookup(id_array[i]); 
    792  
    793         cmd->zoid_ids[i] = METHOD_DATA(op)->zoid_buf_id; 
    794     } 
    795  
    796     if (mq_send(zbmi_control_queue, (void*)cmd, cmd_len, 0) < 0) 
    797     { 
    798         perror("mq_send"); 
    799         release_zoid_reply_queue(queue_id); 
     611        if (op->error_code) 
     612        { 
     613            bmi_dealloc_method_op(op); 
     614            continue; 
     615        } 
     616        cmd->zoid_ids[i] = ((struct ZoidServerMethodData*)op->method_data)-> 
     617            zoid_buf_id; 
     618    } 
     619 
     620    if (outcount_used > 0) 
     621    { 
     622        outcount_max -= outcount_used; 
     623        if (outcount_max == 0 || (incount > 0 && incount_fwd == 0)) 
     624        { 
     625            *outcount = outcount_used; 
     626            return 0; 
     627        } 
     628    } 
     629 
     630    /* Note: this is shifted later than usual in the function body so that 
     631       we can invoke bmi_dealloc_method_op above as appropriate.  */ 
     632    if ((zoid_fd = get_zoid_socket(&zoid_release)) < 0) 
     633        return zoid_fd; 
     634 
     635    if (socket_write(zoid_fd, &hdr, sizeof(hdr)) != sizeof(hdr) || 
     636        socket_write(zoid_fd, cmd, cmd_len) != cmd_len) 
     637    { 
     638        perror("write"); 
     639        release_zoid_socket(zoid_release); 
    800640        return -BMI_EINVAL; 
    801641    } 
    802642 
    803     if (mq_receive(reply_queue, resp_buffer, sizeof(resp_buffer), NULL) < 0) 
    804     { 
    805         perror("mq_receive"); 
    806         release_zoid_reply_queue(queue_id); 
     643    if (socket_read(zoid_fd, &resp, offsetof(typeof(resp), list)) != 
     644        offsetof(typeof(resp), list)) 
     645    { 
     646        perror("read"); 
     647        release_zoid_socket(zoid_release); 
    807648        return -BMI_EINVAL; 
    808649    } 
    809     resp = (struct ZBMIControlTestResp*)resp_buffer; 
    810  
    811     assert(resp->count <= outcount_max); 
    812     *outcount = resp->count; 
    813  
    814     for (i = 0, index = 0; i < resp->count; i++, index++) 
    815     { 
    816         method_op_p op = (method_op_p)id_gen_fast_lookup(resp->list[i]. 
    817                                                          bmi_id); 
    818         if (incount_fwd) 
    819         { 
    820             for (; index < incount_fwd; index++) 
    821             { 
    822                 if (cmd->zoid_ids[index] == METHOD_DATA(op)->zoid_buf_id) 
    823                     break; 
    824             } 
    825             assert(index < incount_fwd); 
    826  
    827             if (index_array) 
    828                 index_array[i] = index; 
     650    assert(resp.count <= outcount_max); 
     651    *outcount = resp.count; 
     652    if (resp.count > 0) 
     653    { 
     654        struct ZBMIControlTestRespList* resp_list; 
     655        int index; 
     656 
     657        resp_list = alloca(resp.count * sizeof(*resp_list)); 
     658 
     659        if (socket_read(zoid_fd, resp_list, resp.count * sizeof(*resp_list)) != 
     660            resp.count * sizeof(*resp_list)) 
     661        { 
     662            perror("read"); 
     663            release_zoid_socket(zoid_release); 
     664            return -BMI_EINVAL; 
     665        } 
     666 
     667        for (i = 0, index = 0; i < resp.count; i++, index++) 
     668        { 
     669            method_op_p op = (method_op_p)id_gen_fast_lookup(resp_list[i]. 
     670                                                             bmi_id); 
     671            if (incount_fwd) 
     672            { 
     673                for (; index < incount_fwd; index++) 
     674                { 
     675                    if (cmd->zoid_ids[index] == ((struct ZoidServerMethodData*) 
     676                                                 op->method_data)->zoid_buf_id) 
     677                        break; 
     678                } 
     679                assert(index < incount_fwd); 
     680 
     681                if (index_array) 
     682                    index_array[i] = index; 
     683                else 
     684                    assert(i == index); 
     685            } 
     686            else /* testcontext */ 
     687                id_array[i] = resp_list[i].bmi_id; 
     688 
     689            if (resp_list[i].length < 0) /* Most likely BMI_ECANCEL */ 
     690            { 
     691                error_code_array[index] = -resp_list[i].length; 
     692                actual_size_array[index] = 0; 
     693            } 
    829694            else 
    830                 assert(i == index); 
    831         } 
    832         else /* testcontext */ 
    833         { 
    834             /* Make sure the returned method_op is fully initialized. 
    835                Only an issue for testcontext, since other test calls 
    836                require bmi_op_id_t, which implies full initialization.  */ 
    837             gen_mutex_lock(&METHOD_DATA(op)->post_mutex); 
    838             gen_mutex_unlock(&METHOD_DATA(op)->post_mutex); 
    839             id_array[i] = resp->list[i].bmi_id; 
    840         } 
    841  
    842         if (resp->list[i].length < 0) /* Most likely BMI_ECANCEL */ 
    843         { 
    844             error_code_array[index] = -resp->list[i].length; 
    845             actual_size_array[index] = 0; 
    846         } 
    847         else 
    848         { 
    849             actual_size_array[index] = resp->list[i].length; 
    850             error_code_array[index] = 0; 
    851         } 
    852  
    853         if (user_ptr_array) 
    854             user_ptr_array[index] = op->user_ptr; 
    855  
    856         /* We are done with this message.  Clean up.  */ 
    857         if (METHOD_DATA(op)->tmp_buffer) 
    858         { 
    859             if (op->send_recv == BMI_RECV) 
    860             { 
    861                 /* Copy the memory back to the user buffer(s).  */ 
    862                 int j, size_remaining = resp->list[i].length; 
    863                 void *buf_cur = METHOD_DATA(op)->tmp_buffer; 
    864                 j = 0; 
    865                 while (size_remaining > 0) 
     695            { 
     696                actual_size_array[index] = resp_list[i].length; 
     697                error_code_array[index] = 0; 
     698            } 
     699 
     700            if (user_ptr_array) 
     701                user_ptr_array[index] = op->user_ptr; 
     702 
     703            /* We are done with this message.  Clean up.  */ 
     704            if (((struct ZoidServerMethodData*)op->method_data)->tmp_buffer) 
     705            { 
     706                if (op->send_recv == BMI_RECV) 
    866707                { 
    867                     int tocopy = (op->size_list[j] < size_remaining ? 
    868                                   op->size_list[j] : size_remaining); 
    869  
    870                     memcpy(op->buffer_list[j], buf_cur, tocopy); 
    871                     buf_cur += tocopy; 
    872                     size_remaining -= tocopy; 
    873                     j++; 
     708                    /* Copy the memory back to the user buffer(s).  */ 
     709                    int j, size_remaining = resp_list[i].length; 
     710                    void *buf_cur = ((struct ZoidServerMethodData*)op-> 
     711                                     method_data)->tmp_buffer; 
     712                    j = 0; 
     713                    while (size_remaining > 0) 
     714                    { 
     715                        int tocopy = (op->size_list[j] < size_remaining ? 
     716                                      op->size_list[j] : size_remaining); 
     717 
     718                        memcpy(op->buffer_list[j], buf_cur, tocopy); 
     719                        buf_cur += tocopy; 
     720                        size_remaining -= tocopy; 
     721                        j++; 
     722                    } 
    874723                } 
    875             } 
    876  
    877             BMI_zoid_server_memfree(METHOD_DATA(op)->tmp_buffer); 
    878         } 
    879  
    880         bmi_dealloc_method_op(op); 
    881     } /* for (i) */ 
    882  
    883     release_zoid_reply_queue(queue_id); 
     724 
     725                BMI_zoid_server_memfree(((struct ZoidServerMethodData*)op-> 
     726                                         method_data)->tmp_buffer); 
     727            } 
     728 
     729            bmi_dealloc_method_op(op); 
     730        } /* for (i) */ 
     731    } /* if (resp.count > 0) */ 
     732 
     733    release_zoid_socket(zoid_release); 
    884734 
    885735    return 0; 
     
    890740BMI_zoid_server_cancel(bmi_op_id_t id, bmi_context_id context_id) 
    891741{ 
     742    int zoid_fd, zoid_release; 
     743    int hdr; 
    892744    struct ZBMIControlCancelCmd cmd; 
    893745    method_op_p op; 
     
    899751       have not, most likely because of a lack of memory (those can be handled 
    900752       locally).  */ 
    901     if (!METHOD_DATA(op)->zoid_buf_id) 
    902     { 
    903         gen_mutex_lock(&no_mem_queue_mutex); 
     753    if (!((struct ZoidServerMethodData*)op->method_data)->zoid_buf_id) 
     754    { 
     755        pthread_mutex_lock(&no_mem_queue_mutex); 
    904756 
    905757        /* Test again, now with mutex properly locked.  */ 
    906         if (!METHOD_DATA(op)->zoid_buf_id) 
     758        if (!((struct ZoidServerMethodData*)op->method_data)->zoid_buf_id) 
    907759        { 
    908760            if (!op->error_code) 
    909761            { 
    910762                /* It must be an out-of-memory case on no_mem_queue.  */ 
    911                 struct no_mem_descriptor* desc; 
     763                struct NoMemDescriptor* desc; 
    912764 
    913765                op->error_code = BMI_ECANCEL; 
     
    929781                assert(desc); 
    930782 
    931                 gen_mutex_lock(&error_ops_mutex); 
     783                pthread_mutex_lock(&error_ops_mutex); 
    932784                op_list_add(error_ops, op); 
    933                 gen_mutex_unlock(&error_ops_mutex); 
    934             } 
    935  
    936             gen_mutex_unlock(&no_mem_queue_mutex); 
     785                pthread_mutex_unlock(&error_ops_mutex); 
     786            } 
     787 
     788            pthread_mutex_unlock(&no_mem_queue_mutex); 
    937789 
    938790            return 0; 
    939791        } 
    940792 
    941         gen_mutex_unlock(&no_mem_queue_mutex); 
    942     } 
    943  
    944     cmd.command_id = ZBMI_CONTROL_CANCEL; 
    945     cmd.zoid_id = METHOD_DATA(op)->zoid_buf_id; 
    946  
    947     if (mq_send(zbmi_control_queue, (void*)&cmd, sizeof(cmd), 0) < 0) 
    948     { 
    949         perror("mq_send"); 
     793        pthread_mutex_unlock(&no_mem_queue_mutex); 
     794    } 
     795 
     796    if ((zoid_fd = get_zoid_socket(&zoid_release)) < 0) 
     797        return zoid_fd; 
     798 
     799    hdr = ZBMI_CONTROL_CANCEL; 
     800    cmd.zoid_id = ((struct ZoidServerMethodData*)op->method_data)-> 
     801            zoid_buf_id; 
     802 
     803    if (socket_write(zoid_fd, &hdr, sizeof(hdr)) != sizeof(hdr) || 
     804        socket_write(zoid_fd, &cmd, sizeof(cmd)) != sizeof(cmd)) 
     805    { 
     806        perror("write"); 
     807        release_zoid_socket(zoid_release); 
    950808        return -BMI_EINVAL; 
    951809    } 
     
    954812} 
    955813 
    956 /* An internal routine used to obtain a (reply) queue to the ZBMI plugin.  */ 
    957 static mqd_t 
    958 get_zoid_reply_queue(int* queue_id) 
     814/* 
     815 * A more robust version of read(2). 
     816 */ 
     817static ssize_t 
     818socket_read(int fd, void* buf, size_t count) 
     819{ 
     820    size_t already_read = 0; 
     821 
     822    while (already_read < count) 
     823    { 
     824        ssize_t n; 
     825 
     826        n = read(fd, buf + already_read, count - already_read); 
     827 
     828        if (n == -1) 
     829        { 
     830            if (errno == EINTR || errno == EAGAIN) 
     831                continue; 
     832            return -1; 
     833        } 
     834        else if (n == 0) 
     835            return already_read; 
     836        else 
     837            already_read += n; 
     838    } 
     839 
     840    return already_read; 
     841} 
     842 
     843/* 
     844 * A more robust version of write(2). 
     845 */ 
     846static ssize_t 
     847socket_write(int fd, const void* buf, size_t count) 
     848{ 
     849    size_t already_written = 0; 
     850 
     851    while (already_written < count) 
     852    { 
     853        ssize_t n; 
     854 
     855        n = write(fd, buf + already_written, count - already_written); 
     856 
     857        if (n == -1) 
     858        { 
     859            if (errno == EINTR || errno == EAGAIN) 
     860                continue; 
     861            return -1; 
     862        } 
     863        else 
     864            already_written += n; 
     865    } 
     866 
     867    return already_written; 
     868} 
     869 
     870/* An internal routine used to obtain a socket to the ZBMI plugin.  */ 
     871static int 
     872get_zoid_socket(int* release_token) 
    959873{ 
    960874    int i; 
    961875 
    962     gen_mutex_lock(&zbmi_queues_mutex); 
    963  
    964     for (i = 0; i < zbmi_queues_used; i++) 
    965         if (!zbmi_queues_inuse[i]) 
     876    pthread_mutex_lock(&zbmi_sockets_mutex); 
     877 
     878    for (i = 0; i < zbmi_sockets_used; i++) 
     879        if (!zbmi_sockets_inuse[i]) 
    966880            break; 
    967881 
    968     if (i == zbmi_queues_used) 
    969     { 
    970         char queue_name[256]; 
    971         struct mq_attr attr; 
    972  
    973         /* All open queues are currently in use.  Open a new one.  */ 
    974         if (zbmi_queues_used == zbmi_queues_len) 
     882    if (i == zbmi_sockets_used) 
     883    { 
     884        /* All open sockets are currently in use.  Open a new one.  */ 
     885        struct sockaddr_un addr; 
     886 
     887        if (zbmi_sockets_used == zbmi_sockets_len) 
    975888        { 
    976889            /* Enlarge the arrays first.  */ 
    977890            int j; 
    978             int* zbmi_queues_new; 
    979             char* zbmi_queues_inuse_new; 
    980  
    981             if (zbmi_queues_len == 0) 
    982                 zbmi_queues_len = ZBMI_QUEUES_LEN_INIT; 
     891            int* zbmi_sockets_new; 
     892            char* zbmi_sockets_inuse_new; 
     893 
     894            if (zbmi_sockets_len == 0) 
     895                zbmi_sockets_len = ZBMI_SOCKETS_LEN_INIT; 
    983896            else 
    984                 zbmi_queues_len *= 2; 
    985             zbmi_queues_new = realloc(zbmi_queues, zbmi_queues_len * 
    986                                       sizeof(*zbmi_queues)); 
    987             if (!zbmi_queues_new) 
    988             { 
    989                 gen_mutex_unlock(&zbmi_queues_mutex); 
     897                zbmi_sockets_len *= 2; 
     898            zbmi_sockets_new = realloc(zbmi_sockets, zbmi_sockets_len * 
     899                                       sizeof(*zbmi_sockets)); 
     900            if (!zbmi_sockets_new) 
     901            { 
     902                pthread_mutex_unlock(&zbmi_sockets_mutex); 
    990903                return -BMI_ENOMEM; 
    991904            } 
    992             zbmi_queues = zbmi_queues_new; 
    993             zbmi_queues_inuse_new = realloc(zbmi_queues_inuse, 
    994                                             zbmi_queues_len * 
    995                                             sizeof(*zbmi_queues_inuse)); 
    996             if (!zbmi_queues_inuse_new) 
    997             { 
    998                 gen_mutex_unlock(&zbmi_queues_mutex); 
     905            zbmi_sockets = zbmi_sockets_new; 
     906            zbmi_sockets_inuse_new = realloc(zbmi_sockets_inuse, 
     907                                             zbmi_sockets_len * 
     908                                             sizeof(*zbmi_sockets_inuse)); 
     909            if (!zbmi_sockets_inuse_new) 
     910            { 
     911                pthread_mutex_unlock(&zbmi_sockets_mutex); 
    999912                return -BMI_ENOMEM; 
    1000913            } 
    1001             zbmi_queues_inuse = zbmi_queues_inuse_new; 
    1002  
    1003             for (j = zbmi_queues_used; j < zbmi_queues_len; j++) 
    1004                 zbmi_queues_inuse[j] = 0; 
    1005         } 
    1006  
    1007         sprintf(queue_name, ZBMI_REPLY_QUEUE_TEMPLATE, i); 
    1008         attr.mq_flags = 0; 
    1009         attr.mq_maxmsg = 2; /* We never put more than one in there.  */ 
    1010         attr.mq_msgsize = ZBMI_MAX_MSG_SIZE; 
    1011         attr.mq_curmsgs = 0; 
    1012  
    1013         zbmi_queues[i] = mq_open(queue_name, O_RDONLY | O_CREAT | O_EXCL, 
    1014                                  0666, &attr); 
    1015         if (zbmi_queues[i] < 0) 
    1016         { 
    1017             if (errno == EEXIST) 
    1018             { 
    1019                 /* There's no such thing as O_TRUNC for message queues, so 
    1020                    we need to do it manually.  */ 
    1021                 mq_unlink(queue_name); 
    1022                 zbmi_queues[i] = mq_open(queue_name, 
    1023                                          O_RDONLY | O_CREAT | O_EXCL, 
    1024                                          0666, &attr); 
    1025             } 
    1026         } 
    1027  
    1028         if (zbmi_queues[i] < 0) 
    1029         { 
    1030             perror("ZBMI reply queue"); 
    1031             gen_mutex_unlock(&zbmi_queues_mutex); 
     914            zbmi_sockets_inuse = zbmi_sockets_inuse_new; 
     915 
     916            for (j = zbmi_sockets_used; j < zbmi_sockets_len; j++) 
     917                zbmi_sockets_inuse[j] = 0; 
     918        } 
     919 
     920        if ((zbmi_sockets[i] = socket(PF_UNIX, SOCK_STREAM, 0)) < 0) 
     921        { 
     922            perror("ZBMI control socket"); 
     923            pthread_mutex_unlock(&zbmi_sockets_mutex); 
    1032924            return -BMI_EINVAL; 
    1033925        } 
    1034926 
    1035         zbmi_queues_used++; 
    1036     } 
    1037  
    1038     zbmi_queues_inuse[i] = 1; 
    1039  
    1040     gen_mutex_unlock(&zbmi_queues_mutex); 
    1041  
    1042     *queue_id = i; 
    1043  
    1044     return zbmi_queues[i]; 
    1045 } 
    1046  
    1047 /* Releases the queue obtained with get_zoid_reply_queue.  */ 
     927        addr.sun_family = AF_UNIX; 
     928        strcpy(addr.sun_path, ZBMI_SOCKET_NAME); 
     929#if 0 
     930        if (bind(zbmi_sockets[i], (struct sockaddr*)&addr, sizeof(addr)) < 0) 
     931        { 
     932            perror("bind " ZBMI_SOCKET_NAME); 
     933            close(zbmi_sockets[i]); 
     934            pthread_mutex_unlock(&zbmi_sockets_mutex); 
     935            return -BMI_EINVAL; 
     936        } 
     937#endif 
     938        while (connect(zbmi_sockets[i], (struct sockaddr*)&addr, sizeof(addr)) 
     939               < 0) 
     940        { 
     941            if (errno == ENOENT || errno == ECONNREFUSED) 
     942            { 
     943                /* ZOID server not running yet or too many requests?  Wait 
     944                   a little...  */ 
     945                sleep(1); 
     946            } 
     947            else 
     948            { 
     949                perror("connect to ZOID"); 
     950                close(zbmi_sockets[i]); 
     951                pthread_mutex_unlock(&zbmi_sockets_mutex); 
     952                return -BMI_EINVAL; 
     953            } 
     954        } 
     955 
     956        zbmi_sockets_used++; 
     957    } 
     958 
     959    zbmi_sockets_inuse[i] = 1; 
     960 
     961    pthread_mutex_unlock(&zbmi_sockets_mutex); 
     962 
     963    *release_token = i; 
     964 
     965    return zbmi_sockets[i]; 
     966} 
     967 
     968/* Releases the socket obtained with get_zoid_socket.  */ 
    1048969static void 
    1049 release_zoid_reply_queue(int queue_id) 
    1050 { 
    1051     assert(queue_id >= 0 && queue_id < zbmi_queues_used); 
    1052  
    1053     gen_mutex_lock(&zbmi_queues_mutex); 
    1054  
    1055     assert(zbmi_queues_inuse[queue_id]); 
    1056     zbmi_queues_inuse[queue_id] = 0; 
    1057  
    1058     gen_mutex_unlock(&zbmi_queues_mutex); 
     970release_zoid_socket(int release_token) 
     971{ 
     972    assert(release_token >= 0 && release_token < zbmi_sockets_used); 
     973 
     974    pthread_mutex_lock(&zbmi_sockets_mutex); 
     975 
     976    assert(zbmi_sockets_inuse[release_token]); 
     977    zbmi_sockets_inuse[release_token] = 0; 
     978 
     979    pthread_mutex_unlock(&zbmi_sockets_mutex); 
    1059980} 
    1060981 
     
    1068989    assert(zoid_addr >= 0); 
    1069990 
    1070     gen_mutex_lock(&clients_mutex); 
     991    pthread_mutex_lock(&clients_mutex); 
    1071992 
    1072993    if (zoid_addr >= clients_len) 
     
    10811002                                         sizeof(*clients_addr)))) 
    10821003        { 
    1083             gen_mutex_unlock(&clients_mutex); 
     1004            pthread_mutex_unlock(&clients_mutex); 
    10841005            return NULL; 
    10851006        } 
     
    11051026    ret = clients_addr[zoid_addr]; 
    11061027 
    1107     gen_mutex_unlock(&clients_mutex); 
     1028    pthread_mutex_unlock(&clients_mutex); 
    11081029 
    11091030    return ret; 
     
    11141035zoid_server_free_client_addr(bmi_method_addr_p addr) 
    11151036{ 
    1116     gen_mutex_lock(&clients_mutex); 
     1037    pthread_mutex_lock(&clients_mutex); 
    11171038 
    11181039    assert(((struct zoid_addr*)addr->method_data)->pid < clients_len && 
     
    11201041    clients_addr[((struct zoid_addr*)addr->method_data)->pid] = NULL; 
    11211042 
    1122     gen_mutex_unlock(&clients_mutex); 
     1043    pthread_mutex_unlock(&clients_mutex); 
    11231044} 
    11241045 
     
    11271048enqueue_no_mem(method_op_p op, bmi_size_t total_size) 
    11281049{ 
    1129     struct no_mem_descriptor *nomemdesc, *desc; 
     1050    struct NoMemDescriptor *nomemdesc, *desc; 
    11301051 
    11311052    if (!(nomemdesc = malloc(sizeof(*nomemdesc)))) 
     
    11371058    /* no_mem_queue is sorted in descending size order. 
    11381059       Look for an appropriate spot to insert a new entry.  */ 
    1139     gen_mutex_lock(&no_mem_queue_mutex); 
     1060    pthread_mutex_lock(&no_mem_queue_mutex); 
    11401061 
    11411062    for (desc = no_mem_queue_first; desc; desc = desc->next) 
     
    11791100    } 
    11801101 
    1181     gen_mutex_unlock(&no_mem_queue_mutex); 
     1102    pthread_mutex_unlock(&no_mem_queue_mutex); 
    11821103 
    11831104    return 0; 
    11841105} 
    11851106 
    1186 /* A common internal posting routine for send and receive requests. 
    1187    "not_immediate" is used for messages triggered from memfree; it is 
    1188    inconvenient at that point for messages to succeed immediately (we 
    1189    would need a separate queue for them). 
    1190    The function returns 0 if posted successfully, 1 for immediate 
    1191    completion, and a negative value if failed. 
    1192    For immediate completions of receives, the length of the received 
    1193    message is stored in *length; 
    1194 */ 
     1107/* A common internal posting routine for send and receive requests.  */ 
    11951108static int 
    1196 send_post_cmd(method_op_p op, int not_immediate, int* length) 
    1197 { 
    1198     mqd_t reply_queue; 
    1199     int queue_id; 
     1109send_post_cmd(method_op_p op) 
     1110{ 
     1111    int zoid_fd, zoid_release; 
    12001112    int cmd_len, i; 
    1201     char cmd_buffer[ZBMI_MAX_MSG_SIZE]; 
     1113    int hdr; 
    12021114    struct ZBMIControlPostCmd* cmd; 
    12031115    struct ZBMIControlPostResp resp; 
    12041116    int list_count_zoid; 
    12051117 
    1206     if ((reply_queue = get_zoid_reply_queue(&queue_id)) < 0) 
    1207         return reply_queue; 
    1208  
    1209     list_count_zoid = (METHOD_DATA(op)->tmp_buffer ? 1 : op->list_count); 
     1118    if ((zoid_fd = get_zoid_socket(&zoid_release)) < 0) 
     1119        return zoid_fd; 
     1120 
     1121    hdr = (op->send_recv == BMI_SEND ? ZBMI_CONTROL_POST_SEND : 
     1122           ZBMI_CONTROL_POST_RECV); 
     1123    list_count_zoid = (((struct ZoidServerMethodData*)op->method_data)-> 
     1124                       tmp_buffer ? 1 : op->list_count); 
    12101125    cmd_len = offsetof(typeof(*cmd), buf.list) + 
    12111126        list_count_zoid * sizeof(cmd->buf.list[0]); 
    1212     if (cmd_len > ZBMI_MAX_MSG_SIZE) 
    1213         return -BMI_EINVAL; 
    1214  
    1215     cmd = (struct ZBMIControlPostCmd*)cmd_buffer; 
    1216     cmd->command_id = (op->send_recv == BMI_SEND ? ZBMI_CONTROL_POST_SEND : 
    1217            ZBMI_CONTROL_POST_RECV); 
    1218     cmd->queue_id = queue_id; 
    1219     cmd->not_immediate = not_immediate; 
     1127    cmd = alloca(cmd_len); 
    12201128    cmd->bmi_id = op->op_id; 
    12211129    cmd->buf.addr = ((struct zoid_addr*)op->addr->method_data)->pid; 
    12221130    cmd->buf.tag = op->msg_tag; 
    12231131    cmd->buf.list_count = list_count_zoid; 
    1224     if (METHOD_DATA(op)->tmp_buffer) 
    1225     { 
    1226         cmd->buf.list[0].buffer = (char*)METHOD_DATA(op)->tmp_buffer - 
    1227             zbmi_shm_rzv; 
     1132    if (((struct ZoidServerMethodData*)op->method_data)->tmp_buffer) 
     1133    { 
     1134        cmd->buf.list[0].buffer = ((struct ZoidServerMethodData*)op-> 
     1135                                   method_data)->tmp_buffer - zbmi_shm_exp; 
    12281136        cmd->buf.list[0].size = (op->send_recv == BMI_SEND ? op->actual_size : 
    12291137                                 op->expected_size); 
     
    12321140        for (i = 0; i < op->list_count; i++) 
    12331141        { 
    1234             cmd->buf.list[i].buffer = (char*)op->buffer_list[i] - zbmi_shm_rzv; 
     1142            cmd->buf.list[i].buffer = op->buffer_list[i] - zbmi_shm_exp; 
    12351143            cmd->buf.list[i].size = op->size_list[i]; 
    12361144        } 
    12371145 
    1238     /* The moment the command is written below, we can get a completion of 
    1239        this request on another thread that is waiting in testcontext.  That 
    1240        thread will release method_op_p, resulting in us accessing free memory. 
    1241        This mutex prevents that.  */ 
    1242     gen_mutex_init(&METHOD_DATA(op)->post_mutex); 
    1243     gen_mutex_lock(&METHOD_DATA(op)->post_mutex); 
    1244  
    1245     if (mq_send(zbmi_control_queue, (void*)cmd, cmd_len, 0) < 0) 
    1246     { 
    1247         perror("mq_send"); 
    1248         gen_mutex_unlock(&METHOD_DATA(op)->post_mutex); 
    1249         release_zoid_reply_queue(queue_id); 
     1146    if (socket_write(zoid_fd, &hdr, sizeof(hdr)) != sizeof(hdr) || 
     1147        socket_write(zoid_fd, cmd, cmd_len) != cmd_len) 
     1148    { 
     1149        perror("write"); 
     1150        release_zoid_socket(zoid_release); 
    12501151        return -BMI_EINVAL; 
    12511152    } 
    12521153 
    1253     if (mq_receive(reply_queue, (void*)&resp, ZBMI_MAX_MSG_SIZE, NULL) != 
    1254         sizeof(resp)) 
    1255     { 
    1256         perror("mq_receive"); 
    1257         gen_mutex_unlock(&METHOD_DATA(op)->post_mutex); 
    1258         release_zoid_reply_queue(queue_id); 
     1154    if (socket_read(zoid_fd, &resp, sizeof(resp)) != sizeof(resp)) 
     1155    { 
     1156        perror("read"); 
     1157        release_zoid_socket(zoid_release); 
    12591158        return -BMI_EINVAL; 
    12601159    } 
    12611160 
    1262     release_zoid_reply_queue(queue_id); 
     1161    release_zoid_socket(zoid_release); 
    12631162 
    12641163    if (!resp.zoid_id) 
    1265     { 
    1266         gen_mutex_unlock(&METHOD_DATA(op)->post_mutex); 
    12671164        return -BMI_ENOMEM; 
    1268     } 
    1269     else if (resp.zoid_id == -1) 
    1270     { 
    1271         /* Immediate completion.  */ 
    1272         assert(!not_immediate); 
    1273         gen_mutex_unlock(&METHOD_DATA(op)->post_mutex); 
    1274         if (length) 
    1275             *length = resp.length; 
    1276         return 1; 
    1277     } 
    1278  
    1279     METHOD_DATA(op)->zoid_buf_id = resp.zoid_id; 
    1280  
    1281     gen_mutex_unlock(&METHOD_DATA(op)->post_mutex); 
     1165 
     1166    ((struct ZoidServerMethodData*)op->method_data)->zoid_buf_id = resp.zoid_id; 
    12821167 
    12831168    return 0; 
  • branches/cu-security-branch/src/io/bmi/bmi_zoid/zoid.c

    r8359 r8397  
    4242static op_list_p zoid_ops; 
    4343 
    44 /* As a special feature, client-side ZOID method allows post timeouts. 
    45    If non-zero this enables combined post & test in order to reduce the 
    46    number of round-trip messages to the server.  */ 
    47 static int post_timeout = 0; 
    48  
    4944 
    5045static int zoid_err_to_bmi(int err); 
     
    5651                      const bmi_size_t* size_list, int list_count, 
    5752                      bmi_size_t total_size, enum bmi_buffer_type buffer_type, 
    58                       bmi_msg_tag_t tag, uint8_t class, void* user_ptr, 
     53                      bmi_msg_tag_t tag, void* user_ptr, 
    5954                      bmi_context_id context_id, PVFS_hint hints, 
    6055                      int unexpected) 
     
    10095        } 
    10196 
    102         if (total_size <= ZOID_MAX_EAGER_MSG) 
    103             ret = zbmi_send_eager((const void**)buffer_list, size_list_cp, 
    104                                   list_count, tag, class, unexpected, 
    105                                   post_timeout); 
    106         else 
    107             ret = zbmi_send(buffer_list, size_list_cp, list_count, tag, class, 
    108                             unexpected, post_timeout); 
    109         err = __zoid_error(); 
    110         if (total_size <= ZOID_MAX_EAGER_MSG ? 
    111             (!err && ret == 0) : (err == ENOMEM)) 
     97        ret = zbmi_send(buffer_list, size_list_cp, list_count, tag, unexpected); 
     98        if ((err = __zoid_error())) 
    11299        { 
    113             /* Indicates that there was no memory on the server side 
    114                for the message.  Could happen if this is an expected 
    115                message and no matching receive has been posted, or if 
    116                we sent too many unexpected messages without the server-side 
    117                receiving anything.  */ 
    118  
    119             if (!(new_op = bmi_alloc_method_op(0))) 
    120                 return -BMI_ENOMEM; 
    121             *id = new_op->op_id; 
    122             new_op->addr = dest; 
    123             new_op->send_recv = BMI_SEND; 
    124             new_op->user_ptr = user_ptr; 
    125             new_op->msg_tag = tag; 
    126             new_op->class = class; 
    127             new_op->list_count = list_count; 
    128             new_op->actual_size = total_size; 
    129             if (list_count == 1) 
     100            if (err == ENOMEM) 
    130101            { 
    131                 /* Our buffer_list and size_list pointers might be 
    132                    temporary (see, e.g., BMI_zoid_post_send), so we 
    133                    prefer to copy the data over to someplace more 
    134                    permanent.  */ 
    135                 new_op->buffer = (void*)buffer_list[0]; 
    136                 new_op->buffer_list = &new_op->buffer; 
    137                 new_op->size_list = &new_op->actual_size; 
     102                /* Indicates that there was no memory on the server side 
     103                   for the message.  Could happen if this is an expected 
     104                   message and no matching receive has been posted, or if 
     105                   we sent too many unexpected messages without the server-side 
     106                   receiving anything.  */ 
     107 
     108                if (!(new_op = bmi_alloc_method_op(0))) 
     109                    return -BMI_ENOMEM; 
     110                *id = new_op->op_id; 
     111                new_op->addr = dest; 
     112                new_op->send_recv = BMI_SEND; 
     113                new_op->user_ptr = user_ptr; 
     114                new_op->msg_tag = tag; 
     115                new_op->list_count = list_count; 
     116                new_op->actual_size = total_size; 
     117                if (list_count == 1) 
     118                { 
     119                    /* Our buffer_list and size_list pointers might be 
     120                       temporary (see, e.g., BMI_zoid_post_send), so we 
     121                       prefer to copy the data over to someplace more 
     122                       permanent.  */ 
     123                    new_op->buffer = (void*)buffer_list[0]; 
     124                    new_op->buffer_list = &new_op->buffer; 
     125                    new_op->size_list = &new_op->actual_size; 
     126                } 
     127                else 
     128                { 
     129                    new_op->buffer_list = (void*const*)buffer_list; 
     130                    new_op->size_list = size_list; 
     131                } 
     132                new_op->error_code = 0; 
     133                new_op->method_data = (void*)unexpected; 
     134 
     135                op_list_add(zoid_ops, new_op); 
     136 
     137                return 0; /* Non-immediate completion.  */ 
    138138            } 
    139139            else 
    140             { 
    141                 new_op->buffer_list = (void*const*)buffer_list; 
    142                 new_op->size_list = size_list; 
    143             } 
    144             new_op->error_code = 0; 
    145             new_op->method_data = (void*)unexpected; 
    146  
    147             op_list_add(zoid_ops, new_op); 
    148  
    149             return 0; /* Non-immediate completion.  */ 
     140                return zoid_err_to_bmi(err); 
    150141        } 
    151         else if (err) 
    152             return zoid_err_to_bmi(err); 
     142 
     143        assert (ret == 1); 
    153144 
    154145        return 1; /* Immediate completion.  */ 
     
    201192        } 
    202193 
    203         ret = zbmi_recv(buffer_list, size_list_cp, list_count, tag, 
    204                         post_timeout); 
     194        ret = zbmi_recv(buffer_list, size_list_cp, list_count, tag); 
    205195        if ((err = __zoid_error())) 
    206196            return zoid_err_to_bmi(err); 
     
    231221        new_op->user_ptr = user_ptr; 
    232222        new_op->msg_tag = tag; 
    233         new_op->class = 0; 
    234223        new_op->list_count = list_count; 
    235224        new_op->expected_size = total_expected_size; 
     
    307296            incount_fwd++; 
    308297        } 
    309  
    310         /* If we have canceled messages, then we don't need to wait.  We 
    311            still check with the server to find about ready messages, but 
    312            that's it.  */ 
    313         if (canceled > 0) 
    314             max_idle_time_ms = 0; 
    315298 
    316299        ret = zbmi_test(tags, ops, unexp_sizes, incount_fwd, ready, 
     
    357340                    ret = zbmi_send((const void*const*)op->buffer_list, 
    358341                                    size_list_cp, op->list_count, op->msg_tag, 
    359                                     op->class, (int)op->method_data, 0); 
     342                                    (int)op->method_data); 
    360343                    if ((err = __zoid_error())) 
    361344                    { 
     
    384367 
    385368                    ret = zbmi_recv(op->buffer_list, size_list_cp, 
    386                                     op->list_count, op->msg_tag, 0); 
     369                                    op->list_count, op->msg_tag); 
    387370                    if ((err = __zoid_error())) 
    388371                        return zoid_err_to_bmi(err); 
     
    500483            bmi_dealloc_method_addr(inout_parameter); 
    501484            break; 
    502  
    503         case BMI_ZOID_POST_TIMEOUT: 
    504             if (zoid_node_type == CLIENT) 
    505                 post_timeout = *(int*)inout_parameter; 
    506             break; 
    507485    } 
    508486 
     
    573551    else 
    574552        return BMI_zoid_server_unexpected_free(buffer); 
     553} 
     554 
     555/* Invoked on BMI_post_send.  */ 
     556static int 
     557BMI_zoid_post_send(bmi_op_id_t* id, bmi_method_addr_p dest, 
     558                   const void* buffer, bmi_size_t size, 
     559                   enum bmi_buffer_type buffer_type, bmi_msg_tag_t tag, 
     560                   void* user_ptr, bmi_context_id context_id, PVFS_hint hints) 
     561{ 
     562    return zoid_post_send_common(id, dest, &buffer, &size, 1, size, buffer_type, 
     563                                 tag, user_ptr, context_id, hints, 0); 
     564} 
     565 
     566/* Invoked on BMI_post_sendunexpected.  We only support it on clients.  */ 
     567static int 
     568BMI_zoid_post_sendunexpected(bmi_op_id_t* id, bmi_method_addr_p dest, 
     569                             const void* buffer, bmi_size_t size, 
     570                             enum bmi_buffer_type buffer_type, 
     571                             bmi_msg_tag_t tag, void* user_ptr, 
     572                             bmi_context_id context_id, PVFS_hint hints) 
     573{ 
     574    return zoid_post_send_common(id, dest, &buffer, &size, 1, size, buffer_type, 
     575                                 tag, user_ptr, context_id, hints, 1); 
     576} 
     577 
     578/* Invoked on BMI_post_recv.  */ 
     579static int 
     580BMI_zoid_post_recv(bmi_op_id_t* id, bmi_method_addr_p src, void* buffer, 
     581                   bmi_size_t expected_size, bmi_size_t* actual_size, 
     582                   enum bmi_buffer_type buffer_type, bmi_msg_tag_t tag, 
     583                   void* user_ptr, bmi_context_id context_id, PVFS_hint hints) 
     584{ 
     585    return zoid_post_recv_common(id, src, &buffer, &expected_size, 1, 
     586                                 expected_size, actual_size, buffer_type, tag, 
     587                                 user_ptr, context_id, hints); 
    575588} 
    576589 
     
    645658} 
    646659 
    647 /* Invoked on BMI_testunexpected_class.  We only support it on the server.  */ 
     660/* Invoked on BMI_testunexpected.  We only support in on the server.  */ 
    648661static int 
    649662BMI_zoid_testunexpected(int incount, int* outcount, 
    650663                        struct bmi_method_unexpected_info* info, 
    651                         uint8_t class, int max_idle_time_ms) 
     664                        int max_idle_time_ms) 
    652665{ 
    653666    if (zoid_node_type == CLIENT) 
     
    656669    /* Server code.  */ 
    657670 
    658     return BMI_zoid_server_testunexpected(incount, outcount, info, class, 
     671    return BMI_zoid_server_testunexpected(incount, outcount, info, 
    659672                                          max_idle_time_ms); 
    660673} 
     
    696709{ 
    697710    return zoid_post_send_common(id, dest, buffer_list, size_list, list_count, 
    698                                  total_size, buffer_type, tag, 0, user_ptr, 
     711                                 total_size, buffer_type, tag, user_ptr, 
    699712                                 context_id, hints, 0); 
    700713} 
     
    715728} 
    716729 
    717 /* Invoked on BMI_post_sendunexpected_list_class.  We only support it on 
    718    clients.  */ 
     730/* Invoked on BMI_post_sendunexpected_list.  We only support it on clients.  */ 
    719731static int 
    720732BMI_zoid_post_sendunexpected_list(bmi_op_id_t* id, bmi_method_addr_p dest, 
     
    723735                                  bmi_size_t total_size, 
    724736                                  enum bmi_buffer_type buffer_type, 
    725                                   bmi_msg_tag_t tag, uint8_t class, 
    726                                   void* user_ptr, bmi_context_id context_id, 
    727                                   PVFS_hint hints) 
     737                                  bmi_msg_tag_t tag, void* user_ptr, 
     738                                  bmi_context_id context_id, PVFS_hint hints) 
    728739{ 
    729740    return zoid_post_send_common(id, dest, buffer_list, size_list, list_count, 
    730                                  total_size, buffer_type, tag, class, user_ptr, 
     741                                 total_size, buffer_type, tag, user_ptr, 
    731742                                 context_id, hints, 1); 
    732743} 
     
    793804    .unexpected_free = BMI_zoid_unexpected_free, 
    794805 
     806    .post_send = BMI_zoid_post_send, 
     807    .post_sendunexpected = BMI_zoid_post_sendunexpected, 
     808    .post_recv = BMI_zoid_post_recv, 
     809 
    795810    .test = BMI_zoid_test, 
    796811    .testsome = BMI_zoid_testsome, 
  • branches/cu-security-branch/src/io/bmi/bmi_zoid/zoid.h

    r8359 r8397  
    44#define ZOID_MAX_EXPECTED_MSG (128 * 1024 * 1024) 
    55#define ZOID_MAX_UNEXPECTED_MSG 8192 
    6 #define ZOID_MAX_EAGER_MSG 1024 
    7  
    86 
    97#define ZOID_ADDR_SERVER_PID -1 
     
    2725int BMI_zoid_server_testunexpected(int incount, int* outcount, 
    2826                                   struct bmi_method_unexpected_info* info, 
    29                                    uint8_t class, int max_idle_time_ms); 
     27                                   int max_idle_time_ms); 
    3028int zoid_server_send_common(bmi_op_id_t* id, bmi_method_addr_p dest, 
    3129                            const void*const* buffer_list,