Changeset 8397 for branches/cu-security-branch/src/io/bmi
- Timestamp:
- 06/18/10 20:02:50 (3 years ago)
- Location:
- branches/cu-security-branch
- Files:
-
- 23 modified
-
. (modified) (1 prop)
-
src/io/bmi (modified) (1 prop)
-
src/io/bmi/bmi-method-callback.h (modified) (1 diff)
-
src/io/bmi/bmi-method-support.h (modified) (2 diffs)
-
src/io/bmi/bmi.c (modified) (17 diffs)
-
src/io/bmi/bmi_gm (modified) (1 prop)
-
src/io/bmi/bmi_gm/bmi-gm.c (modified) (1 diff)
-
src/io/bmi/bmi_ib (modified) (1 prop)
-
src/io/bmi/bmi_ib/ib.c (modified) (8 diffs)
-
src/io/bmi/bmi_mx (modified) (1 prop)
-
src/io/bmi/bmi_mx/README (modified) (3 diffs)
-
src/io/bmi/bmi_mx/mx.c (modified) (62 diffs)
-
src/io/bmi/bmi_mx/mx.h (modified) (4 diffs)
-
src/io/bmi/bmi_portals (modified) (1 prop)
-
src/io/bmi/bmi_portals/portals.c (modified) (1 diff)
-
src/io/bmi/bmi_tcp (modified) (1 prop)
-
src/io/bmi/bmi_tcp/bmi-tcp.c (modified) (1 diff)
-
src/io/bmi/bmi_tcp/socket-collection-epoll.c (modified) (1 diff)
-
src/io/bmi/bmi_tcp/socket-collection.c (modified) (1 diff)
-
src/io/bmi/bmi_zoid/README (modified) (11 diffs)
-
src/io/bmi/bmi_zoid/server.c (modified) (41 diffs)
-
src/io/bmi/bmi_zoid/zoid.c (modified) (16 diffs)
-
src/io/bmi/bmi_zoid/zoid.h (modified) (2 diffs)
Legend:
- Unmodified
- Added
- Removed
-
branches/cu-security-branch
- Property svn:ignore
-
old new 3 3 aclocal.m4 4 4 autom4te.cache 5 config.status 6 Makefile 7 pvfs2-config.h 8 module.mk
-
- Property svn:ignore
-
branches/cu-security-branch/src/io/bmi
- Property svn:ignore deleted
-
branches/cu-security-branch/src/io/bmi/bmi-method-callback.h
r8330 r8397 12 12 BMI_addr_t bmi_method_addr_reg_callback(bmi_method_addr_p map); 13 13 int bmi_method_addr_forget_callback(BMI_addr_t addr); 14 void bmi_method_addr_drop_callback(char *method_name); 14 15 15 16 #endif /* __BMI_METHOD_CALLBACK_H */ -
branches/cu-security-branch/src/io/bmi/bmi-method-support.h
r7941 r8397 59 59 }; 60 60 61 /* flags that can be set per method to affect behavior */ 62 #define BMI_METHOD_FLAG_NO_POLLING 1 63 61 64 /* This is the table of interface functions that must be provided by BMI 62 65 * methods. … … 65 68 { 66 69 const char *method_name; 70 int flags; 67 71 int (*initialize) (bmi_method_addr_p, int, int); 68 72 int (*finalize) (void); -
branches/cu-security-branch/src/io/bmi/bmi.c
r8330 r8397 50 50 struct qlist_head link; 51 51 BMI_addr_t addr; 52 }; 53 54 /* 55 * BMI trigger to reap all method resources for inactive addresses. 56 */ 57 static QLIST_HEAD(bmi_addr_force_drop_list); 58 static gen_mutex_t bmi_addr_force_drop_list_mutex = GEN_MUTEX_INITIALIZER; 59 struct drop_item 60 { 61 struct qlist_head link; 62 char *method_name; 52 63 }; 53 64 … … 70 81 #ifdef __STATIC_METHOD_BMI_PORTALS__ 71 82 extern struct bmi_method_ops bmi_portals_ops; 83 #endif 84 #ifdef __STATIC_METHOD_BMI_ZOID__ 85 extern struct bmi_method_ops bmi_zoid_ops; 72 86 #endif 73 87 … … 88 102 &bmi_portals_ops, 89 103 #endif 104 #ifdef __STATIC_METHOD_BMI_ZOID__ 105 &bmi_zoid_ops, 106 #endif 90 107 NULL 91 108 }; … … 110 127 111 128 static struct bmi_method_ops **active_method_table = NULL; 112 static struct { 129 130 struct method_usage_t { 113 131 int iters_polled; /* how many iterations since this method was polled */ 114 132 int iters_active; /* how many iterations since this method had action */ 115 133 int plan; 116 } *method_usage = NULL; 134 int flags; 135 }; 136 137 static struct method_usage_t * expected_method_usage = NULL; 138 static struct method_usage_t * unexpected_method_usage = NULL; 139 117 140 static const int usage_iters_starvation = 100000; 118 141 static const int usage_iters_active = 10000; … … 122 145 int flags); 123 146 static void bmi_addr_drop(ref_st_p tmp_ref); 147 static void bmi_addr_force_drop(ref_st_p ref, ref_list_p ref_list); 124 148 static void bmi_check_forget_list(void); 149 static void bmi_check_addr_force_drop (void); 125 150 126 151 /** Initializes the BMI layer. 
Must be called before any other BMI … … 518 543 known_method_count = 0; 519 544 520 if (method_usage) 521 free(method_usage); 545 if (expected_method_usage) 546 free(expected_method_usage); 547 548 if (unexpected_method_usage) 549 free(unexpected_method_usage); 522 550 523 551 /* destroy the reference list */ … … 901 929 */ 902 930 static void 903 construct_poll_plan(int nmeth, int *idle_time_ms) 931 construct_poll_plan(struct method_usage_t * method_usage, 932 int nmeth, int *idle_time_ms) 904 933 { 905 934 int i, numplan; … … 910 939 ++method_usage[i].iters_active; 911 940 method_usage[i].plan = 0; 912 if (method_usage[i].iters_active <= usage_iters_active) { 941 if ((method_usage[i].iters_active <= usage_iters_active) && 942 (!(method_usage[i].flags & BMI_METHOD_FLAG_NO_POLLING))){ 913 943 /* recently busy, poll */ 914 944 if (0) gossip_debug(GOSSIP_BMI_DEBUG_CONTROL, … … 967 997 /* figure out if we need to drop any stale addresses */ 968 998 bmi_check_forget_list(); 999 bmi_check_addr_force_drop(); 969 1000 970 1001 gen_mutex_lock(&active_method_count_mutex); … … 977 1008 *outcount = 0; 978 1009 979 construct_poll_plan(tmp_active_method_count, &max_idle_time_ms); 1010 construct_poll_plan(unexpected_method_usage, 1011 tmp_active_method_count, &max_idle_time_ms); 980 1012 981 1013 while (position < incount && i < tmp_active_method_count) 982 1014 { 983 if ( method_usage[i].plan) {1015 if (unexpected_method_usage[i].plan) { 984 1016 ret = active_method_table[i]->testunexpected( 985 1017 (incount - position), &tmp_outcount, … … 993 1025 position += tmp_outcount; 994 1026 (*outcount) += tmp_outcount; 995 method_usage[i].iters_polled = 0;1027 unexpected_method_usage[i].iters_polled = 0; 996 1028 if (ret) 997 method_usage[i].iters_active = 0;1029 unexpected_method_usage[i].iters_active = 0; 998 1030 } 999 1031 i++; … … 1074 1106 } 1075 1107 1076 construct_poll_plan(tmp_active_method_count, &max_idle_time_ms); 1108 construct_poll_plan(expected_method_usage, 1109 
tmp_active_method_count, &max_idle_time_ms); 1077 1110 1078 1111 while (position < incount && i < tmp_active_method_count) 1079 1112 { 1080 if ( method_usage[i].plan) {1113 if (expected_method_usage[i].plan) { 1081 1114 ret = active_method_table[i]->testcontext( 1082 1115 incount - position, … … 1096 1129 position += tmp_outcount; 1097 1130 (*outcount) += tmp_outcount; 1098 method_usage[i].iters_polled = 0;1131 expected_method_usage[i].iters_polled = 0; 1099 1132 if (ret) 1100 method_usage[i].iters_active = 0;1133 expected_method_usage[i].iters_active = 0; 1101 1134 } 1102 1135 i++; … … 1202 1235 new_buffer = tmp_ref->interface->memalloc(size, send_recv); 1203 1236 1237 /* initialize buffer, if not NULL. */ 1238 if (new_buffer) 1239 { 1240 memset(new_buffer,0,size); 1241 } 1204 1242 return (new_buffer); 1205 1243 } … … 1981 2019 1982 2020 /* 2021 * Signal BMI to drop inactive connections for this method. 2022 */ 2023 void bmi_method_addr_drop_callback (char* method_name) 2024 { 2025 struct drop_item *item = 2026 (struct drop_item *) malloc(sizeof(struct drop_item)); 2027 2028 /* 2029 * If we can't allocate, just return. 2030 * Maybe this will succeed next time. 2031 */ 2032 if (!item) return; 2033 2034 item->method_name = method_name; 2035 2036 gen_mutex_lock(&bmi_addr_force_drop_list_mutex); 2037 qlist_add(&item->link, &bmi_addr_force_drop_list); 2038 gen_mutex_unlock(&bmi_addr_force_drop_list_mutex); 2039 2040 return; 2041 } 2042 2043 2044 /** 2045 * Try to increase method_usage_t struct to include room for a new method. 
2046 */ 2047 static int grow_method_usage (struct method_usage_t ** p, int newflags) 2048 { 2049 struct method_usage_t * x = *p; 2050 *p = malloc((active_method_count + 1) * sizeof(**p)); 2051 if (!*p) { 2052 *p = x; 2053 return 0; 2054 } 2055 if (active_method_count) { 2056 memcpy(*p, x, active_method_count * sizeof(**p)); 2057 free(x); 2058 } 2059 memset(&((*p)[active_method_count]), 0, sizeof(**p)); 2060 (*p)[active_method_count].flags = newflags; 2061 2062 return 1; 2063 } 2064 2065 /* 1983 2066 * Attempt to insert this name into the list of active methods, 1984 2067 * and bring it up. … … 2030 2113 active_method_table[active_method_count] = meth; 2031 2114 2032 x = method_usage; 2033 method_usage = malloc((active_method_count + 1) * sizeof(*method_usage)); 2034 if (!method_usage) { 2035 method_usage = x; 2036 return -ENOMEM; 2037 } 2038 if (active_method_count) { 2039 memcpy(method_usage, x, active_method_count * sizeof(*method_usage)); 2040 free(x); 2041 } 2042 memset(&method_usage[active_method_count], 0, sizeof(*method_usage)); 2115 if (!grow_method_usage (&unexpected_method_usage, meth->flags)) 2116 return -ENOMEM; 2117 2118 /** 2119 * If we run out of memory here, the unexpected_method_usage will be 2120 * larger than strictly required but there is no memory leak. 2121 */ 2122 2123 if (!grow_method_usage (&expected_method_usage, meth->flags)) 2124 return -ENOMEM; 2043 2125 2044 2126 ++active_method_count; … … 2220 2302 } 2221 2303 2304 2305 /* bmi_addr_force_drop 2306 * 2307 * Destroys a complete BMI address, including forcing the method to clean up 2308 * its portion. 
2309 * 2310 * NOTE: must be called with ref list mutex held 2311 */ 2312 static void bmi_addr_force_drop(ref_st_p ref, ref_list_p ref_list) 2313 { 2314 gossip_debug(GOSSIP_BMI_DEBUG_CONTROL, 2315 "[BMI CONTROL]: %s: bmi discarding address: %llu\n", 2316 __func__, llu(ref->bmi_addr)); 2317 2318 ref_list_rem(ref_list, ref->bmi_addr); 2319 dealloc_ref_st(ref); 2320 2321 return; 2322 } 2323 2324 /* 2325 * bmi_check_addr_force_drop 2326 * 2327 * Checks to see if any method has requested freeing resources. 2328 */ 2329 static void bmi_check_addr_force_drop (void) 2330 { 2331 struct drop_item *drop_item = NULL; 2332 ref_st_p ref_item = NULL; 2333 2334 gen_mutex_lock(&bmi_addr_force_drop_list_mutex); 2335 while (!qlist_empty(&bmi_addr_force_drop_list)) 2336 { 2337 drop_item = qlist_entry(qlist_pop(&bmi_addr_force_drop_list), 2338 struct drop_item, 2339 link); 2340 gen_mutex_unlock(&bmi_addr_force_drop_list_mutex); 2341 gen_mutex_lock(&ref_mutex); 2342 qlist_for_each_entry(ref_item, cur_ref_list, list_link) 2343 { 2344 if ((ref_item->ref_count == 0) && 2345 (ref_item->interface->method_name == drop_item->method_name)) 2346 { 2347 bmi_addr_force_drop(ref_item, cur_ref_list); 2348 } 2349 } 2350 gen_mutex_unlock(&ref_mutex); 2351 gen_mutex_lock(&bmi_addr_force_drop_list_mutex); 2352 } 2353 gen_mutex_unlock(&bmi_addr_force_drop_list_mutex); 2354 2355 return; 2356 } 2357 2222 2358 /* 2223 2359 * Local variables: -
branches/cu-security-branch/src/io/bmi/bmi_gm
- Property svn:ignore deleted
-
branches/cu-security-branch/src/io/bmi/bmi_gm/bmi-gm.c
r7941 r8397 158 158 const struct bmi_method_ops bmi_gm_ops = { 159 159 .method_name = BMI_gm_method_name, 160 .flags = 0, 160 161 .initialize = BMI_gm_initialize, 161 162 .finalize = BMI_gm_finalize, -
branches/cu-security-branch/src/io/bmi/bmi_ib
- Property svn:ignore deleted
-
branches/cu-security-branch/src/io/bmi/bmi_ib/ib.c
r7941 r8397 23 23 #include <src/common/gen-locks/gen-locks.h> /* gen_mutex_t ... */ 24 24 #include <src/common/misc/pvfs2-internal.h> 25 #include "pint-hint.h" 25 26 26 27 #ifdef HAVE_VALGRIND_H … … 895 896 const void *buffer, bmi_size_t total_size, 896 897 enum bmi_buffer_type buffer_flag __unused, 897 bmi_msg_tag_t tag, void *user_ptr, bmi_context_id context_id) 898 bmi_msg_tag_t tag, void *user_ptr, bmi_context_id 899 context_id, PVFS_hint hints __unused) 898 900 { 899 901 return post_send(id, remote_map, 0, &buffer, &total_size, … … 905 907 const void *const *buffers, const bmi_size_t *sizes, int list_count, 906 908 bmi_size_t total_size, enum bmi_buffer_type buffer_flag __unused, 907 bmi_msg_tag_t tag, void *user_ptr, bmi_context_id context_id) 909 bmi_msg_tag_t tag, void *user_ptr, bmi_context_id context_id, PVFS_hint 910 hints __unused) 908 911 { 909 912 return post_send(id, remote_map, list_count, buffers, sizes, … … 916 919 enum bmi_buffer_type buffer_flag __unused, 917 920 bmi_msg_tag_t tag, void *user_ptr, 918 bmi_context_id context_id) 921 bmi_context_id context_id, PVFS_hint hints 922 __unused) 919 923 { 920 924 return post_send(id, remote_map, 0, &buffer, &total_size, … … 929 933 enum bmi_buffer_type buffer_flag __unused, 930 934 bmi_msg_tag_t tag, void *user_ptr, 931 bmi_context_id context_id) 935 bmi_context_id context_id, PVFS_hint hints 936 __unused) 932 937 { 933 938 return post_send(id, remote_map, list_count, buffers, sizes, … … 1066 1071 void *buffer, bmi_size_t expected_len, bmi_size_t *actual_len __unused, 1067 1072 enum bmi_buffer_type buffer_flag __unused, bmi_msg_tag_t tag, void *user_ptr, 1068 bmi_context_id context_id )1073 bmi_context_id context_id, PVFS_hint hints __unused) 1069 1074 { 1070 1075 return post_recv(id, remote_map, 0, &buffer, &expected_len, … … 1077 1082 bmi_size_t tot_expected_len, bmi_size_t *tot_actual_len __unused, 1078 1083 enum bmi_buffer_type buffer_flag __unused, bmi_msg_tag_t tag, void *user_ptr, 1079 
bmi_context_id context_id )1084 bmi_context_id context_id, PVFS_hint hints __unused) 1080 1085 { 1081 1086 return post_recv(id, remote_map, list_count, buffers, sizes, … … 2109 2114 { 2110 2115 .method_name = "bmi_ib", 2116 .flags = 0, 2111 2117 .initialize = BMI_ib_initialize, 2112 2118 .finalize = BMI_ib_finalize, -
branches/cu-security-branch/src/io/bmi/bmi_mx
- Property svn:ignore deleted
-
branches/cu-security-branch/src/io/bmi/bmi_mx/README
r7024 r8397 68 68 BMX_DEBUG Turn on gossip messages 69 69 BMX_MEM_ACCT Track memory usage 70 BMX_LOGGING Turn on MPE logging71 70 BMX_SERVER_RXS Additional rxs for servers 72 71 BMX_TIMEOUT Timeout for all MX messages … … 117 116 BMI_mx_method_addr_lookup(). 118 117 119 BMX_LOGGING120 This is not generally recommended. It turns on support for MPE logging but121 it requires modifying the Makefile.in script and re-generating configure.122 Contact help <at> myri.com for assistance.123 124 118 BMX_SERVER_RXS 125 119 The server will receive messages from unknown peers. This value determines how … … 156 150 mx://hostname:ep_id 157 151 158 Use the first option if mx_info lists hostname:board and use the second option if mx_info simply shows a hostname. 152 Use the first option if mx_info lists hostname:board and use the second option 153 if mx_info simply shows a hostname. 159 154 160 155 ====================== -
branches/cu-security-branch/src/io/bmi/bmi_mx/mx.c
r7941 r8397 17 17 struct bmx_data *bmi_mx = NULL; /* global state for bmi_mx */ 18 18 19 mx_status_t BMX_NO_STATUS; 20 19 21 #if BMX_MEM_ACCT 20 22 uint64_t mem_used = 0; /* bytes used */ … … 22 24 #endif 23 25 24 #if BMX_LOGGING25 int send_start;26 int send_finish;27 int recv_start;28 int recv_finish;29 int sendunex_start;30 int sendunex_finish;31 int recvunex_start;32 int recvunex_finish;33 #endif34 35 26 /* statics for event logging */ 36 27 static PINT_event_type bmi_mx_send_event_id; … … 48 39 static void 49 40 bmx_create_peername(void); 41 42 /**** Completion function token handling ****************************/ 43 /* We should not hold any locks when calling mx_test[_any](), 44 * mx_wait_any() or mx_cancel(). We want to avoid races between them, 45 * however. So, before calling any completion function, the caller 46 * must hold this token. These functions implement a token system (i.e. 47 * semaphore) that will wake up mx_wait_any() to reduce blocking times 48 * for the calling function. 
49 */ 50 51 static void 52 bmx_get_completion_token(void) 53 { 54 int done = 0; 55 56 do { 57 gen_mutex_lock(&bmi_mx->bmx_completion_lock); 58 if (bmi_mx->bmx_refcount == 1) { 59 bmi_mx->bmx_refcount--; 60 done = 1; 61 gen_mutex_unlock(&bmi_mx->bmx_completion_lock); 62 } else { 63 assert(bmi_mx->bmx_refcount == 0); 64 /* someone has the lock, wake the MX endpoint in 65 * case they are blocking in mx_wait_any() */ 66 gen_mutex_unlock(&bmi_mx->bmx_completion_lock); 67 mx_wakeup(bmi_mx->bmx_ep); 68 } 69 } while (!done); 70 71 return; 72 } 73 74 static void 75 bmx_release_completion_token(void) 76 { 77 gen_mutex_lock(&bmi_mx->bmx_completion_lock); 78 bmi_mx->bmx_refcount++; 79 assert(bmi_mx->bmx_refcount == 1); 80 gen_mutex_unlock(&bmi_mx->bmx_completion_lock); 81 return; 82 } 50 83 51 84 /**** TX/RX handling functions **************************************/ … … 224 257 } 225 258 226 /* remove from peer's queued txs/rxs list */227 static void228 bmx_deq_ctx(struct bmx_ctx *ctx)229 {230 struct bmx_peer *peer = ctx->mxc_peer;231 232 BMX_ENTER;233 if (!qlist_empty(&ctx->mxc_list)) {234 gen_mutex_lock(&peer->mxp_lock);235 qlist_del_init(&ctx->mxc_list);236 gen_mutex_unlock(&peer->mxp_lock);237 }238 BMX_EXIT;239 return;240 }241 242 259 /* add to peer's pending rxs list */ 243 260 static void … … 280 297 } 281 298 282 /* queue on unexpected rx or tx list */299 /* dequeue from unexpected rx list */ 283 300 static void 284 bmx_q_unex_ctx(struct bmx_ctx *ctx) 285 { 301 bmx_deq_unex_rx(struct bmx_ctx **rxp) 302 { 303 struct bmx_ctx *rx = NULL; 304 list_t *list = &bmi_mx->bmx_unex_rxs; 305 286 306 BMX_ENTER; 287 if (ctx->mxc_type == BMX_REQ_RX) { 288 gen_mutex_lock(&bmi_mx->bmx_unex_rxs_lock); 289 qlist_add_tail(&ctx->mxc_list, &bmi_mx->bmx_unex_rxs); 290 gen_mutex_unlock(&bmi_mx->bmx_unex_rxs_lock); 291 } else { 292 gen_mutex_lock(&bmi_mx->bmx_unex_txs_lock); 293 qlist_add_tail(&ctx->mxc_list, &bmi_mx->bmx_unex_txs); 294 gen_mutex_unlock(&bmi_mx->bmx_unex_txs_lock); 295 } 307 
gen_mutex_lock(&bmi_mx->bmx_unex_rxs_lock); 308 if (!qlist_empty(list)) { 309 rx = qlist_entry(list->next, struct bmx_ctx, mxc_list); 310 qlist_del_init(&rx->mxc_list); 311 } 312 gen_mutex_unlock(&bmi_mx->bmx_unex_rxs_lock); 313 *rxp = rx; 314 296 315 BMX_EXIT; 297 316 return; 298 317 } 299 318 300 /* add to the global canceled list */319 /* add to the completion queue for the appropriate context */ 301 320 static void 302 bmx_q_canceled_ctx(struct bmx_ctx *ctx, bmi_error_code_t error) 303 { 321 bmx_q_completed(struct bmx_ctx *ctx, enum bmx_ctx_state state, 322 mx_status_t status, bmi_error_code_t error) 323 { 324 int id = 0; 325 gen_mutex_t *lock = NULL; 326 list_t *list = NULL; 327 304 328 BMX_ENTER; 305 ctx->mxc_state = BMX_CTX_CANCELED; 306 if (error < 0) 307 ctx->mxc_mxstat.code = error; 308 else 309 ctx->mxc_mxstat.code = -error; 310 gen_mutex_lock(&bmi_mx->bmx_canceled_lock); 311 qlist_add_tail(&ctx->mxc_list, &bmi_mx->bmx_canceled); 312 gen_mutex_unlock(&bmi_mx->bmx_canceled_lock); 329 330 ctx->mxc_state = state; 331 ctx->mxc_mxstat = status; 332 ctx->mxc_error = error < 0 ? 
error : -error; 333 334 if (ctx->mxc_type == BMX_REQ_RX && 335 ctx->mxc_msg_type == BMX_MSG_UNEXPECTED) { 336 list = &bmi_mx->bmx_unex_rxs; 337 lock = &bmi_mx->bmx_unex_rxs_lock; 338 } else { 339 id = (int) ctx->mxc_mop->context_id; 340 lock = &bmi_mx->bmx_done_q_lock[id]; 341 list = &bmi_mx->bmx_done_q[id]; 342 } 343 344 345 gen_mutex_lock(lock); 346 qlist_add_tail(&ctx->mxc_list, list); 347 gen_mutex_unlock(lock); 348 BMX_EXIT; 349 return; 350 } 351 352 static void 353 bmx_deq_completed(struct bmx_ctx **ctxp, bmi_context_id context_id) 354 { 355 int id = (int) context_id; 356 list_t *list = &bmi_mx->bmx_done_q[id]; 357 gen_mutex_t *lock = &bmi_mx->bmx_done_q_lock[id]; 358 struct bmx_ctx *ctx = NULL; 359 360 BMX_ENTER; 361 362 gen_mutex_lock(lock); 363 if (!qlist_empty(list)) { 364 ctx = qlist_entry(list->next, struct bmx_ctx, mxc_list); 365 qlist_del_init(&ctx->mxc_list); 366 } 367 gen_mutex_unlock(lock); 368 *ctxp = ctx; 369 313 370 BMX_EXIT; 314 371 return; … … 347 404 348 405 static void 349 bmx_put_idle_rx(struct bmx_ctx *rx) 350 { 351 if (rx == NULL) { 352 debug(BMX_DB_WARN, "put_idle_rx() called with NULL"); 406 bmx_put_idle_ctx(struct bmx_ctx *ctx) 407 { 408 list_t *list = &bmi_mx->bmx_idle_txs; 409 gen_mutex_t *lock = &bmi_mx->bmx_idle_txs_lock; 410 411 if (ctx == NULL) { 412 debug(BMX_DB_WARN, "put_idle_ctx() called with NULL"); 353 413 return; 354 414 } 355 if (rx->mxc_type != BMX_REQ_RX) { 356 debug(BMX_DB_WARN, "put_idle_rx() called with a TX"); 357 return; 358 } 359 if (rx->mxc_get != rx->mxc_put + 1) { 360 debug(BMX_DB_ERR, "put_idle_rx() get (%llu) != put (%llu) + 1", 361 (unsigned long long) rx->mxc_get, 362 (unsigned long long) rx->mxc_put); 415 ctx->mxc_put++; 416 if (ctx->mxc_get != ctx->mxc_put) { 417 debug(BMX_DB_ERR, "put_idle_ctx() get (%llu) != put (%llu)", 418 (unsigned long long) ctx->mxc_get, 419 (unsigned long long) ctx->mxc_put); 363 420 exit(1); 364 421 } 365 bmx_ctx_init(rx); 366 rx->mxc_put++; 367 
gen_mutex_lock(&bmi_mx->bmx_idle_rxs_lock); 368 qlist_add(&rx->mxc_list, &bmi_mx->bmx_idle_rxs); 369 gen_mutex_unlock(&bmi_mx->bmx_idle_rxs_lock); 422 bmx_ctx_init(ctx); 423 424 if (ctx->mxc_type == BMX_REQ_RX) { 425 list = &bmi_mx->bmx_idle_rxs; 426 lock = &bmi_mx->bmx_idle_rxs_lock; 427 } 428 429 gen_mutex_lock(lock); 430 qlist_add(&ctx->mxc_list, list); 431 gen_mutex_unlock(lock); 370 432 return; 371 433 } … … 416 478 417 479 return tx; 418 }419 420 static void421 bmx_put_idle_tx(struct bmx_ctx *tx)422 {423 if (tx == NULL) {424 debug(BMX_DB_WARN, "put_idle_tx() called with NULL");425 return;426 }427 if (tx->mxc_type != BMX_REQ_TX) {428 debug(BMX_DB_WARN, "put_idle_tx() called with a TX");429 return;430 }431 if (tx->mxc_get != tx->mxc_put + 1) {432 debug(BMX_DB_ERR, "put_idle_tx() get (%llu) != put (%llu) + 1",433 (unsigned long long) tx->mxc_get,434 (unsigned long long) tx->mxc_put);435 exit(1);436 }437 bmx_ctx_init(tx);438 tx->mxc_put++;439 gen_mutex_lock(&bmi_mx->bmx_idle_txs_lock);440 qlist_add(&tx->mxc_list, &bmi_mx->bmx_idle_txs);441 gen_mutex_unlock(&bmi_mx->bmx_idle_txs_lock);442 return;443 480 } 444 481 … … 759 796 return ret; 760 797 } 761 bmx_put_idle_ rx(rx);798 bmx_put_idle_ctx(rx); 762 799 } 763 800 … … 810 847 bmx_globals_init(int method_id) 811 848 { 849 int i = 0; 850 812 851 #if BMX_MEM_ACCT 813 852 mem_used = 0; … … 840 879 gen_mutex_init(&bmi_mx->bmx_idle_rxs_lock); 841 880 842 INIT_QLIST_HEAD(&bmi_mx->bmx_canceled); 843 gen_mutex_init(&bmi_mx->bmx_canceled_lock); 844 845 INIT_QLIST_HEAD(&bmi_mx->bmx_unex_txs); 846 gen_mutex_init(&bmi_mx->bmx_unex_txs_lock); 881 gen_mutex_init(&bmi_mx->bmx_completion_lock); 882 /* set to 1 to allow testing to start */ 883 bmi_mx->bmx_refcount = 1; 884 885 for (i = 0; i < BMI_MAX_CONTEXTS; i++) { 886 INIT_QLIST_HEAD(&bmi_mx->bmx_done_q[i]); 887 gen_mutex_init(&bmi_mx->bmx_done_q_lock[i]); 888 } 889 847 890 INIT_QLIST_HEAD(&bmi_mx->bmx_unex_rxs); 848 891 gen_mutex_init(&bmi_mx->bmx_unex_rxs_lock); … … 883 926 
param.val.context_id.shift = BMX_MSG_SHIFT; 884 927 885 if (board == -1) board = 0;886 928 mxret = mx_open_endpoint(board, ep_id, BMX_MAGIC, 887 929 &param, 1, ep); … … 922 964 923 965 BMX_ENTER; 924 925 #if BMX_LOGGING926 MPE_Init_log();927 #define BMX_LOG_STATE 1928 #if BMX_LOG_STATE929 send_start = MPE_Log_get_event_number();930 send_finish = MPE_Log_get_event_number();931 recv_start = MPE_Log_get_event_number();932 recv_finish = MPE_Log_get_event_number();933 sendunex_start = MPE_Log_get_event_number();934 sendunex_finish = MPE_Log_get_event_number();935 recvunex_start = MPE_Log_get_event_number();936 recvunex_finish = MPE_Log_get_event_number();937 MPE_Describe_state(send_start, send_finish, "Send", "red");938 MPE_Describe_state(recv_start, recv_finish, "Recv", "blue");939 MPE_Describe_state(sendunex_start, sendunex_finish, "SendUnex", "orange");940 MPE_Describe_state(recvunex_start, recvunex_finish, "RecvUnex", "green");941 #else942 MPE_Log_get_solo_eventID(&send_start);943 MPE_Log_get_solo_eventID(&send_finish);944 MPE_Log_get_solo_eventID(&recv_start);945 MPE_Log_get_solo_eventID(&recv_finish);946 MPE_Log_get_solo_eventID(&sendunex_start);947 MPE_Log_get_solo_eventID(&sendunex_finish);948 MPE_Log_get_solo_eventID(&recvunex_start);949 MPE_Log_get_solo_eventID(&recvunex_finish);950 MPE_Describe_info_event(send_start, "Send_start", "red1", "tag:%d");951 MPE_Describe_info_event(send_finish, "Send_finish", "red3", "tag:%d");952 MPE_Describe_info_event(recv_start, "Recv_start", "blue1", "tag:%d");953 MPE_Describe_info_event(recv_finish, "Recv_finish", "blue3", "tag:%d");954 MPE_Describe_info_event(sendunex_start, "SendUnex_start", "orange1", "tag:%d");955 MPE_Describe_info_event(sendunex_finish, "SendUnex_finish", "orange3", "tag:%d");956 MPE_Describe_info_event(recvunex_start, "RecvUnex_start", "green1", "tag:%d");957 MPE_Describe_info_event(recvunex_finish, "RecvUnex_finish", "green3", "tag:%d");958 #endif /* state or event */959 #endif /* BMX_LOGGING */960 966 961
967 /* check params */ … … 1024 1030 ret = bmx_ctx_alloc(&rx, BMX_REQ_RX); 1025 1031 if (ret == 0) { 1026 bmx_put_idle_ rx(rx);1032 bmx_put_idle_ctx(rx); 1027 1033 } 1028 1034 } … … 1137 1143 debug(BMX_DB_MEM, "memory leaked at shutdown %lld", llu(mem_used)); 1138 1144 #endif 1139 1140 #if BMX_LOGGING1141 MPE_Finish_log("/tmp/bmi_mx.log");1142 #endif1143 1145 BMX_EXIT; 1144 1145 1146 return 0; 1146 1147 } … … 1180 1181 tx = qlist_entry(queued_txs->next, struct bmx_ctx, mxc_list); 1181 1182 qlist_del_init(&tx->mxc_list); 1182 bmx_q_c anceled_ctx(tx, err);1183 bmx_q_completed(tx, BMX_CTX_CANCELED, BMX_NO_STATUS, err); 1183 1184 } 1184 1185 /* cancel queued rxs */ … … 1187 1188 rx = qlist_entry(queued_rxs->next, struct bmx_ctx, mxc_list); 1188 1189 qlist_del_init(&rx->mxc_list); 1189 bmx_q_c anceled_ctx(rx, err);1190 bmx_q_completed(rx, BMX_CTX_CANCELED, BMX_NO_STATUS, err); 1190 1191 } 1191 1192 /* try to cancel pending rxs */ … … 1195 1196 if (result) { 1196 1197 qlist_del_init(&rx->mxc_list); 1197 bmx_q_c anceled_ctx(rx, err);1198 bmx_q_completed(rx, BMX_CTX_CANCELED, BMX_NO_STATUS, err); 1198 1199 } 1199 1200 } … … 1424 1425 uint64_t type = (uint64_t) ctx->mxc_msg_type; 1425 1426 uint64_t id = 0ULL; 1426 uint64_t tag = (uint64_t) ctx->mxc_tag;1427 uint64_t tag = (uint64_t) ((uint32_t) ctx->mxc_tag); 1427 1428 1428 1429 if (ctx->mxc_msg_type == BMX_MSG_CONN_REQ || … … 1517 1518 ret = -BMI_ENOMEM; 1518 1519 bmx_deq_pending_ctx(tx); /* uses peer lock */ 1519 bmx_q_c anceled_ctx(tx, BMI_ENOMEM);1520 bmx_q_completed(tx, BMX_CTX_CANCELED, BMX_NO_STATUS, BMI_ENOMEM); 1520 1521 } 1521 1522 } else { /* peer is not ready */ … … 1572 1573 PINT_event_id eid = 0; 1573 1574 1574 #if BMX_LOGGING1575 if (!is_unexpected) {1576 MPE_Log_event(send_start, (int) tag, NULL);1577 } else {1578 MPE_Log_event(sendunex_start, (int) tag, NULL);1579 }1580 #endif1581 1582 1575 PINT_EVENT_START( 1583 1576 bmi_mx_send_event_id, bmi_mx_pid, NULL, &eid, … … 1621 1614 BMX_MALLOC(segs, (numbufs * 
sizeof(*segs))); 1622 1615 if (segs == NULL) { 1623 bmx_put_idle_ tx(tx);1616 bmx_put_idle_ctx(tx); 1624 1617 bmx_peer_decref(peer); 1625 1618 ret = -BMI_ENOMEM; … … 1642 1635 1643 1636 if (is_unexpected && tx->mxc_nob > (long long) BMX_UNEXPECTED_SIZE) { 1644 bmx_put_idle_ tx(tx);1637 bmx_put_idle_ctx(tx); 1645 1638 bmx_peer_decref(peer); 1646 1639 ret = -BMI_EINVAL; … … 1658 1651 BMX_MALLOC(mop, sizeof(*mop)); 1659 1652 if (mop == NULL) { 1660 bmx_put_idle_ tx(tx);1653 bmx_put_idle_ctx(tx); 1661 1654 bmx_peer_decref(peer); 1662 1655 ret = -BMI_ENOMEM; … … 1673 1666 tx->mxc_mop = mop; 1674 1667 1668 assert(context_id == mop->context_id); 1669 assert(context_id == tx->mxc_mop->context_id); 1670 1675 1671 bmx_create_match(tx); 1676 1672 1677 debug(BMX_DB_CTX, "%s tag= %d length= %d %s op_id= %llu", __func__, tag, 1678 (int) total_size, is_unexpected ? "UNEXPECTED" : "EXPECTED", 1679 llu(mop->op_id)); 1673 debug(BMX_DB_CTX, "%s tag= %d length= %d %s op_id= %llu context_id= %lld", 1674 __func__, tag, (int) total_size, 1675 is_unexpected ? 
"UNEXPECTED" : "EXPECTED", 1676 llu(mop->op_id), lld(context_id)); 1680 1677 1681 1678 ret = bmx_post_tx(tx); … … 1792 1789 ret = -BMI_ENOMEM; 1793 1790 bmx_deq_pending_ctx(rx); /* uses peer lock */ 1794 bmx_q_c anceled_ctx(rx, BMI_ENOMEM);1791 bmx_q_completed(rx, BMX_CTX_CANCELED, BMX_NO_STATUS, BMI_ENOMEM); 1795 1792 } 1796 1793 } else { /* peer is not ready */ … … 1817 1814 struct bmx_peer *peer = NULL; 1818 1815 PINT_event_id eid = 0; 1819 1820 #if BMX_LOGGING1821 MPE_Log_event(recv_start, (int) tag, NULL);1822 #endif1823 1816 1824 1817 PINT_EVENT_START( … … 1865 1858 BMX_MALLOC(segs, (numbufs * sizeof(*segs))); 1866 1859 if (segs == NULL) { 1867 bmx_put_idle_ rx(rx);1860 bmx_put_idle_ctx(rx); 1868 1861 bmx_peer_decref(peer); 1869 1862 ret = -BMI_ENOMEM; … … 1887 1880 BMX_MALLOC(mop, sizeof(*mop)); 1888 1881 if (mop == NULL) { 1889 bmx_put_idle_ rx(rx);1882 bmx_put_idle_ctx(rx); 1890 1883 bmx_peer_decref(peer); 1891 1884 ret = -BMI_ENOMEM; … … 2026 2019 } 2027 2020 2028 #if BMX_LOGGING2029 MPE_Log_event(recvunex_start, (int) tag, NULL);2030 #endif2031 2032 2021 rx = bmx_get_idle_rx(); 2033 2022 if (rx != NULL) { … … 2062 2051 "unexpected recv with tag %d length %d", 2063 2052 mx_strerror(mxret), tag, length); 2064 bmx_put_idle_ rx(rx);2053 bmx_put_idle_ctx(rx); 2065 2054 ret = -1; 2066 2055 } … … 2121 2110 "unexpected recv with %s", 2122 2111 mx_strerror(mxret)); 2123 bmx_put_idle_ rx(rx);2112 bmx_put_idle_ctx(rx); 2124 2113 ret = MX_RECV_FINISHED; 2125 2114 } … … 2341 2330 /* drop ref taken before mx_iconnect() */ 2342 2331 bmx_peer_decref(tx->mxc_peer); 2343 bmx_put_idle_ tx(tx);2332 bmx_put_idle_ctx(tx); 2344 2333 continue; 2345 2334 } else if (status.code != MX_STATUS_SUCCESS) { 2346 2335 bmx_peer_decref(rx->mxc_peer); 2347 bmx_put_idle_ rx(rx);2336 bmx_put_idle_ctx(rx); 2348 2337 continue; 2349 2338 } … … 2356 2345 BMX_VERSION, version); 2357 2346 bmx_peer_decref(rx->mxc_peer); 2358 bmx_put_idle_ rx(rx);2347 bmx_put_idle_ctx(rx); 2359 2348 continue; 2360 
2349 } … … 2362 2351 debug(BMX_DB_WARN, "received CONN_REQ on a client."); 2363 2352 bmx_peer_decref(rx->mxc_peer); 2364 bmx_put_idle_ rx(rx);2353 bmx_put_idle_ctx(rx); 2365 2354 continue; 2366 2355 } … … 2390 2379 (char *) rx->mxc_buffer); 2391 2380 bmx_peer_decref(rx->mxc_peer); 2392 bmx_put_idle_ rx(rx);2381 bmx_put_idle_ctx(rx); 2393 2382 continue; 2394 2383 } … … 2399 2388 "method addr for %s", peername); 2400 2389 bmx_peer_decref(rx->mxc_peer); 2401 bmx_put_idle_ rx(rx);2390 bmx_put_idle_ctx(rx); 2402 2391 continue; 2403 2392 } … … 2409 2398 "peer for %s", peername); 2410 2399 bmx_peer_decref(rx->mxc_peer); 2411 bmx_put_idle_ rx(rx);2400 bmx_put_idle_ctx(rx); 2412 2401 continue; 2413 2402 } … … 2436 2425 mx_iconnect(bmi_mx->bmx_ep, peer->mxp_nic_id, mxmap->mxm_ep_id, 2437 2426 BMX_MAGIC, ack, peer, &request); 2438 bmx_put_idle_ rx(rx);2427 bmx_put_idle_ctx(rx); 2439 2428 } 2440 2429 } while (result); … … 2551 2540 llu(tx->mxc_match), mx_strstatus(status.code)); 2552 2541 bmx_peer_decref(tx->mxc_peer); 2553 bmx_put_idle_ tx(tx);2542 bmx_put_idle_ctx(tx); 2554 2543 } 2555 2544 } while (result); … … 2578 2567 } 2579 2568 2569 static void 2570 bmx_complete_ctx(struct bmx_ctx *ctx, bmi_op_id_t *outid, bmi_error_code_t *err, 2571 bmi_size_t *size, void **user_ptr) 2572 { 2573 struct bmx_peer *peer = ctx->mxc_peer; 2574 2575 *outid = ctx->mxc_mop->op_id; 2576 *err = ctx->mxc_error; 2577 *size = ctx->mxc_mxstat.xfer_length; 2578 if (user_ptr) 2579 *user_ptr = ctx->mxc_mop->user_ptr; 2580 PINT_EVENT_END( 2581 (ctx->mxc_type == BMX_REQ_TX ? 
2582 bmi_mx_send_event_id : bmi_mx_recv_event_id), 2583 bmi_mx_pid, NULL, ctx->mxc_mop->event_id, 2584 *outid, *size); 2585 2586 id_gen_fast_unregister(ctx->mxc_mop->op_id); 2587 BMX_FREE(ctx->mxc_mop, sizeof(*ctx->mxc_mop)); 2588 bmx_put_idle_ctx(ctx); 2589 bmx_peer_decref(peer); /* drop the ref taken in [send|recv]_common */ 2590 2591 return; 2592 } 2593 2580 2594 static int 2581 2595 BMI_mx_test(bmi_op_id_t id, int *outcount, bmi_error_code_t *err, 2582 2596 bmi_size_t *size, void **user_ptr, int max_idle_time __unused, 2583 bmi_context_id context_id __unused)2597 bmi_context_id context_id) 2584 2598 { 2585 2599 uint32_t result = 0; … … 2592 2606 bmx_connection_handlers(); 2593 2607 2608 bmx_get_completion_token(); 2609 2594 2610 mop = id_gen_fast_lookup(id); 2595 2611 ctx = mop->method_data; 2596 2612 peer = ctx->mxc_peer; 2597 2613 2614 assert(context_id == mop->context_id); 2615 if (ctx->mxc_type == BMX_REQ_RX) 2616 assert(ctx->mxc_msg_type != BMX_MSG_UNEXPECTED); 2617 assert(context_id == ctx->mxc_mop->context_id); 2618 2598 2619 switch (ctx->mxc_state) { 2620 case BMX_CTX_COMPLETED: 2599 2621 case BMX_CTX_CANCELED: 2600 /* we are racing with testcontext */ 2601 gen_mutex_lock(&bmi_mx->bmx_canceled_lock); 2602 if (ctx->mxc_state != BMX_CTX_CANCELED) { 2603 gen_mutex_unlock(&bmi_mx->bmx_canceled_lock); 2604 return 0; 2605 } 2622 gen_mutex_lock(&bmi_mx->bmx_done_q_lock[(int) context_id]); 2606 2623 qlist_del_init(&ctx->mxc_list); 2607 gen_mutex_unlock(&bmi_mx->bmx_canceled_lock); 2624 gen_mutex_unlock(&bmi_mx->bmx_done_q_lock[(int) context_id]); 2625 bmx_complete_ctx(ctx, &id, err, size, user_ptr); 2608 2626 *outcount = 1; 2609 *err = ctx->mxc_mxstat.code;2610 if (ctx->mxc_mop) {2611 if (user_ptr) {2612 *user_ptr = ctx->mxc_mop->user_ptr;2613 }2614 id_gen_fast_unregister(ctx->mxc_mop->op_id);2615 BMX_FREE(ctx->mxc_mop, sizeof(*ctx->mxc_mop));2616 }2617 bmx_peer_decref(peer);2618 if (ctx->mxc_type == BMX_REQ_TX) {2619 bmx_put_idle_tx(ctx);2620 } else {2621 
bmx_put_idle_rx(ctx);2622 }2623 2627 break; 2624 2628 case BMX_CTX_PENDING: 2625 /* racing with mx_test_any() in textcontext? */2626 2629 mx_test(bmi_mx->bmx_ep, &ctx->mxc_mxreq, &ctx->mxc_mxstat, &result); 2627 2630 if (result) { 2628 PINT_EVENT_END( 2629 (ctx->mxc_type == BMX_REQ_TX ? 2630 bmi_mx_send_event_id : bmi_mx_recv_event_id), 2631 bmi_mx_pid, NULL, ctx->mxc_mop->event_id, 2632 ctx->mxc_mop->op_id, *size); 2633 2631 bmx_deq_pending_ctx(ctx); 2632 bmx_complete_ctx(ctx, &id, err, size, user_ptr); 2634 2633 *outcount = 1; 2635 if (ctx->mxc_mxstat.code == MX_STATUS_SUCCESS) {2636 *err = 0;2637 *size = ctx->mxc_mxstat.xfer_length;2638 } else {2639 *err = bmx_mx_to_bmi_errno(ctx->mxc_mxstat.code);2640 }2641 if (ctx->mxc_mop) {2642 if (user_ptr) {2643 *user_ptr = ctx->mxc_mop->user_ptr;2644 }2645 id_gen_fast_unregister(ctx->mxc_mop->op_id);2646 BMX_FREE(ctx->mxc_mop, sizeof(*ctx->mxc_mop));2647 }2648 bmx_deq_pending_ctx(ctx);2649 if (ctx->mxc_type == BMX_REQ_TX) {2650 bmx_put_idle_tx(ctx);2651 } else {2652 bmx_put_idle_rx(ctx);2653 }2654 bmx_peer_decref(peer);2655 2634 } 2656 2635 break; … … 2659 2638 ctx->mxc_type == BMX_REQ_TX ? 
"TX" : "RX", ctx->mxc_state); 2660 2639 } 2640 bmx_release_completion_token(); 2661 2641 BMX_EXIT; 2662 2642 … … 2677 2657 struct bmx_ctx *ctx = NULL; 2678 2658 struct bmx_peer *peer = NULL; 2679 list_t *canceled = &bmi_mx->bmx_canceled;2680 2659 int wait = 0; 2681 2660 static int count = 0; … … 2689 2668 bmx_connection_handlers(); 2690 2669 2691 /* always return canceled messages first */ 2692 while (completed < incount && !qlist_empty(canceled)) { 2693 gen_mutex_lock(&bmi_mx->bmx_canceled_lock); 2694 ctx = qlist_entry(canceled->next, struct bmx_ctx, mxc_list); 2695 qlist_del_init(&ctx->mxc_list); 2696 /* change state in case test is trying to reap it as well */ 2697 ctx->mxc_state = BMX_CTX_COMPLETED; 2698 gen_mutex_unlock(&bmi_mx->bmx_canceled_lock); 2699 peer = ctx->mxc_peer; 2700 outids[completed] = ctx->mxc_mop->op_id; 2701 errs[completed] = ctx->mxc_mxstat.code; 2702 if (user_ptrs) 2703 user_ptrs[completed] = ctx->mxc_mop->user_ptr; 2704 id_gen_fast_unregister(ctx->mxc_mop->op_id); 2705 BMX_FREE(ctx->mxc_mop, sizeof(*ctx->mxc_mop)); 2706 completed++; 2707 if (ctx->mxc_type == BMX_REQ_TX) { 2708 bmx_put_idle_tx(ctx); 2709 } else { 2710 bmx_put_idle_rx(ctx); 2711 } 2712 bmx_peer_decref(peer); /* drop the ref taken in [send|recv]_common */ 2713 if (completed > 0) { 2714 debug(BMX_DB_CTX, "%s found %d canceled messages", 2715 __func__, completed); 2716 } 2717 } 2718 2719 /* return completed messages 2670 bmx_get_completion_token(); 2671 2672 /* always return queued, completed messages first */ 2673 do { 2674 bmx_deq_completed(&ctx, context_id); 2675 if (ctx) { 2676 bmx_complete_ctx(ctx, &outids[completed], &errs[completed], 2677 &sizes[completed], &user_ptrs[completed]); 2678 completed++; 2679 } 2680 } while (completed < incount && ctx != NULL); 2681 2682 if (completed > 0) 2683 debug(BMX_DB_CTX, "%s found %d completed messages", __func__, completed); 2684 2685 /* try to complete expected messages 2720 2686 * we will always try (incount - completed) times even 
2721 2687 * if some iterations have no result */ … … 2736 2702 wait = 2; 2737 2703 } 2704 2738 2705 if (result) { 2739 2706 ctx = (struct bmx_ctx *) status.context; 2740 2707 bmx_deq_pending_ctx(ctx); 2708 if (ctx->mxc_mop->context_id != context_id) { 2709 bmx_q_completed(ctx, BMX_CTX_COMPLETED, status, 2710 bmx_mx_to_bmi_errno(status.code)); 2711 continue; 2712 } 2713 ctx->mxc_mxstat = status; 2741 2714 peer = ctx->mxc_peer; 2742 2715 debug(BMX_DB_CTX, "%s completing expected %s with match 0x%llx " 2743 "for %s with op_id %llu length %d %s", __func__, 2744 ctx->mxc_type == BMX_REQ_TX ? "TX" : "RX", 2716 "for %s with op_id %llu length %d %s " 2717 "context_id= %d mop->context_id= %d", 2718 __func__, ctx->mxc_type == BMX_REQ_TX ? "TX" : "RX", 2745 2719 llu(ctx->mxc_match), peer->mxp_mxmap->mxm_peername, 2746 2720 llu(ctx->mxc_mop->op_id), status.xfer_length, 2747 mx_strstatus(status.code)); 2748 2749 outids[completed] = ctx->mxc_mop->op_id; 2750 if (status.code == MX_SUCCESS) { 2751 errs[completed] = 0; 2752 sizes[completed] = status.xfer_length; 2753 } else { 2754 errs[completed] = bmx_mx_to_bmi_errno(status.code); 2755 } 2756 if (user_ptrs) 2757 user_ptrs[completed] = ctx->mxc_mop->user_ptr; 2758 2759 PINT_EVENT_END( 2760 (ctx->mxc_type == BMX_REQ_TX ? 
2761 bmi_mx_send_event_id : bmi_mx_recv_event_id), 2762 bmi_mx_pid, NULL, ctx->mxc_mop->event_id, 2763 ctx->mxc_mop->op_id, status.xfer_length); 2764 2765 id_gen_fast_unregister(ctx->mxc_mop->op_id); 2766 BMX_FREE(ctx->mxc_mop, sizeof(*ctx->mxc_mop)); 2721 mx_strstatus(status.code), (int) context_id, 2722 (int) ctx->mxc_mop->context_id); 2723 2724 bmx_complete_ctx(ctx, &outids[completed], &errs[completed], 2725 &sizes[completed], &user_ptrs[completed]); 2767 2726 completed++; 2768 #if BMX_LOGGING 2769 if (ctx->mxc_type == BMX_REQ_TX) { 2770 MPE_Log_event(send_finish, (int) ctx->mxc_tag, NULL); 2771 } else { 2772 MPE_Log_event(recv_finish, (int) ctx->mxc_tag, NULL); 2773 } 2774 #endif 2775 if (ctx->mxc_type == BMX_REQ_TX) { 2776 bmx_put_idle_tx(ctx); 2777 } else { 2778 bmx_put_idle_rx(ctx); 2779 } 2780 bmx_peer_decref(peer); /* drop the ref taken in [send|recv]_common */ 2781 } 2782 if (completed - old > 0) { 2783 debug(BMX_DB_CTX, "%s found %d expected messages", 2784 __func__, completed - old); 2785 } 2786 } 2787 2788 /* check for completed unexpected sends */ 2727 } 2728 } 2729 2730 if (completed - old > 0) 2731 debug(BMX_DB_CTX, "%s found %d expected messages", __func__, completed - old); 2732 2733 /* try to complete unexpected sends */ 2789 2734 2790 2735 match = (uint64_t) BMX_MSG_UNEXPECTED << BMX_MSG_SHIFT; … … 2795 2740 uint32_t result = 0; 2796 2741 mx_status_t status; 2797 list_t *unex_txs = &bmi_mx->bmx_unex_txs;2798 2742 int again = 1; 2799 2743 2800 2744 ctx = NULL; 2801 2802 gen_mutex_lock(&bmi_mx->bmx_unex_txs_lock);2803 if (!qlist_empty(unex_txs)) {2804 ctx = qlist_entry(unex_txs->next, struct bmx_ctx, mxc_list);2805 peer = ctx->mxc_peer;2806 qlist_del_init(&ctx->mxc_list);2807 result = 1;2808 }2809 gen_mutex_unlock(&bmi_mx->bmx_unex_txs_lock);2810 2745 2811 2746 while (!ctx && again) { … … 2816 2751 bmx_deq_pending_ctx(ctx); 2817 2752 peer = ctx->mxc_peer; 2818 if (ctx->mxc_type == BMX_REQ_RX) { 2819 /* queue until testunexpected is called */ 2820 
bmx_q_unex_ctx(ctx); 2753 if (ctx->mxc_type == BMX_REQ_RX || 2754 ctx->mxc_mop->context_id != context_id) { 2755 /* queue until testunexpected or queue 2756 * until testcontext for the correct context */ 2757 bmx_q_completed(ctx, BMX_CTX_COMPLETED, status, 2758 bmx_mx_to_bmi_errno(status.code)); 2821 2759 result = 0; 2822 2760 again = 1; … … 2834 2772 llu(ctx->mxc_mop->op_id)); 2835 2773 2836 outids[completed] = ctx->mxc_mop->op_id; 2837 if (status.code == MX_SUCCESS) { 2838 errs[completed] = 0; 2839 sizes[completed] = status.xfer_length; 2840 } else { 2841 errs[completed] = bmx_mx_to_bmi_errno(status.code); 2774 ctx->mxc_mxstat = status; 2775 bmx_complete_ctx(ctx, &outids[completed], &errs[completed], 2776 &sizes[completed], &user_ptrs[completed]); 2777 2778 if (status.code != MX_SUCCESS) { 2842 2779 debug(BMX_DB_CTX, "%s unexpected send completed with " 2843 2780 "error %s", __func__, mx_strstatus(status.code)); 2844 2781 bmx_peer_disconnect(peer, 0, BMI_ENETRESET); 2845 2782 } 2846 if (user_ptrs)2847 user_ptrs[completed] = ctx->mxc_mop->user_ptr;2848 PINT_EVENT_END(2849 (ctx->mxc_type == BMX_REQ_TX ?2850 bmi_mx_send_event_id : bmi_mx_recv_event_id),2851 bmi_mx_pid, NULL, ctx->mxc_mop->event_id,2852 ctx->mxc_mop->op_id, status.xfer_length);2853 2854 id_gen_fast_unregister(ctx->mxc_mop->op_id);2855 BMX_FREE(ctx->mxc_mop, sizeof(*ctx->mxc_mop));2856 2783 completed++; 2857 #if BMX_LOGGING 2858 MPE_Log_event(sendunex_finish, (int) ctx->mxc_tag, NULL); 2859 #endif 2860 2861 bmx_put_idle_tx(ctx); 2862 bmx_peer_decref(peer); /* drop the ref taken in [send|recv]_common */ 2863 } 2864 } 2784 } 2785 } 2786 bmx_release_completion_token(); 2787 2865 2788 if (completed - old > 0) { 2866 2789 debug(BMX_DB_CTX, "%s found %d unexpected tx messages", … … 2888 2811 struct bmx_ctx *rx = NULL; 2889 2812 struct bmx_peer *peer = NULL; 2890 list_t *unex_rxs = &bmi_mx->bmx_unex_rxs;2891 2813 int again = 1; 2892 2814 … … 2897 2819 2898 2820 bmx_connection_handlers(); 2821 2822 
bmx_get_completion_token(); 2899 2823 2900 2824 /* if the unexpected handler cannot get a rx, it does not post a receive. … … 2903 2827 if (result) { 2904 2828 int ret = 0; 2905 ret = bmx_post_unexpected_recv(status.source, 0, 0, 0, status.match_info, status.xfer_length); 2829 ret = bmx_post_unexpected_recv(status.source, 0, 0, 0, 2830 status.match_info, 2831 status.xfer_length); 2906 2832 if (ret != 0) { 2907 2833 debug(BMX_DB_CTX, "%s mx_iprobe() found rx with match 0x%llx " … … 2914 2840 *outcount = 0; 2915 2841 2916 gen_mutex_lock(&bmi_mx->bmx_unex_rxs_lock); 2917 if (!qlist_empty(unex_rxs)) { 2918 rx = qlist_entry(unex_rxs->next, struct bmx_ctx, mxc_list); 2842 bmx_deq_unex_rx(&rx); 2843 if (rx) { 2844 result = 1; 2845 status = rx->mxc_mxstat; 2919 2846 peer = rx->mxc_peer; 2920 qlist_del_init(&rx->mxc_list); 2921 result = 1; 2922 } 2923 gen_mutex_unlock(&bmi_mx->bmx_unex_rxs_lock); 2847 } 2924 2848 2925 2849 while (!rx && again) { … … 2931 2855 peer = rx->mxc_peer; 2932 2856 if (rx->mxc_type == BMX_REQ_TX) { 2933 bmx_q_unex_ctx(rx); 2857 bmx_q_completed(rx, BMX_CTX_COMPLETED, status, 2858 bmx_mx_to_bmi_errno(status.code)); 2934 2859 result = 0; 2935 2860 again = 1; … … 2953 2878 ui->tag = rx->mxc_tag; 2954 2879 2955 #if BMX_LOGGING 2956 MPE_Log_event(recvunex_finish, (int) rx->mxc_tag, NULL); 2957 #endif 2958 bmx_put_idle_rx(rx); 2880 bmx_put_idle_ctx(rx); 2959 2881 bmx_peer_decref(peer); /* drop the ref taken in unexpected_recv() */ 2960 2882 *outcount = 1; 2961 2883 } 2884 bmx_release_completion_token(); 2885 2962 2886 if (print) 2963 2887 BMX_EXIT; … … 3014 2938 3015 2939 ret = bmx_open_endpoint(&bmi_mx->bmx_ep, 3016 bmi_mx->bmx_board,2940 MX_ANY_NIC, 3017 2941 MX_ANY_ENDPOINT); 3018 2942 if (ret != 0) { … … 3026 2950 mx_decompose_endpoint_addr2(epa, &nic_id, &bmi_mx->bmx_ep_id, 3027 2951 &bmi_mx->bmx_sid); 2952 /* get our board number */ 2953 mx_nic_id_to_board_number(nic_id, &bmi_mx->bmx_board); 3028 2954 /* get our hostname */ 3029 2955 
mx_nic_id_to_hostname(nic_id, host); … … 3141 3067 /* NOTE There may be a race between this and BMI_mx_testcontext(). */ 3142 3068 static int 3143 BMI_mx_cancel(bmi_op_id_t id, bmi_context_id context_id __unused)3069 BMI_mx_cancel(bmi_op_id_t id, bmi_context_id context_id) 3144 3070 { 3145 3071 struct method_op *mop; … … 3150 3076 BMX_ENTER; 3151 3077 3078 bmx_get_completion_token(); 3079 3152 3080 mop = id_gen_fast_lookup(id); 3153 3081 ctx = mop->method_data; 3154 3082 peer = ctx->mxc_peer; 3155 3083 3084 assert(context_id == ctx->mxc_mop->context_id); 3085 3156 3086 debug(BMX_DB_CTX, "%s %s op_id %llu mxc_state %d peer state %d", __func__, 3157 3087 ctx->mxc_type == BMX_REQ_TX ? "TX" : "RX", 3158 3088 llu(ctx->mxc_mop->op_id), ctx->mxc_state, peer->mxp_state); 3089 3090 /* avoid race with connection setup */ 3091 gen_mutex_lock(&peer->mxp_lock); 3092 3159 3093 switch (ctx->mxc_state) { 3160 3094 case BMX_CTX_QUEUED: 3161 /* we are racing with the connection setup */3162 bmx_deq_ctx(ctx);3163 bmx_q_c anceled_ctx(ctx,BMI_ECANCEL);3095 qlist_del_init(&ctx->mxc_list); 3096 gen_mutex_unlock(&peer->mxp_lock); 3097 bmx_q_completed(ctx, BMX_CTX_CANCELED, BMX_NO_STATUS, BMI_ECANCEL); 3164 3098 break; 3165 3099 case BMX_CTX_PENDING: 3100 gen_mutex_unlock(&peer->mxp_lock); 3166 3101 if (ctx->mxc_type == BMX_REQ_TX) { 3167 3102 /* see if it completed first */ … … 3170 3105 debug(BMX_DB_CTX, "%s completed TX op_id %llu " 3171 3106 "mxc_state %d peer state %d status.code %s", 3172 __func__, llu(ctx->mxc_mop->op_id), ctx->mxc_state, 3107 __func__, llu(ctx->mxc_mop->op_id), ctx->mxc_state, 3173 3108 peer->mxp_state, mx_strstatus(ctx->mxc_mxstat.code)); 3174 3109 bmx_deq_pending_ctx(ctx); 3175 bmx_q_canceled_ctx(ctx, BMI_ECANCEL); 3110 bmx_q_completed(ctx, BMX_CTX_CANCELED, 3111 ctx->mxc_mxstat, BMI_ECANCEL); 3176 3112 } else { 3177 3113 /* and if not, then disconnect() */ … … 3182 3118 if (result == 1) { 3183 3119 bmx_deq_pending_ctx(ctx); 3184 bmx_q_canceled_ctx(ctx, 
BMI_ECANCEL); 3120 bmx_q_completed(ctx, BMX_CTX_CANCELED, 3121 BMX_NO_STATUS, BMI_ECANCEL); 3185 3122 } 3186 3123 } … … 3190 3127 ctx->mxc_type == BMX_REQ_TX ? "TX" : "RX", ctx->mxc_state); 3191 3128 } 3129 bmx_release_completion_token(); 3130 3192 3131 BMX_EXIT; 3193 3132 … … 3213 3152 { 3214 3153 .method_name = "bmi_mx", 3154 .flags = 0, 3215 3155 .initialize = BMI_mx_initialize, 3216 3156 .finalize = BMI_mx_finalize, -
branches/cu-security-branch/src/io/bmi/bmi_mx/mx.h
r7024 r8397 16 16 #include <unistd.h> 17 17 #include <errno.h> 18 #include <assert.h> 18 19 19 20 #include <mx_extensions.h> /* needed for callback handler, etc. */ … … 54 55 #define BMX_DEBUG 1 /* enable debug (gossip) statements */ 55 56 #define BMX_MEM_ACCT 0 /* track number of bytes alloc's and freed */ 56 #define BMX_LOGGING 0 /* use MPE logging routines */57 58 #if BMX_LOGGING59 #include "mpe.h"60 #endif61 57 62 58 #if BMX_MEM_TWEAK … … 166 162 gen_mutex_t bmx_idle_rxs_lock; /* idle_rxs lock */ 167 163 168 list_t bmx_canceled; /* canceled reqs waiting for test */ 169 gen_mutex_t bmx_canceled_lock; /* canceled list lock */ 170 171 list_t bmx_unex_txs; /* completed unexpected sends */ 172 gen_mutex_t bmx_unex_txs_lock; /* completed unexpected sends lock */ 164 gen_mutex_t bmx_completion_lock; /* lock for test* functions */ 165 int bmx_refcount; /* try to avoid races between test* 166 and cancel functions */ 167 168 /* completed expected msgs 169 * including unexpected sends */ 170 list_t bmx_done_q[BMI_MAX_CONTEXTS]; 171 gen_mutex_t bmx_done_q_lock[BMI_MAX_CONTEXTS]; 172 173 173 list_t bmx_unex_rxs; /* completed unexpected recvs */ 174 174 gen_mutex_t bmx_unex_rxs_lock; /* completed unexpected recvs lock */ … … 278 278 mx_request_t mxc_mxreq; /* MX request */ 279 279 mx_status_t mxc_mxstat; /* MX status */ 280 bmi_error_code_t mxc_error; /* BMI error code */ 280 281 281 282 uint64_t mxc_get; /* # of times returned from idle list */ -
branches/cu-security-branch/src/io/bmi/bmi_portals
- Property svn:ignore deleted
-
branches/cu-security-branch/src/io/bmi/bmi_portals/portals.c
r7941 r8397 2294 2294 { 2295 2295 .method_name = "bmi_portals", 2296 .flags = 0, 2296 2297 .initialize = bmip_initialize, 2297 2298 .finalize = bmip_finalize, -
branches/cu-security-branch/src/io/bmi/bmi_tcp
- Property svn:ignore deleted
-
branches/cu-security-branch/src/io/bmi/bmi_tcp/bmi-tcp.c
r8330 r8397 3649 3649 (errno == ENONET) || 3650 3650 (errno == EHOSTUNREACH) || 3651 (errno == EOPNOTSUPP) || (errno == ENETUNREACH)) 3651 (errno == EOPNOTSUPP) || 3652 (errno == ENETUNREACH) || 3653 (errno == ENFILE) || 3654 (errno == EMFILE)) 3652 3655 { 3653 3656 /* try again later */ 3657 if ((errno == ENFILE) || (errno == EMFILE)) 3658 { 3659 gossip_err("Error: accept: %s (continuing)\n",strerror(errno)); 3660 bmi_method_addr_drop_callback(BMI_tcp_method_name); 3661 } 3654 3662 return (0); 3655 3663 } -
branches/cu-security-branch/src/io/bmi/bmi_tcp/socket-collection-epoll.c
r8330 r8397 128 128 memset(maps, 0, (sizeof(bmi_method_addr_p) * incount)); 129 129 memset(status, 0, (sizeof(int) * incount)); 130 131 if(incount == 0) 132 { 133 return(0); 134 } 130 135 131 136 /* actually do the epoll_wait() here */ -
branches/cu-security-branch/src/io/bmi/bmi_tcp/socket-collection.c
r7941 r8397 90 90 if(new_server_socket > -1) 91 91 { 92 tmp_scp->pollfd_array[ 0].fd = new_server_socket;93 tmp_scp->pollfd_array[ 0].events = POLLIN;94 tmp_scp->addr_array[ 0] = NULL;92 tmp_scp->pollfd_array[tmp_scp->array_count].fd = new_server_socket; 93 tmp_scp->pollfd_array[tmp_scp->array_count].events = POLLIN; 94 tmp_scp->addr_array[tmp_scp->array_count] = NULL; 95 95 tmp_scp->array_count++; 96 /* Add the pipe_fd[0] fd to the poll in set always */ 97 tmp_scp->pollfd_array[1].fd = tmp_scp->pipe_fd[0]; 98 tmp_scp->pollfd_array[1].events = POLLIN; 99 tmp_scp->addr_array[1] = NULL; 100 tmp_scp->array_count++; 101 } 96 } 97 98 /* Add the pipe_fd[0] fd to the poll in set always */ 99 tmp_scp->pollfd_array[tmp_scp->array_count].fd = tmp_scp->pipe_fd[0]; 100 tmp_scp->pollfd_array[tmp_scp->array_count].events = POLLIN; 101 tmp_scp->addr_array[tmp_scp->array_count] = NULL; 102 tmp_scp->array_count++; 102 103 103 104 return (tmp_scp); -
branches/cu-security-branch/src/io/bmi/bmi_zoid/README
r8313 r8397 17 17 needed, but is probably already larger than necessary, 18 18 19 - ZBMI_SHM_SIZE_TOTAL (in init.c in ZeptoOS 20 packages/zoid/src/zbmi/implementation/ directory): total size of the shared 21 memory buffer used to exchange bulk data between the ZOID daemon and the BMI 22 server; defaults to 512M. 23 24 - ZBMI_SHM_SIZE_UNEXP (in init.c in ZeptoOS 25 packages/zoid/src/zbmi/implementation/ directory): part of the shared 26 memory buffer used for unexpected messages; defaults to 1M. 19 - FIXME ZBMI limits 27 20 28 21 … … 64 57 invoked on the client side, as each such call will result in a 65 58 communication with the I/O node (10ms is *way* too short, 1000ms is pretty 66 short too), 67 68 - to reduce the number of round-trip messages from the compute nodes, 69 client-side message post routines can optionally wait (with a timeout) 70 and thus increase a chance of an immediate completion. Use 71 BMI_set_info(BMI_ZOID_POST_TIMEOUT, <timeout_ms>) to enable this feature. 59 short too). 72 60 73 61 … … 110 98 111 99 The zbmi plugin is mostly stateless so far as the compute node clients are 112 concerned. Specifically, the information on posted expected message113 sends/receives that were not immediately completed is stored exclusively on 114 theclient side.100 concerned. Specifically, the information on posted, but not immediately 101 completed expected message sends/receives is stored exclusively on the 102 client side. 115 103 116 104 All BMI send routines end up in zoid_post_send_common. That includes 117 105 unexpected messages and list I/O. This routine attempts to forward the 118 106 message to the zbmi plugin on the I/O node, using zbmi_send. 
For 119 unexpected messages, zbmi_send should normally succeed and result in an120 i mmediate completion; however, if the zbmi plugin is out of memory,107 unexpected messages, zbmi_send is normally expected to succeed and result 108 in an immediate completion; however, if the zbmi plugin is out of memory, 121 109 zbmi_send will fail with ENOMEM. The same failure will occur with expected 122 110 messages if a matching receive has not been posted on the I/O node side by … … 124 112 request is put in the "zoid_ops" queue for another attempt later. For 125 113 expected messages, if a matching receive has been posted, the call succeeds 126 resulting in an immediate completion. zbmi_send normally does not block, 127 but if BMI_ZOID_POST_TIMEOUT has been enabled, it can, waiting for a 128 matching expected message post from the BMI server side or for memory to be 129 released on the BMI server side so that an unexpected message can be 130 stored. 114 resulting in an immediate completion. 131 115 132 116 The way zbmi_send is forwarded by ZOID, the data payload is only … … 140 124 is sent to the compute node and zbmi_recv returns 1, resulting in an 141 125 immediate completion. Otherwise, the receive request is put in the 142 "zoid_ops" queue for another attempt later. zbmi_recv normally does not 143 block, but if BMI_ZOID_POST_TIMEOUT has been enabled, it can, waiting for a 144 matching expected message post from the BMI server side. 126 "zoid_ops" queue for another attempt later. 145 127 146 128 BMI_cancel is very easy to implement thanks to a lack of multi-threading … … 156 138 sent to the server anymore. For non-canceled requests, it extracts the 157 139 message tag, size, and send/recv indicator and forwards those to the server 158 using zbmi_test. zbmi_test can block on the server for the specified time 159 if none of the specified requests is initially ready. 
zbmi_test returns 160 the number of ready requests; if it is non-zero, then the server side must 161 have posted matching sends/receives, so zoid_test_common next attempts to 162 satisfy those "ready" requests by invoking zbmi_send/zbmi_recv. Those 163 send/recv routines could still fail in spite of a successful test if there 164 is no memory or if the server side canceled its matching request; this is 165 recoverable. 140 using zbmi_test. zbmi_test is the only blocking call of the three; it can 141 block on the server for the specified time if none of the specified 142 requests is initially ready. zbmi_test returns the number of ready 143 requests; if it is non-zero, then zoid_test_common next attempts to satisfy 144 those requests by invoking zbmi_send/zbmi_recv. Those send/recv routines 145 could still fail in spite of a successful test, if there is no memory, or 146 if the server-side canceled its matching request; this is recoverable. 166 147 167 148 SERVER … … 169 150 The communication between the I/O node BMI server and the zbmi plugin of 170 151 the ZOID daemon is carried across two channels. Commands are sent via a 171 POSIX message queue(zbmi plugin is the server; multiple threads of the BMI172 server can communicate simultaneously by opening multiple reply queues to173 the s erver). Payload is exchanged using a large shared memory segment,152 unix domain socket (zbmi plugin is the server; multiple threads of the BMI 153 server can communicate simultaneously by opening multiple connections to 154 the socket). Payload is exchanged using a large shared memory segment, 174 155 allocated by the zbmi plugin. 
We make efforts to avoid unnecessary copies 175 156 to/from that segment, so BMI_memalloc() on the BMI server side allocates … … 178 159 179 160 The shared memory segment is split in two: a normally smaller region is 180 used for unexpected messages and is managed by the zbmi plugin, w hile a161 used for unexpected messages and is managed by the zbmi plugin, wile a 181 162 larger region is used for expected messages and is managed by the BMI 182 163 server. 183 164 184 165 The communication with the zbmi plugin is established during 185 BMI_initialize, and terminated during BMI_finalize. The messageprotocol is166 BMI_initialize, and terminated during BMI_finalize. The stream protocol is 186 167 documented in zbmi's zbmi_protocol.h. 187 168 188 169 For BMI testunexpected, we communicate the metadata on the pending received 189 messages via the queue, and the payload is in the shared memory buffer170 messages via the socket, and the payload is in the shared memory buffer 190 171 which is returned to the user. unexpected_free just sends the buffer 191 172 address back to the zbmi plugin, since the unexpected messages memory pool … … 200 181 retry the allocation after every BMI_memfree. 201 182 202 Expected server-side posts, be it sends or receives, can be completed 203 immediately if a matching client-side post (or test) is waiting when 204 server-side post is issued. Note that "immediately" is used liberally 205 here; the server-side post will not return until the buffer has been 206 transferred to/from the compute node, which can take some time when the 207 ZOID server is under heavy load. For posts not completed immediately we 208 send a message descriptor to the zbmi plugin which registers the message 209 and just sends back a confirmation. When registering we exchange the 210 internal BMI id and the internal ZOID id, since that simplifies subsequent 211 testing/canceling. 
183 Expected server-side posts, be it sends or receives, are never completed 184 immediately: we send a message descriptor to the zbmi plugin which 185 registers it and just sends back a confirmation. When registering we 186 exchange the internal BMI id and the internal ZOID id, since that 187 simplifies subsequent testing/canceling. 212 188 213 189 Canceling messages is more complex than on the client side. Generally, we … … 217 193 be ignored. An exception is when the request has not been registered 218 194 because of the lack of memory for a temporary buffer as described earlier; 219 in that case we cancel it locally and put i tin "error_ops" queue.195 in that case we cancel it locally and put it in "error_ops" queue. 220 196 221 197 When testing (in zoid_server_test_common), we need to deal with locally … … 225 201 since they involve no communication with the zbmi plugin. Unlike on the 226 202 client side, where the common test routines sort-of "emulated" testcontext 227 b yfirst building an array of all pending request ids, on the server side203 by first building an array of all pending request ids, on the server side 228 204 we have a "native" implementation. Testcontext is recognized by passing an 229 205 "incount" of 0, and we forward it to the zbmi plugin so that it knows to 230 206 return *any* completed request(s). This is necessary because of 231 207 multi-threading constraints, and it is possible because the zbmi plugin 232 does maintain state for server-side requests. The test is the only233 server-sidecommand that can block in the zbmi plugin for the specified208 does maintain state for server-side requests. As with the client, the test 209 is the only command that can block in the zbmi plugin for the specified 234 210 time period if no request is initially completed. Completed requests 235 211 require no further handling, with the exceptions of those that used -
branches/cu-security-branch/src/io/bmi/bmi_zoid/server.c
r8359 r8397 3 3 #include <assert.h> 4 4 #include <stdio.h> 5 #include <string.h>6 5 7 6 #include <fcntl.h> 8 #include < mqueue.h>7 #include <pthread.h> 9 8 #include <sys/mman.h> 9 #include <sys/socket.h> 10 #include <sys/un.h> 10 11 #include <unistd.h> 11 12 12 13 #include <bmi-method-support.h> 13 14 #include <bmi-method-callback.h> 14 #include <gen-locks.h>15 15 #include <id-generator.h> 16 16 #include <op-list.h> … … 21 21 22 22 /* method_op_p's method_data points to this structure on the server side. */ 23 struct zoid_server_method_data23 struct ZoidServerMethodData 24 24 { 25 25 void* tmp_buffer; /* Used with BMI_EXT_ALLOC to store the address of the 26 26 temporary shared memory buffer, NULL otherwise. */ 27 27 int zoid_buf_id; /* 0 if the operation has not yet been sent to ZOID. */ 28 gen_mutex_t post_mutex; /* Used to resolve the race between a post29 call and testcontext. */30 28 }; 31 32 #define METHOD_DATA(op) ((struct zoid_server_method_data*)(op)->method_data)33 29 34 30 /* Describes a request with BMI_EXT_ALLOC pending for a temporary memory 35 31 buffer. */ 36 struct no_mem_descriptor37 { 38 struct no_mem_descriptor* next;39 struct no_mem_descriptor* prev;32 struct NoMemDescriptor 33 { 34 struct NoMemDescriptor* next; 35 struct NoMemDescriptor* prev; 40 36 41 37 bmi_size_t total_size; … … 43 39 }; 44 40 45 /* Control queue to the zbmi plugin in the ZOID daemon. */ 46 static mqd_t zbmi_control_queue = -1; 47 48 /* Reply queues from the zbmi plugin. */ 49 #define ZBMI_QUEUES_LEN_INIT 10 50 static gen_mutex_t zbmi_queues_mutex = GEN_MUTEX_INITIALIZER; 51 static mqd_t* zbmi_queues = NULL; 52 static char* zbmi_queues_inuse = NULL; /* Whether a particular zbmi_queues 53 entry is currently in use. */ 54 static int zbmi_queues_len = 0; /* Length of zbmi_queues and 55 zbmi_queues_inuse. */ 56 static int zbmi_queues_used = 0; /* Count of initialized zbmi_queues 57 entries. */ 41 /* Command streams to the zbmi plugin in the ZOID daemon. 
*/ 42 #define ZBMI_SOCKETS_LEN_INIT 10 43 static pthread_mutex_t zbmi_sockets_mutex = PTHREAD_MUTEX_INITIALIZER; 44 static int* zbmi_sockets = NULL; 45 static char* zbmi_sockets_inuse = NULL; /* Whether a particular zbmi_sockets 46 entry is currently in use. */ 47 static int zbmi_sockets_len = 0; /* Length of zbmi_sockets and 48 zbmi_sockets_inuse. */ 49 static int zbmi_sockets_used = 0; /* Count of initialized zbmi_sockets 50 entries. */ 58 51 59 52 /* An array of client addresses. */ 60 53 #define CLIENTS_LEN_INC 10 61 static gen_mutex_t clients_mutex = GEN_MUTEX_INITIALIZER;54 static pthread_mutex_t clients_mutex = PTHREAD_MUTEX_INITIALIZER; 62 55 static bmi_method_addr_p* clients_addr = NULL; 63 56 static int clients_len = 0; 64 57 65 58 /* Shared memory buffer between the ZBMI plugin an us. */ 66 static char* zbmi_shm = NULL;67 static char *zbmi_shm_eager, *zbmi_shm_rzv;59 static void* zbmi_shm = NULL; 60 static void *zbmi_shm_unexp, *zbmi_shm_exp; 68 61 69 62 static int zbmi_shm_size_total; 70 static int zbmi_shm_size_eager; 71 static struct ZBMIEagerDesc* eager_desc; 63 static int zbmi_shm_size_unexp; 72 64 73 65 /* Queue of operations with BMI_EXT_ALLOC buffers pending for a temporary 74 66 memory buffer, sorted by descending total_size. */ 75 static struct no_mem_descriptor *no_mem_queue_first = NULL;67 static struct NoMemDescriptor *no_mem_queue_first = NULL; 76 68 /* Only valid if no_mem_queue_first != NULL. */ 77 static struct no_mem_descriptor *no_mem_queue_last;69 static struct NoMemDescriptor *no_mem_queue_last; 78 70 /* Protects access to the above queue. */ 79 static gen_mutex_t no_mem_queue_mutex = GEN_MUTEX_INITIALIZER;71 static pthread_mutex_t no_mem_queue_mutex = PTHREAD_MUTEX_INITIALIZER; 80 72 81 73 /* Queue of failed/canceled operations (that the ZOID server doesn't know 82 74 about). 
*/ 83 75 static op_list_p error_ops; 84 static gen_mutex_t error_ops_mutex = GEN_MUTEX_INITIALIZER; 85 86 87 static mqd_t get_zoid_reply_queue(int* queue_id); 88 static void release_zoid_reply_queue(int queue_id); 76 static pthread_mutex_t error_ops_mutex = PTHREAD_MUTEX_INITIALIZER; 77 78 79 static ssize_t socket_read(int fd, void* buf, size_t count); 80 static ssize_t socket_write(int fd, const void* buf, size_t count); 81 static int get_zoid_socket(int* release_token); 82 static void release_zoid_socket(int release_token); 89 83 static bmi_method_addr_p get_client_addr(int zoid_addr); 90 84 static int enqueue_no_mem(method_op_p op, bmi_size_t total_size); 91 static int send_post_cmd(method_op_p op , int not_immediate, int* length);85 static int send_post_cmd(method_op_p op); 92 86 93 87 … … 103 97 BMI_zoid_server_initialize(void) 104 98 { 105 struct ZBMIControlInitCmd cmd;106 struct ZBMIControlInitResp resp;99 int hdr; 100 struct ZBMIControlInitResp init_resp; 107 101 int shm_fd; 108 mqd_t reply_queue; 109 int queue_id; 110 111 /* Connect to the ZBMI plugin in the ZOID daemon. */ 112 while ((zbmi_control_queue = mq_open(ZBMI_CONTROL_QUEUE_NAME, O_WRONLY)) 113 < 0) 114 { 115 if (errno == ENOENT) 116 { 117 /* Probably the ZOID server is not running yet... */ 118 sleep(1); 119 } 120 else 121 { 122 perror("connect to ZOID"); 123 return -BMI_EINVAL; 124 } 125 } 126 127 /* This will initialize all the queue structures first. */ 128 if ((reply_queue = get_zoid_reply_queue(&queue_id)) < 0) 129 return reply_queue; 102 int zoid_fd, zoid_release; 103 104 /* Connect to the ZBMI plugin in the ZOID daemon. This will initialize 105 all the socket structures first. */ 106 107 if ((zoid_fd = get_zoid_socket(&zoid_release)) < 0) 108 return zoid_fd; 130 109 131 110 /* Initial handshake. 
*/ 132 111 133 cmd.command_id = ZBMI_CONTROL_INIT; 134 cmd.queue_id = queue_id; 135 136 if (mq_send(zbmi_control_queue, (void*)&cmd, sizeof(cmd), 0) < 0) 137 { 138 perror("mq_send"); 139 release_zoid_reply_queue(queue_id); 112 hdr = ZBMI_CONTROL_INIT; 113 114 if (socket_write(zoid_fd, &hdr, sizeof(hdr)) != sizeof(hdr)) 115 { 116 perror("write"); 117 release_zoid_socket(zoid_release); 140 118 return -BMI_EINVAL; 141 119 } 142 120 143 if ( mq_receive(reply_queue, (void*)&resp, ZBMI_MAX_MSG_SIZE, NULL)144 != sizeof(resp))145 { 146 perror(" mq_receive");147 release_zoid_ reply_queue(queue_id);121 if (socket_read(zoid_fd, &init_resp, sizeof(init_resp)) != 122 sizeof(init_resp)) 123 { 124 perror("read"); 125 release_zoid_socket(zoid_release); 148 126 return -BMI_EINVAL; 149 127 } 150 128 151 release_zoid_ reply_queue(queue_id);152 153 zbmi_shm_size_total = resp.shm_size_total;154 zbmi_shm_size_ eager = resp.shm_size_eager;129 release_zoid_socket(zoid_release); 130 131 zbmi_shm_size_total = init_resp.shm_size_total; 132 zbmi_shm_size_unexp = init_resp.shm_size_unexp; 155 133 156 134 /* Open the shared memory area. */ … … 172 150 close(shm_fd); 173 151 174 /* The shared memory buffer starts with an eagersection, which is152 /* The shared memory buffer starts with an unexpected section, which is 175 153 managed by the ZBMI ZOID plugin. */ 176 zbmi_shm_eager = zbmi_shm; 177 /* Eager section starts with a descriptor. */ 178 eager_desc = (struct ZBMIEagerDesc*)zbmi_shm_eager; 179 /* The rendezvous buffers part is initialized and managed by us. */ 180 zbmi_shm_rzv = zbmi_shm + zbmi_shm_size_eager; 181 zbmi_pool_init(zbmi_shm_rzv, zbmi_shm_size_total - zbmi_shm_size_eager); 154 zbmi_shm_unexp = zbmi_shm; 155 /* The expected buffers part is initialized and managed by us. */ 156 zbmi_shm_exp = zbmi_shm + zbmi_shm_size_unexp; 157 zbmi_pool_init(zbmi_shm_exp, zbmi_shm_size_total - zbmi_shm_size_unexp); 182 158 183 159 if (!(error_ops = op_list_new())) … … 204 180 205 181 /* FIXME! 
Send some sort of a FINI message first? */ 206 for (i = 0; i < zbmi_queues_used; i++) 207 { 208 char queue_name[256]; 209 mq_close(zbmi_queues[i]); 210 sprintf(queue_name, ZBMI_REPLY_QUEUE_TEMPLATE, i); 211 mq_unlink(queue_name); 212 } 213 free(zbmi_queues_inuse); 214 free(zbmi_queues); 215 zbmi_queues_len = zbmi_queues_used = 0; 216 mq_close(zbmi_control_queue); 182 for (i = 0; i < zbmi_sockets_used; i++) 183 close(zbmi_sockets[i]); 184 free(zbmi_sockets_inuse); 185 free(zbmi_sockets); 186 zbmi_sockets_len = zbmi_sockets_used = 0; 217 187 218 188 return 0; … … 231 201 BMI_zoid_server_memfree(void* buffer) 232 202 { 233 struct no_mem_descriptor* desc;203 struct NoMemDescriptor* desc; 234 204 235 205 zbmi_pool_free(buffer); … … 237 207 /* Once some memory has been freed, go over the queue of requests waiting 238 208 for memory and try to satisfy any of them. */ 239 gen_mutex_lock(&no_mem_queue_mutex);209 pthread_mutex_lock(&no_mem_queue_mutex); 240 210 241 211 for (desc = no_mem_queue_first; desc;) 242 212 { 243 struct no_mem_descriptor* desc_next = desc->next;213 struct NoMemDescriptor* desc_next = desc->next; 244 214 void* buf; 245 215 … … 248 218 method_op_p op = desc->op; 249 219 250 METHOD_DATA(op)->tmp_buffer = buf;220 ((struct ZoidServerMethodData*)op->method_data)->tmp_buffer = buf; 251 221 252 222 if (op->send_recv == BMI_SEND) … … 275 245 free(desc); 276 246 277 /* Post it as "non-immediate" since we do not have facilities 278 here to manage immediate completions. 
*/ 279 if ((op->error_code = -send_post_cmd(op, 1, NULL))) 280 { 281 gen_mutex_lock(&error_ops_mutex); 247 if ((op->error_code = -send_post_cmd(op))) 248 { 249 pthread_mutex_lock(&error_ops_mutex); 282 250 op_list_add(error_ops, op); 283 gen_mutex_unlock(&error_ops_mutex);251 pthread_mutex_unlock(&error_ops_mutex); 284 252 } 285 253 } … … 288 256 } 289 257 290 gen_mutex_unlock(&no_mem_queue_mutex);258 pthread_mutex_unlock(&no_mem_queue_mutex); 291 259 } 292 260 … … 295 263 BMI_zoid_server_unexpected_free(void* buffer) 296 264 { 297 struct ZBMIControlEagerFreeCmd cmd; 298 299 if ((char*)buffer < zbmi_shm_eager || 300 (char*)buffer >= zbmi_shm_eager + zbmi_shm_size_eager) 301 { 265 int zoid_fd, zoid_release; 266 int hdr; 267 struct ZBMIControlUnexpFreeCmd cmd; 268 269 if (buffer < zbmi_shm_unexp || 270 buffer >= zbmi_shm_unexp + zbmi_shm_size_unexp) 302 271 return -BMI_EINVAL; 303 } 304 305 cmd.command_id = ZBMI_CONTROL_EAGER_FREE; 306 cmd.unexpected = 1; 307 cmd.buffer = (char*)buffer - zbmi_shm_eager; 308 309 if (mq_send(zbmi_control_queue, (void*)&cmd, sizeof(cmd), 0) < 0) 310 { 311 perror("mq_send"); 272 273 if ((zoid_fd = get_zoid_socket(&zoid_release)) < 0) 274 return zoid_fd; 275 276 hdr = ZBMI_CONTROL_UNEXP_FREE; 277 cmd.buffer = buffer - zbmi_shm_unexp; 278 279 if (socket_write(zoid_fd, &hdr, sizeof(hdr)) != sizeof(hdr) || 280 socket_write(zoid_fd, &cmd, sizeof(cmd)) != sizeof(cmd)) 281 { 282 perror("write"); 283 release_zoid_socket(zoid_release); 312 284 return -BMI_EINVAL; 313 285 } 314 286 287 release_zoid_socket(zoid_release); 315 288 return 0; 316 289 } … … 320 293 BMI_zoid_server_testunexpected(int incount, int* outcount, 321 294 struct bmi_method_unexpected_info* info, 322 uint8_t class,int max_idle_time_ms)323 { 324 mqd_t reply_queue;325 int queue_id;295 int max_idle_time_ms) 296 { 297 int zoid_fd, zoid_release; 298 int hdr; 326 299 int i; 327 300 struct ZBMIControlUnexpTestCmd cmd; 328 char resp_buffer[ZBMI_MAX_MSG_SIZE]; 329 struct 
ZBMIControlUnexpTestResp* resp; 330 struct SharedMessageDescriptor* msg_desc; 331 332 /* Check for ready messages in the shared memory region first. */ 333 sem_wait(&eager_desc->unexp_queue_sem); 334 335 *outcount = 0; 336 for (msg_desc = (struct SharedMessageDescriptor*) 337 (eager_desc->unexp_queue_first + zbmi_shm_eager); 338 msg_desc != (struct SharedMessageDescriptor*)zbmi_shm_eager; 339 msg_desc = (struct SharedMessageDescriptor*) 340 (msg_desc->next + zbmi_shm_eager)) 341 { 342 if (*outcount >= incount) 343 break; 344 345 if (msg_desc->message_type == MSG_TYPE_SEND && 346 msg_desc->class == class) 347 { 348 info[*outcount].error_code = 0; 349 if (!(info[*outcount].addr = get_client_addr(msg_desc->addr))) 350 return -BMI_ENOMEM; 351 info[*outcount].buffer = msg_desc->buffer; 352 info[*outcount].size = msg_desc->size; 353 info[*outcount].tag = msg_desc->tag; 354 355 /* Prevent any subsequent tests from matching it. */ 356 msg_desc->message_type = MSG_TYPE_DONE; 357 358 (*outcount)++; 359 } 360 } 361 362 sem_post(&eager_desc->unexp_queue_sem); 363 364 if (*outcount > 0 || max_idle_time_ms == 0) 365 return 0; 366 367 if ((reply_queue = get_zoid_reply_queue(&queue_id)) < 0) 368 return reply_queue; 369 370 cmd.command_id = ZBMI_CONTROL_UNEXP_TEST; 371 cmd.queue_id = queue_id; 301 struct ZBMIControlUnexpTestResp resp; 302 struct ZBMIControlBufDesc* buf_descs = NULL; 303 304 if ((zoid_fd = get_zoid_socket(&zoid_release)) < 0) 305 return zoid_fd; 306 307 hdr = ZBMI_CONTROL_UNEXP_TEST; 372 308 cmd.incount = incount; 373 309 cmd.max_idle_time_ms = max_idle_time_ms; 374 cmd.class = class; 375 376 if (mq_send(zbmi_control_queue, (void*)&cmd, sizeof(cmd), 0) < 0)377 { 378 perror(" mq_send");379 release_zoid_ reply_queue(queue_id);310 311 if (socket_write(zoid_fd, &hdr, sizeof(hdr)) != sizeof(hdr) || 312 socket_write(zoid_fd, &cmd, sizeof(cmd)) != sizeof(cmd)) 313 { 314 perror("write"); 315 release_zoid_socket(zoid_release); 380 316 return -BMI_EINVAL; 381 317 } 382 318 
383 if (mq_receive(reply_queue, resp_buffer, sizeof(resp_buffer), NULL) < 0) 384 { 385 perror("mq_receive"); 386 release_zoid_reply_queue(queue_id); 319 if (socket_read(zoid_fd, &resp, offsetof(typeof(resp), buffers)) != 320 offsetof(typeof(resp), buffers)) 321 { 322 perror("read"); 323 release_zoid_socket(zoid_release); 387 324 return -BMI_EINVAL; 388 325 } 389 390 release_zoid_reply_queue(queue_id); 391 392 resp = (struct ZBMIControlUnexpTestResp*)resp_buffer; 393 *outcount = resp->outcount; 394 for (i = 0; i < resp->outcount; i++) 395 { 396 msg_desc = (struct SharedMessageDescriptor*) 397 (resp->descs[i] + zbmi_shm_eager); 398 326 if (resp.outcount_bytes > 0) 327 { 328 buf_descs = alloca(resp.outcount_bytes); 329 330 if (socket_read(zoid_fd, buf_descs, resp.outcount_bytes) != 331 resp.outcount_bytes) 332 { 333 perror("read"); 334 release_zoid_socket(zoid_release); 335 return -BMI_EINVAL; 336 } 337 } 338 339 release_zoid_socket(zoid_release); 340 341 *outcount = resp.outcount; 342 for (i = 0; i < resp.outcount; i++) 343 { 399 344 info[i].error_code = 0; 400 if (!(info[i].addr = get_client_addr(msg_desc->addr))) 345 346 if (!(info[i].addr = get_client_addr(buf_descs->addr))) 401 347 return -BMI_ENOMEM; 402 info[i].buffer = msg_desc->buffer; 403 info[i].size = msg_desc->size; 404 info[i].tag = msg_desc->tag; 405 406 /* msg_desc->message_type has already been set to MSG_TYPE_DONE 407 by the zbmi plugin. 
*/ 348 349 if (buf_descs->list_count != 1) 350 return -BMI_EINVAL; 351 352 info[i].buffer = zbmi_shm_unexp + buf_descs->list[0].buffer; 353 info[i].size = buf_descs->list[0].size; 354 info[i].tag = buf_descs->tag; 355 356 buf_descs = (struct ZBMIControlBufDesc*) 357 (((char*)buf_descs) + offsetof(typeof(*buf_descs), list) + 358 buf_descs->list_count * sizeof(buf_descs->list[0])); 408 359 } 409 360 … … 421 372 { 422 373 method_op_p new_op; 423 int ret; 424 425 if (!(new_op = bmi_alloc_method_op(sizeof(struct zoid_server_method_data)))) 374 375 /* Server-side sends are never immediate, so we start by allocating a 376 method op. */ 377 378 if (!(new_op = bmi_alloc_method_op(sizeof(struct ZoidServerMethodData)))) 426 379 return -BMI_ENOMEM; 427 380 *id = new_op->op_id; … … 445 398 } 446 399 new_op->error_code = 0; 447 METHOD_DATA(new_op)->tmp_buffer = NULL;448 METHOD_DATA(new_op)->zoid_buf_id = 0;400 ((struct ZoidServerMethodData*)new_op->method_data)->tmp_buffer = NULL; 401 ((struct ZoidServerMethodData*)new_op->method_data)->zoid_buf_id = 0; 449 402 450 403 if (buffer_type == BMI_EXT_ALLOC) … … 462 415 } 463 416 464 METHOD_DATA(new_op)->tmp_buffer = buf;417 ((struct ZoidServerMethodData*)new_op->method_data)->tmp_buffer = buf; 465 418 466 419 for (i = 0; i < list_count; i++) … … 475 428 int i; 476 429 for (i = 0; i < list_count; i++) 477 if ( (char*)buffer_list[i] < zbmi_shm_rzv||478 (char*)buffer_list[i] + size_list[i] > zbmi_shm_rzv+479 zbmi_shm_size_total - zbmi_shm_size_ eager)430 if (buffer_list[i] < zbmi_shm_exp || 431 buffer_list[i] + size_list[i] > zbmi_shm_exp + 432 zbmi_shm_size_total - zbmi_shm_size_unexp) 480 433 { 481 434 return -BMI_EINVAL; … … 483 436 } 484 437 485 if ((ret = send_post_cmd(new_op, 0, NULL)) == 1) 486 { 487 /* Immediate completion. 
*/ 488 if (buffer_type == BMI_EXT_ALLOC) 489 BMI_zoid_server_memfree(METHOD_DATA(new_op)->tmp_buffer); 490 491 bmi_dealloc_method_op(new_op); 492 } 493 494 return ret; 438 return send_post_cmd(new_op); 495 439 } 496 440 … … 506 450 { 507 451 method_op_p new_op; 508 int ret, length; 509 struct SharedMessageDescriptor* msg_desc; 510 511 /* Check for matching eager messages first. */ 512 513 sem_wait(&eager_desc->eager_queue_sem); 514 515 for (msg_desc = (struct SharedMessageDescriptor*) 516 (eager_desc->eager_queue_first + zbmi_shm_eager); 517 msg_desc != (struct SharedMessageDescriptor*)zbmi_shm_eager; 518 msg_desc = (struct SharedMessageDescriptor*) 519 (msg_desc->next + zbmi_shm_eager)) 520 { 521 if (msg_desc->message_type == MSG_TYPE_SEND && 522 msg_desc->addr == ((struct zoid_addr*)src->method_data)->pid && 523 msg_desc->tag == tag) 524 { 525 break; 526 } 527 } 528 529 if (msg_desc != (struct SharedMessageDescriptor*)zbmi_shm_eager) 530 { 531 int i; 532 size_t copied; 533 struct ZBMIControlEagerFreeCmd cmd; 534 535 /* We found a matching eager message. All we need to do is copy 536 the memory over. */ 537 538 if (total_expected_size < msg_desc->size) 539 { 540 fprintf(stderr, "Message size mismatch, ION " 541 "expected size %lld < CN total size %d\n", 542 total_expected_size, msg_desc->size); 543 544 sem_post(&eager_desc->eager_queue_sem); 545 return -BMI_EINVAL; 546 } 547 548 copied = 0; 549 i = 0; 550 while (copied < msg_desc->size) 551 { 552 size_t tocopy = (size_list[i] < msg_desc->size - copied ? 553 size_list[i] : msg_desc->size - copied); 554 memcpy(buffer_list[i], msg_desc->buffer + copied, tocopy); 555 copied += tocopy; 556 i++; 557 } 558 *total_actual_size = msg_desc->size; 559 560 /* Prevent any subsequent matches. */ 561 msg_desc->message_type = MSG_TYPE_DONE; 562 563 sem_post(&eager_desc->eager_queue_sem); 564 565 /* Request its release (only the zbmi plugin can do it). 
*/ 566 cmd.command_id = ZBMI_CONTROL_EAGER_FREE; 567 cmd.unexpected = 0; 568 cmd.buffer = (char*)msg_desc - zbmi_shm_eager; 569 570 if (mq_send(zbmi_control_queue, (void*)&cmd, sizeof(cmd), 0) < 0) 571 { 572 perror("mq_send"); 573 return -BMI_EINVAL; 574 } 575 576 /* Immediate completion. */ 577 return 1; 578 } 579 580 sem_post(&eager_desc->eager_queue_sem); 581 582 if (!(new_op = bmi_alloc_method_op(sizeof(struct zoid_server_method_data)))) 452 453 /* Server-side receives are never immediate, so we start by allocating a 454 method op. */ 455 456 if (!(new_op = bmi_alloc_method_op(sizeof(struct ZoidServerMethodData)))) 583 457 return -BMI_ENOMEM; 584 458 *id = new_op->op_id; … … 602 476 } 603 477 new_op->error_code = 0; 604 METHOD_DATA(new_op)->tmp_buffer = NULL;605 METHOD_DATA(new_op)->zoid_buf_id = 0;478 ((struct ZoidServerMethodData*)new_op->method_data)->tmp_buffer = NULL; 479 ((struct ZoidServerMethodData*)new_op->method_data)->zoid_buf_id = 0; 606 480 607 481 if (buffer_type == BMI_EXT_ALLOC) … … 617 491 } 618 492 619 METHOD_DATA(new_op)->tmp_buffer = buf;493 ((struct ZoidServerMethodData*)new_op->method_data)->tmp_buffer = buf; 620 494 } 621 495 else … … 624 498 int i; 625 499 for (i = 0; i < list_count; i++) 626 if ( (char*)buffer_list[i] < zbmi_shm_rzv||627 (char*)buffer_list[i] + size_list[i] > zbmi_shm_rzv+628 zbmi_shm_size_total - zbmi_shm_size_ eager)500 if (buffer_list[i] < zbmi_shm_exp || 501 buffer_list[i] + size_list[i] > zbmi_shm_exp + 502 zbmi_shm_size_total - zbmi_shm_size_unexp) 629 503 { 630 504 return -BMI_EINVAL; … … 632 506 } 633 507 634 if ((ret = send_post_cmd(new_op, 0, &length)) == 1) 635 { 636 /* Immediate completion. */ 637 *total_actual_size = length; 638 639 if (buffer_type == BMI_EXT_ALLOC) 640 { 641 /* Copy the memory back to the user buffer(s). 
*/ 642 int j, size_remaining = length; 643 void *buf_cur = METHOD_DATA(new_op)->tmp_buffer; 644 j = 0; 645 while (size_remaining > 0) 646 { 647 int tocopy = (new_op->size_list[j] < size_remaining ? 648 new_op->size_list[j] : size_remaining); 649 650 memcpy(new_op->buffer_list[j], buf_cur, tocopy); 651 buf_cur += tocopy; 652 size_remaining -= tocopy; 653 j++; 654 } 655 656 BMI_zoid_server_memfree(METHOD_DATA(new_op)->tmp_buffer); 657 } 658 659 bmi_dealloc_method_op(new_op); 660 } 661 662 return ret; 508 return send_post_cmd(new_op); 663 509 } 664 510 … … 673 519 bmi_context_id context_id) 674 520 { 675 mqd_t reply_queue; 676 int queue_id; 677 char cmd_buffer[ZBMI_MAX_MSG_SIZE], resp_buffer[ZBMI_MAX_MSG_SIZE]; 521 int zoid_fd, zoid_release; 522 int hdr; 678 523 struct ZBMIControlTestCmd* cmd; 679 524 int cmd_len; 680 struct ZBMIControlTestResp *resp;681 int i , index;525 struct ZBMIControlTestResp resp; 526 int i; 682 527 int outcount_used = 0; /* Counter of already used output entries. */ 683 528 int incount_fwd = incount; /* Counter of how many input entries to … … 686 531 /* We start by checking if there are any local failed/canceled operations 687 532 and taking care of those first. */ 688 gen_mutex_lock(&error_ops_mutex);533 pthread_mutex_lock(&error_ops_mutex); 689 534 if (incount) 690 535 { … … 709 554 710 555 op_list_remove(op); 711 bmi_dealloc_method_op(op); 712 } 556 /* Note: we will dealloc a little later. */ 557 } 558 } 559 560 if (outcount_used > 0) 561 { 562 incount_fwd = incount - outcount_used; 563 if (index_array) 564 index_array += outcount_used; 713 565 } 714 566 } … … 733 585 bmi_dealloc_method_op(op); 734 586 } 735 } 736 gen_mutex_unlock(&error_ops_mutex); 737 738 if (outcount_used > 0) 739 { 740 /* Return immediately if there is something to return. */ 741 *outcount = outcount_used; 742 return 0; 743 } 744 #if 0 /* Commented out so that testcontext won't return 0 if there are ready 745 expected rendezvous messages. 
*/ 746 else if (!incount) 747 { 748 /* Testcontext. Check in the shared memory region for ready 749 unexpected messages and return immediately if any. */ 750 struct SharedMessageDescriptor* msg_desc; 751 752 sem_wait(&eager_desc->unexp_queue_sem); 753 754 for (msg_desc = (struct SharedMessageDescriptor*) 755 (eager_desc->unexp_queue_first + zbmi_shm_eager); 756 msg_desc != (struct SharedMessageDescriptor*)zbmi_shm_eager; 757 msg_desc = (struct SharedMessageDescriptor*) 758 (msg_desc->next + zbmi_shm_eager)) 759 { 760 if (msg_desc->message_type == MSG_TYPE_SEND) 761 break; 762 } 763 764 sem_post(&eager_desc->unexp_queue_sem); 765 766 if (msg_desc != (struct SharedMessageDescriptor*)zbmi_shm_eager) 767 { 768 *outcount = 0; 769 return 0; 770 } 771 } 772 #endif 773 774 if ((reply_queue = get_zoid_reply_queue(&queue_id)) < 0) 775 return reply_queue; 776 587 588 if (outcount_used > 0) 589 { 590 id_array += outcount_used; 591 error_code_array += outcount_used; 592 actual_size_array += outcount_used; 593 user_ptr_array += outcount_used; 594 } 595 } 596 pthread_mutex_unlock(&error_ops_mutex); 597 598 hdr = ZBMI_CONTROL_TEST; 777 599 cmd_len = offsetof(typeof(*cmd), zoid_ids) + 778 600 incount_fwd * sizeof(cmd->zoid_ids[0]); 779 if (cmd_len > ZBMI_MAX_MSG_SIZE) 780 return -BMI_EINVAL; 781 782 cmd = (struct ZBMIControlTestCmd*)cmd_buffer; 783 cmd->command_id = ZBMI_CONTROL_TEST; 784 cmd->queue_id = queue_id; 601 cmd = alloca(cmd_len); 602 785 603 cmd->timeout_ms = max_idle_time_ms; 604 786 605 /* incount_fwd == 0 indicates "testcontext". We still need to communicate 787 606 the max. count of outputs we are prepared to handle. 
*/ … … 790 609 { 791 610 method_op_p op = (method_op_p)id_gen_fast_lookup(id_array[i]); 792 793 cmd->zoid_ids[i] = METHOD_DATA(op)->zoid_buf_id; 794 } 795 796 if (mq_send(zbmi_control_queue, (void*)cmd, cmd_len, 0) < 0) 797 { 798 perror("mq_send"); 799 release_zoid_reply_queue(queue_id); 611 if (op->error_code) 612 { 613 bmi_dealloc_method_op(op); 614 continue; 615 } 616 cmd->zoid_ids[i] = ((struct ZoidServerMethodData*)op->method_data)-> 617 zoid_buf_id; 618 } 619 620 if (outcount_used > 0) 621 { 622 outcount_max -= outcount_used; 623 if (outcount_max == 0 || (incount > 0 && incount_fwd == 0)) 624 { 625 *outcount = outcount_used; 626 return 0; 627 } 628 } 629 630 /* Note: this is shifted later than usual in the function body so that 631 we can invoke bmi_dealloc_method_op above as appropriate. */ 632 if ((zoid_fd = get_zoid_socket(&zoid_release)) < 0) 633 return zoid_fd; 634 635 if (socket_write(zoid_fd, &hdr, sizeof(hdr)) != sizeof(hdr) || 636 socket_write(zoid_fd, cmd, cmd_len) != cmd_len) 637 { 638 perror("write"); 639 release_zoid_socket(zoid_release); 800 640 return -BMI_EINVAL; 801 641 } 802 642 803 if (mq_receive(reply_queue, resp_buffer, sizeof(resp_buffer), NULL) < 0) 804 { 805 perror("mq_receive"); 806 release_zoid_reply_queue(queue_id); 643 if (socket_read(zoid_fd, &resp, offsetof(typeof(resp), list)) != 644 offsetof(typeof(resp), list)) 645 { 646 perror("read"); 647 release_zoid_socket(zoid_release); 807 648 return -BMI_EINVAL; 808 649 } 809 resp = (struct ZBMIControlTestResp*)resp_buffer; 810 811 assert(resp->count <= outcount_max); 812 *outcount = resp->count; 813 814 for (i = 0, index = 0; i < resp->count; i++, index++) 815 { 816 method_op_p op = (method_op_p)id_gen_fast_lookup(resp->list[i]. 
817 bmi_id); 818 if (incount_fwd) 819 { 820 for (; index < incount_fwd; index++) 821 { 822 if (cmd->zoid_ids[index] == METHOD_DATA(op)->zoid_buf_id) 823 break; 824 } 825 assert(index < incount_fwd); 826 827 if (index_array) 828 index_array[i] = index; 650 assert(resp.count <= outcount_max); 651 *outcount = resp.count; 652 if (resp.count > 0) 653 { 654 struct ZBMIControlTestRespList* resp_list; 655 int index; 656 657 resp_list = alloca(resp.count * sizeof(*resp_list)); 658 659 if (socket_read(zoid_fd, resp_list, resp.count * sizeof(*resp_list)) != 660 resp.count * sizeof(*resp_list)) 661 { 662 perror("read"); 663 release_zoid_socket(zoid_release); 664 return -BMI_EINVAL; 665 } 666 667 for (i = 0, index = 0; i < resp.count; i++, index++) 668 { 669 method_op_p op = (method_op_p)id_gen_fast_lookup(resp_list[i]. 670 bmi_id); 671 if (incount_fwd) 672 { 673 for (; index < incount_fwd; index++) 674 { 675 if (cmd->zoid_ids[index] == ((struct ZoidServerMethodData*) 676 op->method_data)->zoid_buf_id) 677 break; 678 } 679 assert(index < incount_fwd); 680 681 if (index_array) 682 index_array[i] = index; 683 else 684 assert(i == index); 685 } 686 else /* testcontext */ 687 id_array[i] = resp_list[i].bmi_id; 688 689 if (resp_list[i].length < 0) /* Most likely BMI_ECANCEL */ 690 { 691 error_code_array[index] = -resp_list[i].length; 692 actual_size_array[index] = 0; 693 } 829 694 else 830 assert(i == index); 831 } 832 else /* testcontext */ 833 { 834 /* Make sure the returned method_op is fully initialized. 835 Only an issue for testcontext, since other test calls 836 require bmi_op_id_t, which implies full initialization. 
*/ 837 gen_mutex_lock(&METHOD_DATA(op)->post_mutex); 838 gen_mutex_unlock(&METHOD_DATA(op)->post_mutex); 839 id_array[i] = resp->list[i].bmi_id; 840 } 841 842 if (resp->list[i].length < 0) /* Most likely BMI_ECANCEL */ 843 { 844 error_code_array[index] = -resp->list[i].length; 845 actual_size_array[index] = 0; 846 } 847 else 848 { 849 actual_size_array[index] = resp->list[i].length; 850 error_code_array[index] = 0; 851 } 852 853 if (user_ptr_array) 854 user_ptr_array[index] = op->user_ptr; 855 856 /* We are done with this message. Clean up. */ 857 if (METHOD_DATA(op)->tmp_buffer) 858 { 859 if (op->send_recv == BMI_RECV) 860 { 861 /* Copy the memory back to the user buffer(s). */ 862 int j, size_remaining = resp->list[i].length; 863 void *buf_cur = METHOD_DATA(op)->tmp_buffer; 864 j = 0; 865 while (size_remaining > 0) 695 { 696 actual_size_array[index] = resp_list[i].length; 697 error_code_array[index] = 0; 698 } 699 700 if (user_ptr_array) 701 user_ptr_array[index] = op->user_ptr; 702 703 /* We are done with this message. Clean up. */ 704 if (((struct ZoidServerMethodData*)op->method_data)->tmp_buffer) 705 { 706 if (op->send_recv == BMI_RECV) 866 707 { 867 int tocopy = (op->size_list[j] < size_remaining ? 868 op->size_list[j] : size_remaining); 869 870 memcpy(op->buffer_list[j], buf_cur, tocopy); 871 buf_cur += tocopy; 872 size_remaining -= tocopy; 873 j++; 708 /* Copy the memory back to the user buffer(s). */ 709 int j, size_remaining = resp_list[i].length; 710 void *buf_cur = ((struct ZoidServerMethodData*)op-> 711 method_data)->tmp_buffer; 712 j = 0; 713 while (size_remaining > 0) 714 { 715 int tocopy = (op->size_list[j] < size_remaining ? 
716 op->size_list[j] : size_remaining); 717 718 memcpy(op->buffer_list[j], buf_cur, tocopy); 719 buf_cur += tocopy; 720 size_remaining -= tocopy; 721 j++; 722 } 874 723 } 875 } 876 877 BMI_zoid_server_memfree(METHOD_DATA(op)->tmp_buffer); 878 } 879 880 bmi_dealloc_method_op(op); 881 } /* for (i) */ 882 883 release_zoid_reply_queue(queue_id); 724 725 BMI_zoid_server_memfree(((struct ZoidServerMethodData*)op-> 726 method_data)->tmp_buffer); 727 } 728 729 bmi_dealloc_method_op(op); 730 } /* for (i) */ 731 } /* if (resp.count > 0) */ 732 733 release_zoid_socket(zoid_release); 884 734 885 735 return 0; … … 890 740 BMI_zoid_server_cancel(bmi_op_id_t id, bmi_context_id context_id) 891 741 { 742 int zoid_fd, zoid_release; 743 int hdr; 892 744 struct ZBMIControlCancelCmd cmd; 893 745 method_op_p op; … … 899 751 have not, most likely because of a lack of memory (those can be handled 900 752 locally). */ 901 if (! METHOD_DATA(op)->zoid_buf_id)902 { 903 gen_mutex_lock(&no_mem_queue_mutex);753 if (!((struct ZoidServerMethodData*)op->method_data)->zoid_buf_id) 754 { 755 pthread_mutex_lock(&no_mem_queue_mutex); 904 756 905 757 /* Test again, now with mutex properly locked. */ 906 if (! METHOD_DATA(op)->zoid_buf_id)758 if (!((struct ZoidServerMethodData*)op->method_data)->zoid_buf_id) 907 759 { 908 760 if (!op->error_code) 909 761 { 910 762 /* It must be an out-of-memory case on no_mem_queue. 
*/ 911 struct no_mem_descriptor* desc;763 struct NoMemDescriptor* desc; 912 764 913 765 op->error_code = BMI_ECANCEL; … … 929 781 assert(desc); 930 782 931 gen_mutex_lock(&error_ops_mutex);783 pthread_mutex_lock(&error_ops_mutex); 932 784 op_list_add(error_ops, op); 933 gen_mutex_unlock(&error_ops_mutex);934 } 935 936 gen_mutex_unlock(&no_mem_queue_mutex);785 pthread_mutex_unlock(&error_ops_mutex); 786 } 787 788 pthread_mutex_unlock(&no_mem_queue_mutex); 937 789 938 790 return 0; 939 791 } 940 792 941 gen_mutex_unlock(&no_mem_queue_mutex); 942 } 943 944 cmd.command_id = ZBMI_CONTROL_CANCEL; 945 cmd.zoid_id = METHOD_DATA(op)->zoid_buf_id; 946 947 if (mq_send(zbmi_control_queue, (void*)&cmd, sizeof(cmd), 0) < 0) 948 { 949 perror("mq_send"); 793 pthread_mutex_unlock(&no_mem_queue_mutex); 794 } 795 796 if ((zoid_fd = get_zoid_socket(&zoid_release)) < 0) 797 return zoid_fd; 798 799 hdr = ZBMI_CONTROL_CANCEL; 800 cmd.zoid_id = ((struct ZoidServerMethodData*)op->method_data)-> 801 zoid_buf_id; 802 803 if (socket_write(zoid_fd, &hdr, sizeof(hdr)) != sizeof(hdr) || 804 socket_write(zoid_fd, &cmd, sizeof(cmd)) != sizeof(cmd)) 805 { 806 perror("write"); 807 release_zoid_socket(zoid_release); 950 808 return -BMI_EINVAL; 951 809 } … … 954 812 } 955 813 956 /* An internal routine used to obtain a (reply) queue to the ZBMI plugin. */ 957 static mqd_t 958 get_zoid_reply_queue(int* queue_id) 814 /* 815 * A more robust version of read(2). 816 */ 817 static ssize_t 818 socket_read(int fd, void* buf, size_t count) 819 { 820 size_t already_read = 0; 821 822 while (already_read < count) 823 { 824 ssize_t n; 825 826 n = read(fd, buf + already_read, count - already_read); 827 828 if (n == -1) 829 { 830 if (errno == EINTR || errno == EAGAIN) 831 continue; 832 return -1; 833 } 834 else if (n == 0) 835 return already_read; 836 else 837 already_read += n; 838 } 839 840 return already_read; 841 } 842 843 /* 844 * A more robust version of write(2). 
845 */ 846 static ssize_t 847 socket_write(int fd, const void* buf, size_t count) 848 { 849 size_t already_written = 0; 850 851 while (already_written < count) 852 { 853 ssize_t n; 854 855 n = write(fd, buf + already_written, count - already_written); 856 857 if (n == -1) 858 { 859 if (errno == EINTR || errno == EAGAIN) 860 continue; 861 return -1; 862 } 863 else 864 already_written += n; 865 } 866 867 return already_written; 868 } 869 870 /* An internal routine used to obtain a socket to the ZBMI plugin. */ 871 static int 872 get_zoid_socket(int* release_token) 959 873 { 960 874 int i; 961 875 962 gen_mutex_lock(&zbmi_queues_mutex);963 964 for (i = 0; i < zbmi_ queues_used; i++)965 if (!zbmi_ queues_inuse[i])876 pthread_mutex_lock(&zbmi_sockets_mutex); 877 878 for (i = 0; i < zbmi_sockets_used; i++) 879 if (!zbmi_sockets_inuse[i]) 966 880 break; 967 881 968 if (i == zbmi_queues_used) 969 { 970 char queue_name[256]; 971 struct mq_attr attr; 972 973 /* All open queues are currently in use. Open a new one. */ 974 if (zbmi_queues_used == zbmi_queues_len) 882 if (i == zbmi_sockets_used) 883 { 884 /* All open sockets are currently in use. Open a new one. */ 885 struct sockaddr_un addr; 886 887 if (zbmi_sockets_used == zbmi_sockets_len) 975 888 { 976 889 /* Enlarge the arrays first. 
*/ 977 890 int j; 978 int* zbmi_ queues_new;979 char* zbmi_ queues_inuse_new;980 981 if (zbmi_ queues_len == 0)982 zbmi_ queues_len = ZBMI_QUEUES_LEN_INIT;891 int* zbmi_sockets_new; 892 char* zbmi_sockets_inuse_new; 893 894 if (zbmi_sockets_len == 0) 895 zbmi_sockets_len = ZBMI_SOCKETS_LEN_INIT; 983 896 else 984 zbmi_ queues_len *= 2;985 zbmi_ queues_new = realloc(zbmi_queues, zbmi_queues_len *986 sizeof(*zbmi_queues));987 if (!zbmi_ queues_new)988 { 989 gen_mutex_unlock(&zbmi_queues_mutex);897 zbmi_sockets_len *= 2; 898 zbmi_sockets_new = realloc(zbmi_sockets, zbmi_sockets_len * 899 sizeof(*zbmi_sockets)); 900 if (!zbmi_sockets_new) 901 { 902 pthread_mutex_unlock(&zbmi_sockets_mutex); 990 903 return -BMI_ENOMEM; 991 904 } 992 zbmi_ queues = zbmi_queues_new;993 zbmi_ queues_inuse_new = realloc(zbmi_queues_inuse,994 zbmi_queues_len *995 sizeof(*zbmi_queues_inuse));996 if (!zbmi_ queues_inuse_new)997 { 998 gen_mutex_unlock(&zbmi_queues_mutex);905 zbmi_sockets = zbmi_sockets_new; 906 zbmi_sockets_inuse_new = realloc(zbmi_sockets_inuse, 907 zbmi_sockets_len * 908 sizeof(*zbmi_sockets_inuse)); 909 if (!zbmi_sockets_inuse_new) 910 { 911 pthread_mutex_unlock(&zbmi_sockets_mutex); 999 912 return -BMI_ENOMEM; 1000 913 } 1001 zbmi_queues_inuse = zbmi_queues_inuse_new; 1002 1003 for (j = zbmi_queues_used; j < zbmi_queues_len; j++) 1004 zbmi_queues_inuse[j] = 0; 1005 } 1006 1007 sprintf(queue_name, ZBMI_REPLY_QUEUE_TEMPLATE, i); 1008 attr.mq_flags = 0; 1009 attr.mq_maxmsg = 2; /* We never put more than one in there. */ 1010 attr.mq_msgsize = ZBMI_MAX_MSG_SIZE; 1011 attr.mq_curmsgs = 0; 1012 1013 zbmi_queues[i] = mq_open(queue_name, O_RDONLY | O_CREAT | O_EXCL, 1014 0666, &attr); 1015 if (zbmi_queues[i] < 0) 1016 { 1017 if (errno == EEXIST) 1018 { 1019 /* There's no such thing as O_TRUNC for message queues, so 1020 we need to do it manually. 
*/ 1021 mq_unlink(queue_name); 1022 zbmi_queues[i] = mq_open(queue_name, 1023 O_RDONLY | O_CREAT | O_EXCL, 1024 0666, &attr); 1025 } 1026 } 1027 1028 if (zbmi_queues[i] < 0) 1029 { 1030 perror("ZBMI reply queue"); 1031 gen_mutex_unlock(&zbmi_queues_mutex); 914 zbmi_sockets_inuse = zbmi_sockets_inuse_new; 915 916 for (j = zbmi_sockets_used; j < zbmi_sockets_len; j++) 917 zbmi_sockets_inuse[j] = 0; 918 } 919 920 if ((zbmi_sockets[i] = socket(PF_UNIX, SOCK_STREAM, 0)) < 0) 921 { 922 perror("ZBMI control socket"); 923 pthread_mutex_unlock(&zbmi_sockets_mutex); 1032 924 return -BMI_EINVAL; 1033 925 } 1034 926 1035 zbmi_queues_used++; 1036 } 1037 1038 zbmi_queues_inuse[i] = 1; 1039 1040 gen_mutex_unlock(&zbmi_queues_mutex); 1041 1042 *queue_id = i; 1043 1044 return zbmi_queues[i]; 1045 } 1046 1047 /* Releases the queue obtained with get_zoid_reply_queue. */ 927 addr.sun_family = AF_UNIX; 928 strcpy(addr.sun_path, ZBMI_SOCKET_NAME); 929 #if 0 930 if (bind(zbmi_sockets[i], (struct sockaddr*)&addr, sizeof(addr)) < 0) 931 { 932 perror("bind " ZBMI_SOCKET_NAME); 933 close(zbmi_sockets[i]); 934 pthread_mutex_unlock(&zbmi_sockets_mutex); 935 return -BMI_EINVAL; 936 } 937 #endif 938 while (connect(zbmi_sockets[i], (struct sockaddr*)&addr, sizeof(addr)) 939 < 0) 940 { 941 if (errno == ENOENT || errno == ECONNREFUSED) 942 { 943 /* ZOID server not running yet or too many requests? Wait 944 a little... */ 945 sleep(1); 946 } 947 else 948 { 949 perror("connect to ZOID"); 950 close(zbmi_sockets[i]); 951 pthread_mutex_unlock(&zbmi_sockets_mutex); 952 return -BMI_EINVAL; 953 } 954 } 955 956 zbmi_sockets_used++; 957 } 958 959 zbmi_sockets_inuse[i] = 1; 960 961 pthread_mutex_unlock(&zbmi_sockets_mutex); 962 963 *release_token = i; 964 965 return zbmi_sockets[i]; 966 } 967 968 /* Releases the socket obtained with get_zoid_socket. 
*/ 1048 969 static void 1049 release_zoid_ reply_queue(int queue_id)1050 { 1051 assert( queue_id >= 0 && queue_id < zbmi_queues_used);1052 1053 gen_mutex_lock(&zbmi_queues_mutex);1054 1055 assert(zbmi_ queues_inuse[queue_id]);1056 zbmi_ queues_inuse[queue_id] = 0;1057 1058 gen_mutex_unlock(&zbmi_queues_mutex);970 release_zoid_socket(int release_token) 971 { 972 assert(release_token >= 0 && release_token < zbmi_sockets_used); 973 974 pthread_mutex_lock(&zbmi_sockets_mutex); 975 976 assert(zbmi_sockets_inuse[release_token]); 977 zbmi_sockets_inuse[release_token] = 0; 978 979 pthread_mutex_unlock(&zbmi_sockets_mutex); 1059 980 } 1060 981 … … 1068 989 assert(zoid_addr >= 0); 1069 990 1070 gen_mutex_lock(&clients_mutex);991 pthread_mutex_lock(&clients_mutex); 1071 992 1072 993 if (zoid_addr >= clients_len) … … 1081 1002 sizeof(*clients_addr)))) 1082 1003 { 1083 gen_mutex_unlock(&clients_mutex);1004 pthread_mutex_unlock(&clients_mutex); 1084 1005 return NULL; 1085 1006 } … … 1105 1026 ret = clients_addr[zoid_addr]; 1106 1027 1107 gen_mutex_unlock(&clients_mutex);1028 pthread_mutex_unlock(&clients_mutex); 1108 1029 1109 1030 return ret; … … 1114 1035 zoid_server_free_client_addr(bmi_method_addr_p addr) 1115 1036 { 1116 gen_mutex_lock(&clients_mutex);1037 pthread_mutex_lock(&clients_mutex); 1117 1038 1118 1039 assert(((struct zoid_addr*)addr->method_data)->pid < clients_len && … … 1120 1041 clients_addr[((struct zoid_addr*)addr->method_data)->pid] = NULL; 1121 1042 1122 gen_mutex_unlock(&clients_mutex);1043 pthread_mutex_unlock(&clients_mutex); 1123 1044 } 1124 1045 … … 1127 1048 enqueue_no_mem(method_op_p op, bmi_size_t total_size) 1128 1049 { 1129 struct no_mem_descriptor *nomemdesc, *desc;1050 struct NoMemDescriptor *nomemdesc, *desc; 1130 1051 1131 1052 if (!(nomemdesc = malloc(sizeof(*nomemdesc)))) … … 1137 1058 /* no_mem_queue is sorted in descending size order. 1138 1059 Look for an appropriate spot to insert a new entry. 
*/ 1139 gen_mutex_lock(&no_mem_queue_mutex);1060 pthread_mutex_lock(&no_mem_queue_mutex); 1140 1061 1141 1062 for (desc = no_mem_queue_first; desc; desc = desc->next) … … 1179 1100 } 1180 1101 1181 gen_mutex_unlock(&no_mem_queue_mutex);1102 pthread_mutex_unlock(&no_mem_queue_mutex); 1182 1103 1183 1104 return 0; 1184 1105 } 1185 1106 1186 /* A common internal posting routine for send and receive requests. 1187 "not_immediate" is used for messages triggered from memfree; it is 1188 inconvenient at that point for messages to succeed immediately (we 1189 would need a separate queue for them). 1190 The function returns 0 if posted successfully, 1 for immediate 1191 completion, and a negative value if failed. 1192 For immediate completions of receives, the length of the received 1193 message is stored in *length; 1194 */ 1107 /* A common internal posting routine for send and receive requests. */ 1195 1108 static int 1196 send_post_cmd(method_op_p op, int not_immediate, int* length) 1197 { 1198 mqd_t reply_queue; 1199 int queue_id; 1109 send_post_cmd(method_op_p op) 1110 { 1111 int zoid_fd, zoid_release; 1200 1112 int cmd_len, i; 1201 char cmd_buffer[ZBMI_MAX_MSG_SIZE];1113 int hdr; 1202 1114 struct ZBMIControlPostCmd* cmd; 1203 1115 struct ZBMIControlPostResp resp; 1204 1116 int list_count_zoid; 1205 1117 1206 if ((reply_queue = get_zoid_reply_queue(&queue_id)) < 0) 1207 return reply_queue; 1208 1209 list_count_zoid = (METHOD_DATA(op)->tmp_buffer ? 1 : op->list_count); 1118 if ((zoid_fd = get_zoid_socket(&zoid_release)) < 0) 1119 return zoid_fd; 1120 1121 hdr = (op->send_recv == BMI_SEND ? ZBMI_CONTROL_POST_SEND : 1122 ZBMI_CONTROL_POST_RECV); 1123 list_count_zoid = (((struct ZoidServerMethodData*)op->method_data)-> 1124 tmp_buffer ? 
1 : op->list_count); 1210 1125 cmd_len = offsetof(typeof(*cmd), buf.list) + 1211 1126 list_count_zoid * sizeof(cmd->buf.list[0]); 1212 if (cmd_len > ZBMI_MAX_MSG_SIZE) 1213 return -BMI_EINVAL; 1214 1215 cmd = (struct ZBMIControlPostCmd*)cmd_buffer; 1216 cmd->command_id = (op->send_recv == BMI_SEND ? ZBMI_CONTROL_POST_SEND : 1217 ZBMI_CONTROL_POST_RECV); 1218 cmd->queue_id = queue_id; 1219 cmd->not_immediate = not_immediate; 1127 cmd = alloca(cmd_len); 1220 1128 cmd->bmi_id = op->op_id; 1221 1129 cmd->buf.addr = ((struct zoid_addr*)op->addr->method_data)->pid; 1222 1130 cmd->buf.tag = op->msg_tag; 1223 1131 cmd->buf.list_count = list_count_zoid; 1224 if ( METHOD_DATA(op)->tmp_buffer)1225 { 1226 cmd->buf.list[0].buffer = ( char*)METHOD_DATA(op)->tmp_buffer -1227 zbmi_shm_rzv;1132 if (((struct ZoidServerMethodData*)op->method_data)->tmp_buffer) 1133 { 1134 cmd->buf.list[0].buffer = ((struct ZoidServerMethodData*)op-> 1135 method_data)->tmp_buffer - zbmi_shm_exp; 1228 1136 cmd->buf.list[0].size = (op->send_recv == BMI_SEND ? op->actual_size : 1229 1137 op->expected_size); … … 1232 1140 for (i = 0; i < op->list_count; i++) 1233 1141 { 1234 cmd->buf.list[i].buffer = (char*)op->buffer_list[i] - zbmi_shm_rzv;1142 cmd->buf.list[i].buffer = op->buffer_list[i] - zbmi_shm_exp; 1235 1143 cmd->buf.list[i].size = op->size_list[i]; 1236 1144 } 1237 1145 1238 /* The moment the command is written below, we can get a completion of 1239 this request on another thread that is waiting in testcontext. That 1240 thread will release method_op_p, resulting in us accessing free memory. 1241 This mutex prevents that. 
*/ 1242 gen_mutex_init(&METHOD_DATA(op)->post_mutex); 1243 gen_mutex_lock(&METHOD_DATA(op)->post_mutex); 1244 1245 if (mq_send(zbmi_control_queue, (void*)cmd, cmd_len, 0) < 0) 1246 { 1247 perror("mq_send"); 1248 gen_mutex_unlock(&METHOD_DATA(op)->post_mutex); 1249 release_zoid_reply_queue(queue_id); 1146 if (socket_write(zoid_fd, &hdr, sizeof(hdr)) != sizeof(hdr) || 1147 socket_write(zoid_fd, cmd, cmd_len) != cmd_len) 1148 { 1149 perror("write"); 1150 release_zoid_socket(zoid_release); 1250 1151 return -BMI_EINVAL; 1251 1152 } 1252 1153 1253 if (mq_receive(reply_queue, (void*)&resp, ZBMI_MAX_MSG_SIZE, NULL) != 1254 sizeof(resp)) 1255 { 1256 perror("mq_receive"); 1257 gen_mutex_unlock(&METHOD_DATA(op)->post_mutex); 1258 release_zoid_reply_queue(queue_id); 1154 if (socket_read(zoid_fd, &resp, sizeof(resp)) != sizeof(resp)) 1155 { 1156 perror("read"); 1157 release_zoid_socket(zoid_release); 1259 1158 return -BMI_EINVAL; 1260 1159 } 1261 1160 1262 release_zoid_ reply_queue(queue_id);1161 release_zoid_socket(zoid_release); 1263 1162 1264 1163 if (!resp.zoid_id) 1265 {1266 gen_mutex_unlock(&METHOD_DATA(op)->post_mutex);1267 1164 return -BMI_ENOMEM; 1268 } 1269 else if (resp.zoid_id == -1) 1270 { 1271 /* Immediate completion. */ 1272 assert(!not_immediate); 1273 gen_mutex_unlock(&METHOD_DATA(op)->post_mutex); 1274 if (length) 1275 *length = resp.length; 1276 return 1; 1277 } 1278 1279 METHOD_DATA(op)->zoid_buf_id = resp.zoid_id; 1280 1281 gen_mutex_unlock(&METHOD_DATA(op)->post_mutex); 1165 1166 ((struct ZoidServerMethodData*)op->method_data)->zoid_buf_id = resp.zoid_id; 1282 1167 1283 1168 return 0; -
branches/cu-security-branch/src/io/bmi/bmi_zoid/zoid.c
r8359 r8397 42 42 static op_list_p zoid_ops; 43 43 44 /* As a special feature, client-side ZOID method allows post timeouts.45 If non-zero this enables combined post & test in order to reduce the46 number of round-trip messages to the server. */47 static int post_timeout = 0;48 49 44 50 45 static int zoid_err_to_bmi(int err); … … 56 51 const bmi_size_t* size_list, int list_count, 57 52 bmi_size_t total_size, enum bmi_buffer_type buffer_type, 58 bmi_msg_tag_t tag, uint8_t class,void* user_ptr,53 bmi_msg_tag_t tag, void* user_ptr, 59 54 bmi_context_id context_id, PVFS_hint hints, 60 55 int unexpected) … … 100 95 } 101 96 102 if (total_size <= ZOID_MAX_EAGER_MSG) 103 ret = zbmi_send_eager((const void**)buffer_list, size_list_cp, 104 list_count, tag, class, unexpected, 105 post_timeout); 106 else 107 ret = zbmi_send(buffer_list, size_list_cp, list_count, tag, class, 108 unexpected, post_timeout); 109 err = __zoid_error(); 110 if (total_size <= ZOID_MAX_EAGER_MSG ? 111 (!err && ret == 0) : (err == ENOMEM)) 97 ret = zbmi_send(buffer_list, size_list_cp, list_count, tag, unexpected); 98 if ((err = __zoid_error())) 112 99 { 113 /* Indicates that there was no memory on the server side 114 for the message. Could happen if this is an expected 115 message and no matching receive has been posted, or if 116 we sent too many unexpected messages without the server-side 117 receiving anything. */ 118 119 if (!(new_op = bmi_alloc_method_op(0))) 120 return -BMI_ENOMEM; 121 *id = new_op->op_id; 122 new_op->addr = dest; 123 new_op->send_recv = BMI_SEND; 124 new_op->user_ptr = user_ptr; 125 new_op->msg_tag = tag; 126 new_op->class = class; 127 new_op->list_count = list_count; 128 new_op->actual_size = total_size; 129 if (list_count == 1) 100 if (err == ENOMEM) 130 101 { 131 /* Our buffer_list and size_list pointers might be 132 temporary (see, e.g., BMI_zoid_post_send), so we 133 prefer to copy the data over to someplace more 134 permanent. 
*/ 135 new_op->buffer = (void*)buffer_list[0]; 136 new_op->buffer_list = &new_op->buffer; 137 new_op->size_list = &new_op->actual_size; 102 /* Indicates that there was no memory on the server side 103 for the message. Could happen if this is an expected 104 message and no matching receive has been posted, or if 105 we sent too many unexpected messages without the server-side 106 receiving anything. */ 107 108 if (!(new_op = bmi_alloc_method_op(0))) 109 return -BMI_ENOMEM; 110 *id = new_op->op_id; 111 new_op->addr = dest; 112 new_op->send_recv = BMI_SEND; 113 new_op->user_ptr = user_ptr; 114 new_op->msg_tag = tag; 115 new_op->list_count = list_count; 116 new_op->actual_size = total_size; 117 if (list_count == 1) 118 { 119 /* Our buffer_list and size_list pointers might be 120 temporary (see, e.g., BMI_zoid_post_send), so we 121 prefer to copy the data over to someplace more 122 permanent. */ 123 new_op->buffer = (void*)buffer_list[0]; 124 new_op->buffer_list = &new_op->buffer; 125 new_op->size_list = &new_op->actual_size; 126 } 127 else 128 { 129 new_op->buffer_list = (void*const*)buffer_list; 130 new_op->size_list = size_list; 131 } 132 new_op->error_code = 0; 133 new_op->method_data = (void*)unexpected; 134 135 op_list_add(zoid_ops, new_op); 136 137 return 0; /* Non-immediate completion. */ 138 138 } 139 139 else 140 { 141 new_op->buffer_list = (void*const*)buffer_list; 142 new_op->size_list = size_list; 143 } 144 new_op->error_code = 0; 145 new_op->method_data = (void*)unexpected; 146 147 op_list_add(zoid_ops, new_op); 148 149 return 0; /* Non-immediate completion. */ 140 return zoid_err_to_bmi(err); 150 141 } 151 else if (err) 152 return zoid_err_to_bmi(err);142 143 assert (ret == 1); 153 144 154 145 return 1; /* Immediate completion. 
*/ … … 201 192 } 202 193 203 ret = zbmi_recv(buffer_list, size_list_cp, list_count, tag, 204 post_timeout); 194 ret = zbmi_recv(buffer_list, size_list_cp, list_count, tag); 205 195 if ((err = __zoid_error())) 206 196 return zoid_err_to_bmi(err); … … 231 221 new_op->user_ptr = user_ptr; 232 222 new_op->msg_tag = tag; 233 new_op->class = 0;234 223 new_op->list_count = list_count; 235 224 new_op->expected_size = total_expected_size; … … 307 296 incount_fwd++; 308 297 } 309 310 /* If we have canceled messages, then we don't need to wait. We311 still check with the server to find about ready messages, but312 that's it. */313 if (canceled > 0)314 max_idle_time_ms = 0;315 298 316 299 ret = zbmi_test(tags, ops, unexp_sizes, incount_fwd, ready, … … 357 340 ret = zbmi_send((const void*const*)op->buffer_list, 358 341 size_list_cp, op->list_count, op->msg_tag, 359 op->class, (int)op->method_data, 0);342 (int)op->method_data); 360 343 if ((err = __zoid_error())) 361 344 { … … 384 367 385 368 ret = zbmi_recv(op->buffer_list, size_list_cp, 386 op->list_count, op->msg_tag , 0);369 op->list_count, op->msg_tag); 387 370 if ((err = __zoid_error())) 388 371 return zoid_err_to_bmi(err); … … 500 483 bmi_dealloc_method_addr(inout_parameter); 501 484 break; 502 503 case BMI_ZOID_POST_TIMEOUT:504 if (zoid_node_type == CLIENT)505 post_timeout = *(int*)inout_parameter;506 break;507 485 } 508 486 … … 573 551 else 574 552 return BMI_zoid_server_unexpected_free(buffer); 553 } 554 555 /* Invoked on BMI_post_send. */ 556 static int 557 BMI_zoid_post_send(bmi_op_id_t* id, bmi_method_addr_p dest, 558 const void* buffer, bmi_size_t size, 559 enum bmi_buffer_type buffer_type, bmi_msg_tag_t tag, 560 void* user_ptr, bmi_context_id context_id, PVFS_hint hints) 561 { 562 return zoid_post_send_common(id, dest, &buffer, &size, 1, size, buffer_type, 563 tag, user_ptr, context_id, hints, 0); 564 } 565 566 /* Invoked on BMI_post_sendunexpected. We only support it on clients. 
*/ 567 static int 568 BMI_zoid_post_sendunexpected(bmi_op_id_t* id, bmi_method_addr_p dest, 569 const void* buffer, bmi_size_t size, 570 enum bmi_buffer_type buffer_type, 571 bmi_msg_tag_t tag, void* user_ptr, 572 bmi_context_id context_id, PVFS_hint hints) 573 { 574 return zoid_post_send_common(id, dest, &buffer, &size, 1, size, buffer_type, 575 tag, user_ptr, context_id, hints, 1); 576 } 577 578 /* Invoked on BMI_post_recv. */ 579 static int 580 BMI_zoid_post_recv(bmi_op_id_t* id, bmi_method_addr_p src, void* buffer, 581 bmi_size_t expected_size, bmi_size_t* actual_size, 582 enum bmi_buffer_type buffer_type, bmi_msg_tag_t tag, 583 void* user_ptr, bmi_context_id context_id, PVFS_hint hints) 584 { 585 return zoid_post_recv_common(id, src, &buffer, &expected_size, 1, 586 expected_size, actual_size, buffer_type, tag, 587 user_ptr, context_id, hints); 575 588 } 576 589 … … 645 658 } 646 659 647 /* Invoked on BMI_testunexpected _class. We only support iton the server. */660 /* Invoked on BMI_testunexpected. We only support in on the server. */ 648 661 static int 649 662 BMI_zoid_testunexpected(int incount, int* outcount, 650 663 struct bmi_method_unexpected_info* info, 651 uint8_t class,int max_idle_time_ms)664 int max_idle_time_ms) 652 665 { 653 666 if (zoid_node_type == CLIENT) … … 656 669 /* Server code. */ 657 670 658 return BMI_zoid_server_testunexpected(incount, outcount, info, class,671 return BMI_zoid_server_testunexpected(incount, outcount, info, 659 672 max_idle_time_ms); 660 673 } … … 696 709 { 697 710 return zoid_post_send_common(id, dest, buffer_list, size_list, list_count, 698 total_size, buffer_type, tag, 0,user_ptr,711 total_size, buffer_type, tag, user_ptr, 699 712 context_id, hints, 0); 700 713 } … … 715 728 } 716 729 717 /* Invoked on BMI_post_sendunexpected_list_class. We only support it on 718 clients. */ 730 /* Invoked on BMI_post_sendunexpected_list. We only support it on clients. 
*/ 719 731 static int 720 732 BMI_zoid_post_sendunexpected_list(bmi_op_id_t* id, bmi_method_addr_p dest, … … 723 735 bmi_size_t total_size, 724 736 enum bmi_buffer_type buffer_type, 725 bmi_msg_tag_t tag, uint8_t class, 726 void* user_ptr, bmi_context_id context_id, 727 PVFS_hint hints) 737 bmi_msg_tag_t tag, void* user_ptr, 738 bmi_context_id context_id, PVFS_hint hints) 728 739 { 729 740 return zoid_post_send_common(id, dest, buffer_list, size_list, list_count, 730 total_size, buffer_type, tag, class,user_ptr,741 total_size, buffer_type, tag, user_ptr, 731 742 context_id, hints, 1); 732 743 } … … 793 804 .unexpected_free = BMI_zoid_unexpected_free, 794 805 806 .post_send = BMI_zoid_post_send, 807 .post_sendunexpected = BMI_zoid_post_sendunexpected, 808 .post_recv = BMI_zoid_post_recv, 809 795 810 .test = BMI_zoid_test, 796 811 .testsome = BMI_zoid_testsome, -
branches/cu-security-branch/src/io/bmi/bmi_zoid/zoid.h
r8359 r8397 4 4 #define ZOID_MAX_EXPECTED_MSG (128 * 1024 * 1024) 5 5 #define ZOID_MAX_UNEXPECTED_MSG 8192 6 #define ZOID_MAX_EAGER_MSG 10247 8 6 9 7 #define ZOID_ADDR_SERVER_PID -1 … … 27 25 int BMI_zoid_server_testunexpected(int incount, int* outcount, 28 26 struct bmi_method_unexpected_info* info, 29 uint8_t class,int max_idle_time_ms);27 int max_idle_time_ms); 30 28 int zoid_server_send_common(bmi_op_id_t* id, bmi_method_addr_p dest, 31 29 const void*const* buffer_list,
