| 1 | /* |
|---|
| 2 | * (C) 2003 Clemson University and The University of Chicago |
|---|
| 3 | * |
|---|
| 4 | * See COPYING in top-level directory. |
|---|
| 5 | */ |
|---|
| 6 | |
|---|
| 7 | /* pvfs2_msgpairarray_sm |
|---|
| 8 | * |
|---|
| 9 | * The purpose of this state machine is to prepare, send, and |
|---|
| 10 | * receive a collection of request/response pairs (msgpairs). |
|---|
| 11 | */ |
|---|
| 12 | |
|---|
#include <assert.h>
#include <stdlib.h>
#include <string.h>

#include "msgpairarray.h"
#include "pvfs2-debug.h"
#include "pint-cached-config.h"
#include "job.h"
#include "gossip.h"
#include "PINT-reqproto-encode.h"
#include "pvfs2-util.h"
#include "pint-util.h"
#include "server-config-mgr.h"
#include "pvfs2-internal.h"
#include "state-machine.h"
|---|
| 27 | |
|---|
/* Report an error through gossip_err, unless the caller asked for quiet
 * operation, in which case it is only logged as a MSGPAIR debug message.
 * NOTE: the expansion reads `mop->params.quiet_flag`, so a local variable
 * named `mop` must be in scope wherever this macro is used. */
#define gossip_err_unless_quiet(format, f...)                  \
do {                                                           \
    if (!mop->params.quiet_flag)                               \
        gossip_err(format, ##f);                               \
    else                                                       \
        gossip_debug(GOSSIP_MSGPAIR_DEBUG, format, ##f);       \
} while (0)
|---|
| 35 | |
|---|
/* Internal result codes placed in js_p->error_code to pick transitions in
 * the pvfs2_msgpairarray_sm machine.
 * NOTE(review): these are positive, unlike the negative error codes used
 * elsewhere in this file; presumably chosen so the two cannot collide -
 * confirm. */
enum
{
    MSGPAIRS_COMPLETE      = 190,                   /* nothing outstanding  */
    MSGPAIRS_RETRY         = MSGPAIRS_COMPLETE + 1, /* re-post after delay  */
    MSGPAIRS_RETRY_NODELAY = MSGPAIRS_COMPLETE + 2  /* re-post immediately  */
};
|---|
| 42 | |
|---|
/*
 * pvfs2_msgpairarray_sm transitions, as declared below:
 *   init          -> post
 *   post          -> completion_fn on MSGPAIRS_COMPLETE (nothing left
 *                    outstanding after posting); otherwise -> complete
 *   complete      -> msgpairarray_complete accounts for one finished
 *                    send or receive per invocation; the machine stays
 *                    here until it reports MSGPAIRS_COMPLETE, then
 *                    -> completion_fn
 *   completion_fn -> post_retry on MSGPAIRS_RETRY,
 *                    post on MSGPAIRS_RETRY_NODELAY,
 *                    otherwise -> done
 *   post_retry    -> post, once the retry timer
 *                    (mop->params.retry_delay) fires
 *   done          -> pops the current frame, then returns to the caller
 *
 * NOTE(review): msgpairarray_post can also return SM_ACTION_COMPLETE with
 * an ordinary error code in js_p->error_code (e.g. after PINT_encode
 * fails). That takes post's `default` edge into `complete`, whose action
 * treats js_p as a finished job (it reads status_user_tag and decrements
 * comp_ct). Confirm this path is intended.
 */
%%

nested machine pvfs2_msgpairarray_sm
{
    state init
    {
        run msgpairarray_init;
        default => post;
    }

    state post
    {
        run msgpairarray_post;
        MSGPAIRS_COMPLETE => completion_fn;
        default => complete;
    }

    state post_retry
    {
        run msgpairarray_post_retry;
        default => post;
    }

    state complete
    {
        run msgpairarray_complete;
        MSGPAIRS_COMPLETE => completion_fn;
        default => complete;
    }

    state completion_fn
    {
        run msgpairarray_completion_fn;
        MSGPAIRS_RETRY => post_retry;
        MSGPAIRS_RETRY_NODELAY => post;
        default => done;
    }

    state done
    {
        run msgpairarray_done;
        default => return;
    }
}

%%
|---|
| 89 | |
|---|
| 90 | static PINT_sm_action msgpairarray_init( |
|---|
| 91 | struct PINT_smcb *smcb, job_status_s *js_p) |
|---|
| 92 | { |
|---|
| 93 | PINT_sm_msgarray_op *mop = PINT_sm_frame(smcb, PINT_FRAME_CURRENT); |
|---|
| 94 | int i = 0; |
|---|
| 95 | PINT_sm_msgpair_state *msg_p = NULL; |
|---|
| 96 | |
|---|
| 97 | gossip_debug(GOSSIP_MSGPAIR_DEBUG, "(%p) msgpairarray state: init " |
|---|
| 98 | "(%d msgpair(s))\n", smcb, mop->count); |
|---|
| 99 | |
|---|
| 100 | assert(mop->count > 0); |
|---|
| 101 | |
|---|
| 102 | js_p->error_code = 0; |
|---|
| 103 | |
|---|
| 104 | /* set number of operations that must complete. */ |
|---|
| 105 | mop->params.comp_ct = (2 * mop->count); |
|---|
| 106 | |
|---|
| 107 | for(i = 0; i < mop->count; i++) |
|---|
| 108 | { |
|---|
| 109 | msg_p = &mop->msgarray[i]; |
|---|
| 110 | assert(msg_p); |
|---|
| 111 | |
|---|
| 112 | assert((msg_p->retry_flag == PVFS_MSGPAIR_RETRY) || |
|---|
| 113 | (msg_p->retry_flag == PVFS_MSGPAIR_NO_RETRY)); |
|---|
| 114 | |
|---|
| 115 | msg_p->encoded_resp_p = NULL; |
|---|
| 116 | msg_p->retry_count = 0; |
|---|
| 117 | msg_p->complete = 0; |
|---|
| 118 | } |
|---|
| 119 | return SM_ACTION_COMPLETE; |
|---|
| 120 | } |
|---|
| 121 | |
|---|
| 122 | /* msgpairarray_post() |
|---|
| 123 | * |
|---|
| 124 | * The following elements of the PINT_sm_msgpair_state |
|---|
| 125 | * should be valid prior to this state (for each msgpair in array): |
|---|
| 126 | * - req (unencoded request) |
|---|
| 127 | * - srv_addr of each element in msg array |
|---|
| 128 | * |
|---|
| 129 | * This state performs the following operations for each msgpair, |
|---|
| 130 | * one at a time: |
|---|
| 131 | * (1) encodes request |
|---|
| 132 | * (2) calculates maximum response size |
|---|
| 133 | * (3) allocates BMI memory for response data (encoded) |
|---|
| 134 | * (4) gets a session tag for the pair of messages |
|---|
| 135 | * (5) posts the receive of the response |
|---|
| 136 | * (6) posts the send of the request |
|---|
| 137 | * (7) stores job ids for later matching |
|---|
| 138 | * |
|---|
| 139 | */ |
|---|
| 140 | static PINT_sm_action msgpairarray_post( |
|---|
| 141 | struct PINT_smcb *smcb, job_status_s *js_p) |
|---|
| 142 | { |
|---|
| 143 | PINT_sm_msgarray_op *mop = PINT_sm_frame(smcb, PINT_FRAME_CURRENT); |
|---|
| 144 | int ret = -PVFS_EINVAL, i = 0, tmp = 0; |
|---|
| 145 | struct server_configuration_s *server_config = NULL; |
|---|
| 146 | PVFS_msg_tag_t session_tag; |
|---|
| 147 | PINT_sm_msgpair_state *msg_p = NULL; |
|---|
| 148 | struct filesystem_configuration_s *cur_fs = NULL; |
|---|
| 149 | int must_loop_encodings = 0; |
|---|
| 150 | int local_enc_and_alloc = 0; |
|---|
| 151 | |
|---|
| 152 | gossip_debug( |
|---|
| 153 | GOSSIP_MSGPAIR_DEBUG, "%s: sm %p " |
|---|
| 154 | "%d total message(s) with %d incomplete\n", __func__, smcb, |
|---|
| 155 | mop->count * 2, mop->params.comp_ct); |
|---|
| 156 | |
|---|
| 157 | js_p->error_code = 0; |
|---|
| 158 | |
|---|
| 159 | assert(mop->count > 0); |
|---|
| 160 | assert(mop->params.comp_ct >= 2); |
|---|
| 161 | |
|---|
| 162 | for (i = 0; i < mop->count; i++) |
|---|
| 163 | { |
|---|
| 164 | msg_p = &mop->msgarray[i]; |
|---|
| 165 | assert(msg_p); |
|---|
| 166 | |
|---|
| 167 | /* |
|---|
| 168 | here we skip over the msgs that have already completed in |
|---|
| 169 | the case of being in the retry code path when it's ok |
|---|
| 170 | */ |
|---|
| 171 | if (msg_p->complete) |
|---|
| 172 | { |
|---|
| 173 | continue; |
|---|
| 174 | } |
|---|
| 175 | |
|---|
| 176 | msg_p->op_status = 0; |
|---|
| 177 | |
|---|
| 178 | if (msg_p->encoded_resp_p == NULL) |
|---|
| 179 | { |
|---|
| 180 | if (msg_p->fs_id != PVFS_FS_ID_NULL) |
|---|
| 181 | { |
|---|
| 182 | server_config = PINT_server_config_mgr_get_config( |
|---|
| 183 | msg_p->fs_id); |
|---|
| 184 | assert(server_config); |
|---|
| 185 | |
|---|
| 186 | cur_fs = PINT_config_find_fs_id( |
|---|
| 187 | server_config, msg_p->fs_id); |
|---|
| 188 | PINT_server_config_mgr_put_config(server_config); |
|---|
| 189 | assert(cur_fs); |
|---|
| 190 | msg_p->enc_type = cur_fs->encoding; |
|---|
| 191 | } |
|---|
| 192 | |
|---|
| 193 | if (!ENCODING_IS_VALID(msg_p->enc_type)) |
|---|
| 194 | { |
|---|
| 195 | PRINT_ENCODING_ERROR("supported", msg_p->enc_type); |
|---|
| 196 | must_loop_encodings = 1; |
|---|
| 197 | msg_p->enc_type = (ENCODING_INVALID_MIN + 1); |
|---|
| 198 | } |
|---|
| 199 | else if (!ENCODING_IS_SUPPORTED(msg_p->enc_type)) |
|---|
| 200 | { |
|---|
| 201 | PRINT_ENCODING_ERROR("supported", msg_p->enc_type); |
|---|
| 202 | must_loop_encodings = 1; |
|---|
| 203 | msg_p->enc_type = ENCODING_SUPPORTED_MIN; |
|---|
| 204 | } |
|---|
| 205 | |
|---|
| 206 | try_next_encoding: |
|---|
| 207 | assert(ENCODING_IS_VALID(msg_p->enc_type)); |
|---|
| 208 | |
|---|
| 209 | ret = PINT_encode(&msg_p->req, PINT_ENCODE_REQ, |
|---|
| 210 | &msg_p->encoded_req, msg_p->svr_addr, |
|---|
| 211 | msg_p->enc_type); |
|---|
| 212 | if (ret != 0) |
|---|
| 213 | { |
|---|
| 214 | if (must_loop_encodings) |
|---|
| 215 | { |
|---|
| 216 | gossip_debug(GOSSIP_MSGPAIR_DEBUG, "Looping through " |
|---|
| 217 | "encodings [%d/%d]\n", msg_p->enc_type, |
|---|
| 218 | ENCODING_INVALID_MAX); |
|---|
| 219 | |
|---|
| 220 | msg_p->enc_type++; |
|---|
| 221 | if (ENCODING_IS_VALID(msg_p->enc_type)) |
|---|
| 222 | { |
|---|
| 223 | goto try_next_encoding; |
|---|
| 224 | } |
|---|
| 225 | } |
|---|
| 226 | gossip_lerr("msgpairarray_post: PINT_encode failed\n"); |
|---|
| 227 | js_p->error_code = ret; |
|---|
| 228 | return SM_ACTION_COMPLETE; |
|---|
| 229 | } |
|---|
| 230 | |
|---|
| 231 | /* calculate max response msg size and allocate space */ |
|---|
| 232 | msg_p->max_resp_sz = PINT_encode_calc_max_size( |
|---|
| 233 | PINT_ENCODE_RESP, msg_p->req.op, msg_p->enc_type); |
|---|
| 234 | |
|---|
| 235 | msg_p->encoded_resp_p = BMI_memalloc( |
|---|
| 236 | msg_p->svr_addr, msg_p->max_resp_sz, BMI_RECV); |
|---|
| 237 | |
|---|
| 238 | if (msg_p->encoded_resp_p == NULL) |
|---|
| 239 | { |
|---|
| 240 | js_p->error_code = -PVFS_ENOMEM; |
|---|
| 241 | return SM_ACTION_COMPLETE; |
|---|
| 242 | } |
|---|
| 243 | local_enc_and_alloc = 1; |
|---|
| 244 | } |
|---|
| 245 | |
|---|
| 246 | session_tag = PINT_util_get_next_tag(); |
|---|
| 247 | |
|---|
| 248 | gossip_debug(GOSSIP_MSGPAIR_DEBUG, "%s: sm %p msgpair %d: " |
|---|
| 249 | "posting recv\n", __func__, smcb, i); |
|---|
| 250 | |
|---|
| 251 | /* post receive of response; job_id stored in recv_id */ |
|---|
| 252 | ret = job_bmi_recv(msg_p->svr_addr, |
|---|
| 253 | msg_p->encoded_resp_p, |
|---|
| 254 | msg_p->max_resp_sz, |
|---|
| 255 | session_tag, |
|---|
| 256 | BMI_PRE_ALLOC, |
|---|
| 257 | smcb, |
|---|
| 258 | i, |
|---|
| 259 | &msg_p->recv_status, |
|---|
| 260 | &msg_p->recv_id, |
|---|
| 261 | mop->params.job_context, |
|---|
| 262 | mop->params.job_timeout, |
|---|
| 263 | msg_p->req.hints); |
|---|
| 264 | if (ret == 0) |
|---|
| 265 | { |
|---|
| 266 | /* perform a quick test to see if the recv failed before posting |
|---|
| 267 | * the send; if it reports an error quickly then we can save the |
|---|
| 268 | * confusion of sending a request for which we can't recv a |
|---|
| 269 | * response |
|---|
| 270 | */ |
|---|
| 271 | ret = job_test(msg_p->recv_id, &tmp, NULL, |
|---|
| 272 | &msg_p->recv_status, 0, |
|---|
| 273 | mop->params.job_context); |
|---|
| 274 | } |
|---|
| 275 | |
|---|
| 276 | if ((ret < 0) || (ret == 1)) |
|---|
| 277 | { |
|---|
| 278 | /* it is impossible for this recv to complete at this point |
|---|
| 279 | * without errors; we haven't sent the request yet! |
|---|
| 280 | */ |
|---|
| 281 | assert(ret < 0 || msg_p->recv_status.error_code != 0); |
|---|
| 282 | if (ret < 0) |
|---|
| 283 | { |
|---|
| 284 | PVFS_perror_gossip("Post of receive failed", ret); |
|---|
| 285 | } |
|---|
| 286 | else |
|---|
| 287 | { |
|---|
| 288 | PVFS_perror_gossip("Receive immediately failed", |
|---|
| 289 | msg_p->recv_status.error_code); |
|---|
| 290 | } |
|---|
| 291 | |
|---|
| 292 | msg_p->recv_id = 0; |
|---|
| 293 | msg_p->send_id = 0; |
|---|
| 294 | |
|---|
| 295 | /* mark send as bad too and don't post it */ |
|---|
| 296 | msg_p->send_status.error_code = msg_p->recv_status.error_code; |
|---|
| 297 | msg_p->op_status = msg_p->recv_status.error_code; |
|---|
| 298 | mop->params.comp_ct -= 2; |
|---|
| 299 | |
|---|
| 300 | if (local_enc_and_alloc) |
|---|
| 301 | { |
|---|
| 302 | PINT_encode_release(&msg_p->encoded_req, PINT_ENCODE_REQ); |
|---|
| 303 | BMI_memfree(msg_p->svr_addr,msg_p->encoded_resp_p, |
|---|
| 304 | msg_p->max_resp_sz, BMI_RECV); |
|---|
| 305 | msg_p->encoded_resp_p = NULL; |
|---|
| 306 | local_enc_and_alloc = 0; |
|---|
| 307 | } |
|---|
| 308 | |
|---|
| 309 | /* continue to send other array entries if possible */ |
|---|
| 310 | continue; |
|---|
| 311 | } |
|---|
| 312 | |
|---|
| 313 | /* if we reach here, the recv has been posted without failure, but |
|---|
| 314 | * has not completed yet |
|---|
| 315 | */ |
|---|
| 316 | assert(ret == 0); |
|---|
| 317 | |
|---|
| 318 | gossip_debug(GOSSIP_MSGPAIR_DEBUG, "%s: sm %p msgpair %d: " |
|---|
| 319 | "posting send\n", __func__, smcb, i); |
|---|
| 320 | |
|---|
| 321 | /* post send of request; job_id stored in send_id */ |
|---|
| 322 | ret = job_bmi_send_list(msg_p->encoded_req.dest, |
|---|
| 323 | msg_p->encoded_req.buffer_list, |
|---|
| 324 | msg_p->encoded_req.size_list, |
|---|
| 325 | msg_p->encoded_req.list_count, |
|---|
| 326 | msg_p->encoded_req.total_size, |
|---|
| 327 | session_tag, |
|---|
| 328 | msg_p->encoded_req.buffer_type, |
|---|
| 329 | 1, |
|---|
| 330 | smcb, |
|---|
| 331 | mop->count+i, |
|---|
| 332 | &msg_p->send_status, |
|---|
| 333 | &msg_p->send_id, |
|---|
| 334 | mop->params.job_context, |
|---|
| 335 | mop->params.job_timeout, |
|---|
| 336 | msg_p->req.hints); |
|---|
| 337 | |
|---|
| 338 | if ((ret < 0) || |
|---|
| 339 | ((ret == 1) && (msg_p->send_status.error_code != 0))) |
|---|
| 340 | { |
|---|
| 341 | if (ret < 0) |
|---|
| 342 | { |
|---|
| 343 | PVFS_perror_gossip("Post of send failed", ret); |
|---|
| 344 | } |
|---|
| 345 | else |
|---|
| 346 | { |
|---|
| 347 | PVFS_perror_gossip("Send immediately failed", |
|---|
| 348 | msg_p->send_status.error_code); |
|---|
| 349 | } |
|---|
| 350 | |
|---|
| 351 | gossip_err_unless_quiet("Send error: cancelling recv.\n"); |
|---|
| 352 | |
|---|
| 353 | job_bmi_cancel(msg_p->recv_id, mop->params.job_context); |
|---|
| 354 | |
|---|
| 355 | /* we still have to wait for recv completion, so just decrement |
|---|
| 356 | * comp_ct by one and keep going |
|---|
| 357 | */ |
|---|
| 358 | msg_p->op_status = msg_p->send_status.error_code; |
|---|
| 359 | msg_p->send_id = 0; |
|---|
| 360 | mop->params.comp_ct--; |
|---|
| 361 | } |
|---|
| 362 | else if (ret == 1) |
|---|
| 363 | { |
|---|
| 364 | /* immediate completion */ |
|---|
| 365 | msg_p->send_id = 0; |
|---|
| 366 | /* decrement our count, since send is already done. */ |
|---|
| 367 | mop->params.comp_ct--; |
|---|
| 368 | } |
|---|
| 369 | /* else: successful post, no immediate completion */ |
|---|
| 370 | } |
|---|
| 371 | |
|---|
| 372 | if (mop->params.comp_ct == 0) |
|---|
| 373 | { |
|---|
| 374 | /* everything is completed already (could happen in some failure |
|---|
| 375 | * cases); jump straight to final completion function. |
|---|
| 376 | */ |
|---|
| 377 | js_p->error_code = MSGPAIRS_COMPLETE; |
|---|
| 378 | return SM_ACTION_COMPLETE; |
|---|
| 379 | } |
|---|
| 380 | |
|---|
| 381 | /* we are still waiting on operations to complete, next state |
|---|
| 382 | * transition will handle them |
|---|
| 383 | */ |
|---|
| 384 | return SM_ACTION_DEFERRED; |
|---|
| 385 | } |
|---|
| 386 | |
|---|
| 387 | static PINT_sm_action msgpairarray_post_retry( |
|---|
| 388 | struct PINT_smcb *smcb, job_status_s *js_p) |
|---|
| 389 | { |
|---|
| 390 | PINT_sm_msgarray_op *mop = PINT_sm_frame(smcb, PINT_FRAME_CURRENT); |
|---|
| 391 | job_id_t tmp_id; |
|---|
| 392 | |
|---|
| 393 | gossip_debug(GOSSIP_MSGPAIR_DEBUG, "%s: sm %p, wait %d ms\n", |
|---|
| 394 | __func__, smcb, mop->params.retry_delay); |
|---|
| 395 | |
|---|
| 396 | js_p->error_code = 0; /* do not leak MSGPAIRS_RETRY through to wait */ |
|---|
| 397 | return job_req_sched_post_timer( |
|---|
| 398 | mop->params.retry_delay, |
|---|
| 399 | smcb, 0, js_p, &tmp_id, |
|---|
| 400 | mop->params.job_context); |
|---|
| 401 | } |
|---|
| 402 | |
|---|
| 403 | static PINT_sm_action msgpairarray_complete( |
|---|
| 404 | struct PINT_smcb *smcb, job_status_s *js_p) |
|---|
| 405 | { |
|---|
| 406 | PINT_sm_msgarray_op *mop = PINT_sm_frame(smcb, PINT_FRAME_CURRENT); |
|---|
| 407 | gossip_debug(GOSSIP_MSGPAIR_DEBUG, |
|---|
| 408 | "%s: sm %p status_user_tag %d msgarray_count %d\n", |
|---|
| 409 | __func__, smcb, (int) js_p->status_user_tag, mop->count); |
|---|
| 410 | |
|---|
| 411 | |
|---|
| 412 | /* match operation with something in the msgpair array */ |
|---|
| 413 | /* the first N tags are receives, the second N are sends */ |
|---|
| 414 | assert(js_p->status_user_tag < mop->count*2); |
|---|
| 415 | |
|---|
| 416 | if (js_p->status_user_tag < mop->count) |
|---|
| 417 | { |
|---|
| 418 | PINT_sm_msgpair_state *msg_p = |
|---|
| 419 | &mop->msgarray[js_p->status_user_tag]; |
|---|
| 420 | |
|---|
| 421 | msg_p->recv_id = 0; |
|---|
| 422 | msg_p->recv_status = *js_p; |
|---|
| 423 | |
|---|
| 424 | /* save error (if we don't already have one) in op_status */ |
|---|
| 425 | if(msg_p->op_status == 0) |
|---|
| 426 | msg_p->op_status = msg_p->recv_status.error_code; |
|---|
| 427 | |
|---|
| 428 | if(msg_p->recv_status.error_code && msg_p->send_id != 0) |
|---|
| 429 | { |
|---|
| 430 | /* we got a receive error, but send is still pending. Cancel |
|---|
| 431 | * the send |
|---|
| 432 | */ |
|---|
| 433 | job_bmi_cancel(msg_p->send_id, mop->params.job_context); |
|---|
| 434 | } |
|---|
| 435 | } |
|---|
| 436 | else |
|---|
| 437 | { |
|---|
| 438 | PINT_sm_msgpair_state *msg_p = &mop->msgarray[ |
|---|
| 439 | js_p->status_user_tag - mop->count]; |
|---|
| 440 | |
|---|
| 441 | msg_p->send_id = 0; |
|---|
| 442 | msg_p->send_status = *js_p; |
|---|
| 443 | |
|---|
| 444 | /* save error (if we don't already have one) in op_status */ |
|---|
| 445 | if(msg_p->op_status == 0) |
|---|
| 446 | msg_p->op_status = msg_p->send_status.error_code; |
|---|
| 447 | |
|---|
| 448 | if(msg_p->send_status.error_code && msg_p->recv_id != 0) |
|---|
| 449 | { |
|---|
| 450 | /* we got a send error, but recv is still pending. Cancel |
|---|
| 451 | * the recv |
|---|
| 452 | */ |
|---|
| 453 | job_bmi_cancel(msg_p->recv_id, mop->params.job_context); |
|---|
| 454 | } |
|---|
| 455 | } |
|---|
| 456 | |
|---|
| 457 | /* decrement comp_ct until all operations have completed */ |
|---|
| 458 | if (--mop->params.comp_ct > 0) |
|---|
| 459 | { |
|---|
| 460 | gossip_debug(GOSSIP_MSGPAIR_DEBUG, |
|---|
| 461 | " msgpairarray: %d operations remain\n", |
|---|
| 462 | mop->params.comp_ct); |
|---|
| 463 | return SM_ACTION_DEFERRED; |
|---|
| 464 | } |
|---|
| 465 | |
|---|
| 466 | assert(mop->params.comp_ct == 0); |
|---|
| 467 | gossip_debug(GOSSIP_MSGPAIR_DEBUG, |
|---|
| 468 | " msgpairarray: all operations complete\n"); |
|---|
| 469 | |
|---|
| 470 | js_p->error_code = MSGPAIRS_COMPLETE; |
|---|
| 471 | return SM_ACTION_COMPLETE; |
|---|
| 472 | } |
|---|
| 473 | |
|---|
| 474 | static PINT_sm_action msgpairarray_completion_fn( |
|---|
| 475 | struct PINT_smcb *smcb, job_status_s *js_p) |
|---|
| 476 | { |
|---|
| 477 | PINT_sm_msgarray_op *mop = PINT_sm_frame(smcb, PINT_FRAME_CURRENT); |
|---|
| 478 | int ret = -PVFS_EINVAL, i = 0; |
|---|
| 479 | int need_retry = 0; |
|---|
| 480 | struct PINT_decoded_msg decoded_resp; |
|---|
| 481 | const char* server_string = NULL; |
|---|
| 482 | int server_type; |
|---|
| 483 | |
|---|
| 484 | /* response structure (decoded) */ |
|---|
| 485 | struct PVFS_server_resp *resp_p = NULL; |
|---|
| 486 | |
|---|
| 487 | js_p->error_code = 0; |
|---|
| 488 | |
|---|
| 489 | gossip_debug(GOSSIP_MSGPAIR_DEBUG, "(%p) msgpairarray state: " |
|---|
| 490 | "completion_fn\n", smcb); |
|---|
| 491 | |
|---|
| 492 | for (i = 0; i < mop->count; i++) |
|---|
| 493 | { |
|---|
| 494 | PINT_sm_msgpair_state *msg_p = &mop->msgarray[i]; |
|---|
| 495 | assert(msg_p); |
|---|
| 496 | |
|---|
| 497 | /* |
|---|
| 498 | * Can take multiple trips through this function as we retry |
|---|
| 499 | * ones that failed. |
|---|
| 500 | */ |
|---|
| 501 | if (msg_p->complete) |
|---|
| 502 | continue; |
|---|
| 503 | |
|---|
| 504 | if (msg_p->op_status != 0) |
|---|
| 505 | { |
|---|
| 506 | char s[1024]; |
|---|
| 507 | PVFS_strerror_r(msg_p->op_status, s, sizeof(s)); |
|---|
| 508 | server_string = PINT_cached_config_map_addr( |
|---|
| 509 | msg_p->fs_id, msg_p->svr_addr, &server_type); |
|---|
| 510 | if(!server_string) |
|---|
| 511 | { |
|---|
| 512 | server_string = BMI_addr_rev_lookup(msg_p->svr_addr); |
|---|
| 513 | } |
|---|
| 514 | |
|---|
| 515 | gossip_err("Warning: msgpair failed to %s, will retry: %s\n", server_string, s); |
|---|
| 516 | |
|---|
| 517 | ++need_retry; |
|---|
| 518 | continue; |
|---|
| 519 | } |
|---|
| 520 | |
|---|
| 521 | ret = PINT_serv_decode_resp(msg_p->fs_id, |
|---|
| 522 | msg_p->encoded_resp_p, |
|---|
| 523 | &decoded_resp, |
|---|
| 524 | &msg_p->svr_addr, |
|---|
| 525 | msg_p->recv_status.actual_size, |
|---|
| 526 | &resp_p); |
|---|
| 527 | if (ret != 0) |
|---|
| 528 | { |
|---|
| 529 | PVFS_perror_gossip("msgpairarray decode error", ret); |
|---|
| 530 | msg_p->op_status = ret; |
|---|
| 531 | } |
|---|
| 532 | else |
|---|
| 533 | { |
|---|
| 534 | /* if we've made it this far, the server response status is |
|---|
| 535 | * meaningful, so we save it. |
|---|
| 536 | */ |
|---|
| 537 | msg_p->op_status = resp_p->status; |
|---|
| 538 | } |
|---|
| 539 | |
|---|
| 540 | /* NOTE: we call the function associated with each message, |
|---|
| 541 | * not just the one from the first array element. so |
|---|
| 542 | * there could in theory be different functions for each |
|---|
| 543 | * message (to handle different types of messages all in |
|---|
| 544 | * the same array). |
|---|
| 545 | */ |
|---|
| 546 | if (msg_p->comp_fn != NULL) |
|---|
| 547 | { |
|---|
| 548 | /* If we call the completion function, store the result on |
|---|
| 549 | * a per message pair basis. Also store some non-zero |
|---|
| 550 | * (failure) value in js_p->error_code if we see one. |
|---|
| 551 | */ |
|---|
| 552 | msg_p->op_status = msg_p->comp_fn(smcb, resp_p, i); |
|---|
| 553 | if (msg_p->op_status != 0) |
|---|
| 554 | { |
|---|
| 555 | js_p->error_code = msg_p->op_status; |
|---|
| 556 | } |
|---|
| 557 | |
|---|
| 558 | /* even if we see a failure, continue to process with the |
|---|
| 559 | * completion function. -- RobR |
|---|
| 560 | */ |
|---|
| 561 | } |
|---|
| 562 | else if (resp_p->status != 0) |
|---|
| 563 | { |
|---|
| 564 | /* no comp_fn specified and status non-zero */ |
|---|
| 565 | gossip_debug(GOSSIP_MSGPAIR_DEBUG, |
|---|
| 566 | "notice: msgpairarray_complete: error %d " |
|---|
| 567 | "from server %d\n", resp_p->status, i); |
|---|
| 568 | |
|---|
| 569 | /* save a non-zero status to return if we see one */ |
|---|
| 570 | js_p->error_code = resp_p->status; |
|---|
| 571 | |
|---|
| 572 | /* If we don't have a completion function, there is no point |
|---|
| 573 | * in continuing to process after seeing a failure. |
|---|
| 574 | */ |
|---|
| 575 | if (js_p->error_code) |
|---|
| 576 | { |
|---|
| 577 | break; |
|---|
| 578 | } |
|---|
| 579 | } |
|---|
| 580 | |
|---|
| 581 | /* free all the resources that we used to send and receive. */ |
|---|
| 582 | ret = PINT_serv_free_msgpair_resources( |
|---|
| 583 | &msg_p->encoded_req, msg_p->encoded_resp_p, &decoded_resp, |
|---|
| 584 | &msg_p->svr_addr, msg_p->max_resp_sz); |
|---|
| 585 | if (ret) |
|---|
| 586 | { |
|---|
| 587 | PVFS_perror_gossip("Failed to free msgpair resources", ret); |
|---|
| 588 | js_p->error_code = ret; |
|---|
| 589 | return SM_ACTION_COMPLETE; |
|---|
| 590 | } |
|---|
| 591 | |
|---|
| 592 | msg_p->encoded_resp_p = NULL; |
|---|
| 593 | msg_p->max_resp_sz = 0; |
|---|
| 594 | |
|---|
| 595 | /* |
|---|
| 596 | mark that this msgpair has been completed and should not be |
|---|
| 597 | retried in the case of possible future retries |
|---|
| 598 | */ |
|---|
| 599 | msg_p->complete = 1; |
|---|
| 600 | |
|---|
| 601 | gossip_debug(GOSSIP_MSGPAIR_DEBUG, "%s: sm %p msgpair %d " |
|---|
| 602 | "marked complete\n", __func__, smcb, i); |
|---|
| 603 | } |
|---|
| 604 | |
|---|
| 605 | if (need_retry) { |
|---|
| 606 | /* |
|---|
| 607 | * We only retry msgpairs that are not yet complete. Factor |
|---|
| 608 | * of two since they are pairs. If over the count, do not |
|---|
| 609 | * retry, just return one of the error codes. |
|---|
| 610 | */ |
|---|
| 611 | mop->params.comp_ct = 0; |
|---|
| 612 | js_p->error_code = 0; |
|---|
| 613 | for (i=0; i < mop->count; i++) { |
|---|
| 614 | |
|---|
| 615 | PINT_sm_msgpair_state *msg_p = &mop->msgarray[i]; |
|---|
| 616 | |
|---|
| 617 | if (msg_p->complete) |
|---|
| 618 | continue; |
|---|
| 619 | |
|---|
| 620 | if (msg_p->retry_flag == PVFS_MSGPAIR_RETRY |
|---|
| 621 | && PVFS_ERROR_CLASS(-msg_p->op_status) == PVFS_ERROR_BMI |
|---|
| 622 | && msg_p->retry_count < mop->params.retry_limit) { |
|---|
| 623 | |
|---|
| 624 | ++msg_p->retry_count; |
|---|
| 625 | mop->params.comp_ct += 2; |
|---|
| 626 | gossip_debug(GOSSIP_MSGPAIR_DEBUG, |
|---|
| 627 | "*** %s: msgpair %d failed, retry %d\n", |
|---|
| 628 | __func__, i, msg_p->retry_count); |
|---|
| 629 | if(msg_p->op_status == -BMI_ECANCEL) |
|---|
| 630 | { |
|---|
| 631 | /* if the error code indicates cancel, then skip the |
|---|
| 632 | * delay. We have probably already been waiting a while |
|---|
| 633 | */ |
|---|
| 634 | gossip_debug(GOSSIP_MSGPAIR_DEBUG, |
|---|
| 635 | "*** %s: msgpair skipping retry delay.\n", __func__); |
|---|
| 636 | js_p->error_code = MSGPAIRS_RETRY_NODELAY; |
|---|
| 637 | } |
|---|
| 638 | else |
|---|
| 639 | { |
|---|
| 640 | gossip_debug(GOSSIP_MSGPAIR_DEBUG, |
|---|
| 641 | "*** %s: msgpair retrying after delay.\n", __func__); |
|---|
| 642 | js_p->error_code = MSGPAIRS_RETRY; |
|---|
| 643 | } |
|---|
| 644 | |
|---|
| 645 | } else { |
|---|
| 646 | char s[1024]; |
|---|
| 647 | server_string = PINT_cached_config_map_addr( |
|---|
| 648 | msg_p->fs_id, msg_p->svr_addr, &server_type); |
|---|
| 649 | if(!server_string) |
|---|
| 650 | { |
|---|
| 651 | server_string = "[UNKNOWN]"; |
|---|
| 652 | } |
|---|
| 653 | PVFS_strerror_r(msg_p->op_status, s, sizeof(s)); |
|---|
| 654 | gossip_err_unless_quiet("*** %s: msgpair to server %s failed: %s\n", |
|---|
| 655 | __func__, server_string, s); |
|---|
| 656 | if(msg_p->retry_flag != PVFS_MSGPAIR_RETRY) |
|---|
| 657 | { |
|---|
| 658 | gossip_err_unless_quiet("*** No retries requested.\n"); |
|---|
| 659 | } |
|---|
| 660 | else if(PVFS_ERROR_CLASS(-msg_p->op_status) != |
|---|
| 661 | PVFS_ERROR_BMI) |
|---|
| 662 | { |
|---|
| 663 | gossip_err_unless_quiet("*** Non-BMI failure.\n"); |
|---|
| 664 | } |
|---|
| 665 | else |
|---|
| 666 | { |
|---|
| 667 | gossip_err_unless_quiet("*** Out of retries.\n"); |
|---|
| 668 | } |
|---|
| 669 | if (js_p->error_code == 0) |
|---|
| 670 | js_p->error_code = msg_p->op_status; |
|---|
| 671 | } |
|---|
| 672 | |
|---|
| 673 | } |
|---|
| 674 | } |
|---|
| 675 | return SM_ACTION_COMPLETE; |
|---|
| 676 | } |
|---|
| 677 | |
|---|
| 678 | static PINT_sm_action msgpairarray_done( |
|---|
| 679 | struct PINT_smcb *smcb, job_status_s *js_p) |
|---|
| 680 | { |
|---|
| 681 | int task_id, error_code, remaining; |
|---|
| 682 | PINT_sm_pop_frame(smcb, &task_id, &error_code, &remaining); |
|---|
| 683 | return SM_ACTION_COMPLETE; |
|---|
| 684 | } |
|---|
| 685 | |
|---|
| 686 | /********************************************************************* |
|---|
| 687 | * helper functions used in conjunction with state machine defined above |
|---|
| 688 | */ |
|---|
| 689 | |
|---|
| 690 | int PINT_msgpairarray_init( |
|---|
| 691 | PINT_sm_msgarray_op *op, |
|---|
| 692 | int count) |
|---|
| 693 | { |
|---|
| 694 | op->msgarray = (PINT_sm_msgpair_state *)malloc( |
|---|
| 695 | count * sizeof(PINT_sm_msgpair_state)); |
|---|
| 696 | if(!op->msgarray) |
|---|
| 697 | { |
|---|
| 698 | return -PVFS_ENOMEM; |
|---|
| 699 | } |
|---|
| 700 | memset(op->msgarray, 0, (count * sizeof(PINT_sm_msgpair_state))); |
|---|
| 701 | op->count = count; |
|---|
| 702 | |
|---|
| 703 | return 0; |
|---|
| 704 | } |
|---|
| 705 | |
|---|
| 706 | /* we pass in a pointer to the array so that we can set it to NULL */ |
|---|
| 707 | void PINT_msgpairarray_destroy( |
|---|
| 708 | PINT_sm_msgarray_op *op) |
|---|
| 709 | { |
|---|
| 710 | if(op->msgarray && (&op->msgpair) != op->msgarray) |
|---|
| 711 | { |
|---|
| 712 | free(op->msgarray); |
|---|
| 713 | } |
|---|
| 714 | op->msgarray = NULL; |
|---|
| 715 | op->count = 0; |
|---|
| 716 | } |
|---|
| 717 | |
|---|
| 718 | int PINT_msgarray_status(PINT_sm_msgarray_op *op) |
|---|
| 719 | { |
|---|
| 720 | int i; |
|---|
| 721 | for (i = 0; i < op->count; i++) |
|---|
| 722 | { |
|---|
| 723 | if (op->msgarray[i].op_status != 0) |
|---|
| 724 | { |
|---|
| 725 | return op->msgarray[i].op_status; |
|---|
| 726 | } |
|---|
| 727 | } |
|---|
| 728 | return 0; |
|---|
| 729 | } |
|---|
| 730 | |
|---|
| 731 | int PINT_serv_decode_resp(PVFS_fs_id fs_id, |
|---|
| 732 | void *encoded_resp_p, |
|---|
| 733 | struct PINT_decoded_msg *decoded_resp_p, |
|---|
| 734 | PVFS_BMI_addr_t *svr_addr_p, |
|---|
| 735 | int actual_resp_sz, |
|---|
| 736 | struct PVFS_server_resp **resp_out_pp) |
|---|
| 737 | { |
|---|
| 738 | int ret = -1, server_type = 0; |
|---|
| 739 | const char *server_string; |
|---|
| 740 | |
|---|
| 741 | ret = PINT_decode(encoded_resp_p, PINT_DECODE_RESP, |
|---|
| 742 | decoded_resp_p, /* holds data on decoded resp */ |
|---|
| 743 | *svr_addr_p, actual_resp_sz); |
|---|
| 744 | if (ret > -1) |
|---|
| 745 | { |
|---|
| 746 | *resp_out_pp = (struct PVFS_server_resp *)decoded_resp_p->buffer; |
|---|
| 747 | if ((*resp_out_pp)->op == PVFS_SERV_PROTO_ERROR) |
|---|
| 748 | { |
|---|
| 749 | |
|---|
| 750 | gossip_err("Error: server does not seem to understand " |
|---|
| 751 | "the protocol that this client is using.\n"); |
|---|
| 752 | gossip_err(" Please check server logs for more " |
|---|
| 753 | "information.\n"); |
|---|
| 754 | |
|---|
| 755 | if (fs_id != PVFS_FS_ID_NULL) |
|---|
| 756 | { |
|---|
| 757 | server_string = PINT_cached_config_map_addr( |
|---|
| 758 | fs_id, *svr_addr_p, &server_type); |
|---|
| 759 | gossip_err(" Server: %s.\n", server_string); |
|---|
| 760 | } |
|---|
| 761 | else |
|---|
| 762 | { |
|---|
| 763 | gossip_err(" Server: unknown; probably an error " |
|---|
| 764 | "contacting server listed in pvfs2tab " |
|---|
| 765 | "file.\n"); |
|---|
| 766 | } |
|---|
| 767 | return(-EPROTONOSUPPORT); |
|---|
| 768 | } |
|---|
| 769 | } |
|---|
| 770 | return ret; |
|---|
| 771 | } |
|---|
| 772 | |
|---|
| 773 | int PINT_serv_free_msgpair_resources( |
|---|
| 774 | struct PINT_encoded_msg *encoded_req_p, |
|---|
| 775 | void *encoded_resp_p, |
|---|
| 776 | struct PINT_decoded_msg *decoded_resp_p, |
|---|
| 777 | PVFS_BMI_addr_t *svr_addr_p, |
|---|
| 778 | int max_resp_sz) |
|---|
| 779 | { |
|---|
| 780 | int ret = -PVFS_EINVAL; |
|---|
| 781 | |
|---|
| 782 | if (encoded_req_p && decoded_resp_p && svr_addr_p) |
|---|
| 783 | { |
|---|
| 784 | PINT_encode_release(encoded_req_p, PINT_ENCODE_REQ); |
|---|
| 785 | |
|---|
| 786 | PINT_decode_release(decoded_resp_p, PINT_DECODE_RESP); |
|---|
| 787 | |
|---|
| 788 | BMI_memfree(*svr_addr_p, encoded_resp_p, max_resp_sz, BMI_RECV); |
|---|
| 789 | |
|---|
| 790 | ret = 0; |
|---|
| 791 | } |
|---|
| 792 | return ret; |
|---|
| 793 | } |
|---|
| 794 | |
|---|
| 795 | /* PINT_serv_msgpair_array_resolve_addrs() |
|---|
| 796 | * |
|---|
| 797 | * fills in BMI address of server for each entry in the msgpair array, |
|---|
| 798 | * based on the handle and fsid |
|---|
| 799 | * |
|---|
| 800 | * returns 0 on success, -PVFS_error on failure |
|---|
| 801 | */ |
|---|
| 802 | int PINT_serv_msgpairarray_resolve_addrs( |
|---|
| 803 | PINT_sm_msgarray_op *mop) |
|---|
| 804 | { |
|---|
| 805 | int i = 0; |
|---|
| 806 | int ret = -PVFS_EINVAL; |
|---|
| 807 | |
|---|
| 808 | if ((mop->count > 0) && mop->msgarray) |
|---|
| 809 | { |
|---|
| 810 | for(i = 0; i < mop->count; i++) |
|---|
| 811 | { |
|---|
| 812 | PINT_sm_msgpair_state *msg_p = &mop->msgarray[i]; |
|---|
| 813 | assert(msg_p); |
|---|
| 814 | |
|---|
| 815 | ret = PINT_cached_config_map_to_server( |
|---|
| 816 | &msg_p->svr_addr, msg_p->handle, msg_p->fs_id); |
|---|
| 817 | |
|---|
| 818 | if (ret != 0) |
|---|
| 819 | { |
|---|
| 820 | gossip_err("Failed to map server address to handle\n"); |
|---|
| 821 | break; |
|---|
| 822 | } |
|---|
| 823 | |
|---|
| 824 | gossip_debug(GOSSIP_MSGPAIR_DEBUG, |
|---|
| 825 | " mapped handle %llu to server %lld\n", |
|---|
| 826 | llu(msg_p->handle), lld(msg_p->svr_addr)); |
|---|
| 827 | } |
|---|
| 828 | } |
|---|
| 829 | return ret; |
|---|
| 830 | } |
|---|
| 831 | |
|---|
| 832 | /* |
|---|
| 833 | * Local variables: |
|---|
| 834 | * mode: c |
|---|
| 835 | * c-indent-level: 4 |
|---|
| 836 | * c-basic-offset: 4 |
|---|
| 837 | * End: |
|---|
| 838 | * |
|---|
| 839 | * vim: ft=c ts=8 sts=4 sw=4 expandtab |
|---|
| 840 | */ |
|---|
| 841 | |
|---|