root/branches/orange-next/src/client/sysint/sys-small-io.sm @ 8935

Revision 8935, 21.5 KB (checked in by mtmoore, 2 years ago)

uuid converstation, PVFS_x_position changes, server/client side first pass done

Line 
1/*
2 * (C) 2003 Clemson University and The University of Chicago
3 *
4 * See COPYING in top-level directory.
5 */
6
7/** \file
8 *  \ingroup sysint
9 *
10 *  PVFS2 system interface routines for reading and writing small files.
11 */
12
13#include <string.h>
14#include <assert.h>
15
16#include "client-state-machine.h"
17#include "pvfs2-debug.h"
18#include "job.h"
19#include "gossip.h"
20#include "str-utils.h"
21#include "pint-cached-config.h"
22#include "PINT-reqproto-encode.h"
23#include "pint-util.h"
24#include "pvfs2-internal.h"
25
26/* The small-io state machine should only be invoked/jumped-to from the
27 * sys-io state machine.  We make this assumption and expect io parameters
28 * to be initialized already.
29 */
30
31static int small_io_completion_fn(void * user_args,
32                                  struct PVFS_server_resp * resp_p,
33                                  int index);
34
35enum {
36  MIRROR_RETRY = 132
37};
38
39%%
40
41nested machine pvfs2_client_small_io_sm
42{
43    state setup_msgpairs
44    {
45        run small_io_setup_msgpairs;
46        success => xfer_msgpairs;
47        default => return;
48    }
49
50    state xfer_msgpairs
51    {
52        jump pvfs2_msgpairarray_sm;
53        default => check_for_retries;
54    }
55
56    state check_for_retries
57    {
58        run small_io_check_for_retries;
59        MIRROR_RETRY => xfer_msgpairs;
60        default => cleanup;  /*no mirroring, done, or out of retries*/
61    }
62
63    state cleanup
64    {
65        run small_io_cleanup;
66        default => return;
67    }
68}
69
70%%
71
72/**
73 * Small I/O is done in cases where the size of data transferred between
74 * client and server is smaller than the maximum unexpected message size
75 * accepted by the BMI transport interface in use.  In this case, we don't
76 * need to perform initial rendezvous setup messages before sending the
77 * actual data (the 'flow'), instead we just pack the data in the unexpected
78 * message.  The sys-io state machine checks for possible 'small i/o' cases
79 * and routes to the small i/o state actions in case.
80 */
81static PINT_sm_action small_io_setup_msgpairs(
82    struct PINT_smcb *smcb, job_status_s *js_p)
83{
84    struct PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
85    PVFS_object_attr *attr = NULL;
86    int i = 0;
87    int ret;
88    PVFS_handle datafile_handle;
89    int regions = 0;
90    PINT_sm_msgpair_state *msg_p;
91    uint32_t server_nr;
92
93    js_p->error_code = 0;
94
95    attr = &sm_p->getattr.attr;
96    assert(attr);
97
98    /* initialize msgarray. one msgpair for each handle with data. */
99    ret = PINT_msgpairarray_init(&sm_p->msgarray_op, sm_p->u.io.datafile_count);
100    if(ret < 0)
101    {
102        js_p->error_code = ret;
103        return SM_ACTION_COMPLETE;
104    }
105
106    /*initialize small_io_ctx array.  one context for each handle in the file.*/
107    sm_p->u.io.small_io_ctx = malloc(attr->u.meta.dfile_count *
108                                     sizeof(*sm_p->u.io.small_io_ctx));
109    if (!sm_p->u.io.small_io_ctx)
110    {
111        PINT_msgpairarray_destroy(&sm_p->msgarray_op);
112        js_p->error_code = -PVFS_ENOMEM;
113        return SM_ACTION_COMPLETE;
114    }
115    memset(sm_p->u.io.small_io_ctx,0,sizeof(*sm_p->u.io.small_io_ctx) *
116                                     attr->u.meta.dfile_count);
117
118
119    foreach_msgpair(&sm_p->msgarray_op, msg_p, i)
120    {
121        PVFS_handle_copy(datafile_handle,
122            attr->u.meta.dfile_array[sm_p->u.io.datafile_index_array[i]]);
123
124        gossip_debug(GOSSIP_IO_DEBUG, "   small_io_setup_msgpairs: "
125                     "handle: %llu\n", llu(datafile_handle));
126
127        if(sm_p->u.io.io_type == PVFS_IO_WRITE)
128        {
129            PINT_Request_state * file_req_state = NULL;
130            PINT_Request_state * mem_req_state = NULL;
131            PINT_request_file_data file_data;
132            PINT_Request_result result;
133
134            memset(&file_data, 0, sizeof(PINT_request_file_data));
135            file_data.server_ct = attr->u.meta.dfile_count;
136            file_data.fsize = 0;
137            file_data.dist = attr->u.meta.dist;
138            file_data.extend_flag = 1;
139            result.segmax = IO_MAX_REGIONS;
140
141            result.bytemax = PINT_REQUEST_TOTAL_BYTES(sm_p->u.io.mem_req);
142            file_req_state = PINT_new_request_state(sm_p->u.io.file_req);
143            mem_req_state = PINT_new_request_state(sm_p->u.io.mem_req);
144
145            PINT_REQUEST_STATE_SET_TARGET(file_req_state,
146                                          sm_p->u.io.file_req_offset);
147
148            PINT_REQUEST_STATE_SET_FINAL(file_req_state,
149                                         sm_p->u.io.file_req_offset +
150                                         result.bytemax);
151
152            file_data.server_nr = sm_p->u.io.datafile_index_array[i];
153
154            result.segs = 0;
155            result.bytes = 0;
156            result.offset_array = msg_p->req.u.small_io.offsets;
157            result.size_array = msg_p->req.u.small_io.sizes;
158            msg_p->req.u.small_io.buffer = sm_p->u.io.buffer;
159
160            ret = PINT_process_request(
161                file_req_state, mem_req_state,
162                &file_data,
163                &result,
164                PINT_CLIENT);
165            if(ret < 0)
166            {
167                js_p->error_code = ret;
168                PINT_free_request_state(file_req_state);
169                PINT_free_request_state(mem_req_state);
170                return SM_ACTION_COMPLETE;
171            }
172
173            regions = result.segs;
174
175            PINT_free_request_state(file_req_state);
176            PINT_free_request_state(mem_req_state);
177        }
178
179        /* if this is a write operation, the appropriate offset and size
180         * arrays will have been filled in by the request processing above.
181         * reads don't require processing of the memory request until
182         * the response. 
183         */
184        PINT_SERVREQ_SMALL_IO_FILL(
185            msg_p->req,
186            *sm_p->cred_p,
187            sm_p->object_ref.fs_id,
188            datafile_handle,
189            sm_p->u.io.io_type,
190            sm_p->u.io.datafile_index_array[i],
191            attr->u.meta.dfile_count,
192            attr->u.meta.dist,
193            sm_p->u.io.file_req,
194            sm_p->u.io.file_req_offset,
195            regions,
196            PINT_REQUEST_TOTAL_BYTES(sm_p->u.io.mem_req),
197            sm_p->hints);
198
199        msg_p->fs_id  = sm_p->object_ref.fs_id;
200        PVFS_handle_copy(msg_p->handle, datafile_handle);
201
202        /*if we are processing a read request and the source file has mirrored
203         *handles, then bypass msgpairarray's retry mechanism.  SMALL-IO will
204         *prepare another set of msgpairs using the mirrors and then retry.
205        */
206        if (sm_p->u.io.io_type == PVFS_IO_READ &&
207            attr->mask & PVFS_ATTR_META_MIRROR_DFILES)
208        {
209            msg_p->retry_flag = PVFS_MSGPAIR_NO_RETRY; 
210        }
211        else
212        {
213            msg_p->retry_flag = PVFS_MSGPAIR_RETRY;
214        }
215        msg_p->comp_fn = small_io_completion_fn;
216
217        ret = PINT_cached_config_map_to_server(
218            &msg_p->svr_addr, datafile_handle,
219            sm_p->object_ref.fs_id);
220        if(ret < 0)
221        {
222            gossip_lerr("Failed to map data server address\n");
223            js_p->error_code = ret;
224            return SM_ACTION_COMPLETE;
225        }
226
227        /*store the original datahandle for later use.*/
228        server_nr = msg_p->req.u.small_io.server_nr;
229        PVFS_handle_copy(sm_p->u.io.small_io_ctx[server_nr].original_datahandle,
230                         msg_p->handle);
231    }
232
233    js_p->error_code = 0;
234
235    PINT_sm_push_frame(smcb, 0, &sm_p->msgarray_op);
236    return SM_ACTION_COMPLETE;
237}
238
239/**
240 * We assume that the response buffer hasn't been freed yet (before
241 * the completion function is called.   The msgpairarray.sm doesn't
242 * free the response buffer until after the completion function is
243 * called.
244 */
245static int small_io_completion_fn(void * user_args,
246                                  struct PVFS_server_resp * resp_p,
247                                  int index)
248{
249    struct PINT_smcb *smcb = (struct PINT_smcb *)user_args;
250    struct PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_MSGPAIR_PARENT_SM);
251    PINT_sm_msgarray_op   *mop   = &(sm_p->msgarray_op);
252    PINT_sm_msgpair_state *msg_p = &(mop->msgarray[index]);
253    PINT_client_small_io_ctx *ctx =
254         &(sm_p->u.io.small_io_ctx[msg_p->req.u.small_io.server_nr]);
255    uint32_t server_nr = msg_p->req.u.small_io.server_nr;
256    PVFS_object_attr *attr __attribute__((unused)) = &(sm_p->getattr.attr);
257    PVFS_metafile_attr *meta __attribute__((unused)) = &(attr->u.meta);
258
259    int ret = 0;
260
261    assert(resp_p->op == PVFS_SERV_SMALL_IO);
262
263    if(resp_p->status != 0)
264    {
265        return resp_p->status;
266    }
267
268    if(resp_p->u.small_io.io_type == PVFS_IO_READ)
269    {
270        PVFS_size sizes[IO_MAX_REGIONS];
271        PVFS_offset offsets[IO_MAX_REGIONS];
272        PVFS_object_attr * attr = &sm_p->getattr.attr;
273        PINT_request_file_data fdata;
274        PINT_Request_result result;
275        PINT_Request_state * file_req_state;
276        PINT_Request_state * mem_req_state;
277        int i = 0;
278        int done = 0;
279        PVFS_size bytes_processed = 0;
280
281        memset(&fdata, 0, sizeof(fdata));
282
283        if(resp_p->u.small_io.result_size != 0)
284        {
285            memset(&fdata, 0, sizeof(PINT_request_file_data));
286            fdata.server_ct = attr->u.meta.dfile_count;
287
288            fdata.server_nr = server_nr;
289            fdata.dist = attr->u.meta.dist;
290            fdata.fsize = resp_p->u.small_io.bstream_size;
291
292            result.segmax = IO_MAX_REGIONS;
293            result.bytemax = resp_p->u.small_io.result_size;
294            result.size_array = sizes;
295            result.offset_array = offsets;
296
297            file_req_state = PINT_new_request_state(sm_p->u.io.file_req);
298            mem_req_state = PINT_new_request_state(sm_p->u.io.mem_req);
299
300            PINT_REQUEST_STATE_SET_TARGET(file_req_state,
301                                          sm_p->u.io.file_req_offset);
302            PINT_REQUEST_STATE_SET_FINAL(
303                file_req_state, sm_p->u.io.file_req_offset +
304                PINT_REQUEST_TOTAL_BYTES(sm_p->u.io.mem_req));
305
306            do
307            {
308                result.segs = 0;
309                result.bytes = 0;
310
311                ret = PINT_process_request(
312                    file_req_state,
313                    mem_req_state,
314                    &fdata,
315                    &result,
316                    PINT_CLIENT);
317                if(ret < 0)
318                {
319                    gossip_err("Failed processing request in small I/O read\n");
320                    return ret;
321                }
322
323                for(i = 0; i < result.segs && !done; ++i)
324                {
325                    int tmp_size;
326                    char * src_ptr;
327                    char * dest_ptr;
328
329                    dest_ptr = (char *)sm_p->u.io.buffer + offsets[i];
330                    src_ptr = (char *)resp_p->u.small_io.buffer +
331                        bytes_processed;
332
333                    if((bytes_processed + sizes[i]) <=
334                       resp_p->u.small_io.result_size)
335                    {
336                        tmp_size = sizes[i];
337                    }
338                    else
339                    {
340                        tmp_size = resp_p->u.small_io.result_size -
341                            bytes_processed;
342                        done = 1;
343                    }
344
345                    memcpy(dest_ptr, src_ptr, sizes[i]);
346                    bytes_processed += tmp_size;
347                }
348            } while(!PINT_REQUEST_DONE(file_req_state) && !done);
349
350            if(resp_p->u.small_io.result_size != bytes_processed)
351            {
352                gossip_err("size of bytes copied to user buffer "
353                           "(%llu) do not match size of response (%llu)\n",
354                           llu(bytes_processed),
355                           llu(resp_p->u.small_io.result_size));
356                return -PVFS_EINVAL;
357            }
358            PINT_free_request_state(file_req_state);
359            PINT_free_request_state(mem_req_state);
360        }
361    } /*if PVFS_IO_READ*/
362
363    sm_p->u.io.dfile_size_array[server_nr] = resp_p->u.small_io.bstream_size;
364    //sm_p->u.io.dfile_size_array[index] = resp_p->u.small_io.bstream_size;
365
366    sm_p->u.io.total_size += resp_p->u.small_io.result_size;
367
368    /* Let's SMALL-IO know that the msg completed. */
369    ctx->msg_completed = 1;
370
371    /* To test fail-over with small-io, uncomment the following code.  This
372     * will force each primary handle to have each of its mirrored handles
373     * tried on a READ io.  If you are testing a file with many primary
374     * handles and/or many copies and don't want to wade through such a large
375     * test, then don't set the retry-limit.  By default, it is normally set
376     * at 5.  You can tweak this value in the pvfs2-fs.conf file, if you
377     * want.
378    */
379
380    gossip_debug(GOSSIP_IO_DEBUG,"handle=%llu \toperation=%d \toffset=%ld "
381                             "\taggregate_size=%ld\n",
382                            llu(msg_p->req.u.small_io.handle),
383                            msg_p->req.u.small_io.io_type,
384                            ((long int)msg_p->req.u.small_io.file_req_offset),
385                            ((long int)msg_p->req.u.small_io.aggregate_size));
386
387/*
388    if (   (sm_p->u.io.io_type == PVFS_IO_READ)
389        && (attr->mask & PVFS_ATTR_META_MIRROR_DFILES)
390                               == PVFS_ATTR_META_MIRROR_DFILES
391        && (sm_p->u.io.retry_count < mop->params.retry_limit))
392    {
393       mop->params.retry_limit = meta->mirror_copies_count;
394       ctx->msg_completed = 0;
395    }
396*/
397
398    return 0;
399}
400
401
402static int small_io_check_for_retries( struct PINT_smcb *smcb
403                                     , job_status_s *js_p)
404{
405    gossip_debug(GOSSIP_MIRROR_DEBUG,"Executing %s..\n",__func__);
406
407    struct PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
408    struct PVFS_object_attr *attr = &(sm_p->getattr.attr);
409    PVFS_metafile_attr *meta = &(attr->u.meta);
410    PINT_client_small_io_ctx *ctx = NULL;
411    PINT_sm_msgarray_op *mop = &sm_p->msgarray_op;
412    PINT_sm_msgpair_state *msgarray = mop->msgarray;
413    PINT_sm_msgpair_state *msg = NULL;
414    PINT_sm_msgpair_state *new_msg = NULL;
415    PINT_sm_msgarray_op new_mop = {{0},0,0,{0}};
416    char *enc_req_bytes = NULL;
417
418    uint32_t retry_msg_count = 0;
419    uint32_t index = 0;
420    uint32_t copies = 0;
421    uint32_t server_nr = 0;
422    int i   = 0;
423    int j   = 0;
424    int k   = 0;
425    int ret = 0;
426
427    /*if we are processing a write request, then msgpairarray handles retries.
428     *if we are processing a read and the source file is mirrored, then
429     *SMALL-IO handles the retries.
430    */
431    if (sm_p->u.io.io_type == PVFS_IO_WRITE ||
432        ((attr->mask & PVFS_ATTR_META_MIRROR_DFILES) !=
433          PVFS_ATTR_META_MIRROR_DFILES))
434    {
435       return SM_ACTION_COMPLETE;
436    }
437
438    /* Do any messages need to be retried? */
439    for (i=0; i<mop->count; i++)
440    {
441        server_nr = msgarray[i].req.u.small_io.server_nr;
442        ctx = &sm_p->u.io.small_io_ctx[server_nr];
443        if (!ctx->msg_completed)
444            retry_msg_count++;
445    }
446
447    /* no retries needed */
448    if (!retry_msg_count)
449    {
450        return SM_ACTION_COMPLETE;
451    }
452
453    /* do we have any retries available? */
454    if (sm_p->u.io.retry_count >= mop->params.retry_limit)
455    {
456        return SM_ACTION_COMPLETE;
457    }
458
459    /* okay. let's setup new msgpairs to retry. we will modify the incomplete
460     * msg pairs stored in msgarray and then copy them into a new msgarray
461     * before calling msgpairarray.sm.
462    */
463    for (i=0; i<mop->count; i++)
464    {
465        msg = &msgarray[i];
466        server_nr = msg->req.u.small_io.server_nr;
467        ctx = &sm_p->u.io.small_io_ctx[server_nr];
468     
469        /* don't process completed messages */
470        if (ctx->msg_completed)
471           continue;
472
473        /* for incomplete messages, cleanup memory, if necessary */
474        enc_req_bytes = (char *)&(msg->encoded_req);
475        for (k=0; k<sizeof(msg->encoded_req); k++)
476        {
477           if (enc_req_bytes[k] != '\0')
478           {
479              PINT_encode_release(&(msg->encoded_req),PINT_ENCODE_REQ);
480              break;
481           }
482        }/*end for*/
483
484        if (msg->encoded_resp_p)
485        {
486            BMI_memfree(msg->svr_addr
487                       ,msg->encoded_resp_p
488                       ,msg->max_resp_sz
489                       ,BMI_RECV);
490        }
491       
492        /* Should we use the original datahandle? */
493        if (ctx->retry_original)
494        {
495            ctx->retry_original = 0;
496            PVFS_handle_copy(msg->handle, ctx->original_datahandle);
497            PVFS_handle_copy(msg->req.u.small_io.handle,
498                             ctx->original_datahandle);
499            msg->svr_addr = 0;
500            ret = PINT_cached_config_map_to_server(&msg->svr_addr
501                                                  ,msg->handle
502                                                  ,msg->fs_id);
503            if (ret)
504            {
505               gossip_lerr("Unable to determine the server address "
506                           "for this handle (%llu)"
507                           ,llu(msg->handle));
508               js_p->error_code = ret;
509               return SM_ACTION_COMPLETE;
510            }
511            continue;
512        }/*end retry_original*/
513
514        /* get next mirrored handle.  note:  if a mirrored handle is zero, then
515         * this means that the creation of this mirrored object failed for its
516         * particular server.  if so, then get the next valid handle.  as a
517         * last resort, retry the original handle.
518        */
519        copies = ctx->current_copies_count;
520        for (;copies < meta->mirror_copies_count; copies++)
521        {
522            index = (copies*meta->dfile_count) + server_nr;
523            if (meta->mirror_dfile_array[index] != 0)
524            {  /* we have found a valid mirrored handle */
525               PVFS_handle_copy(msg->handle, meta->mirror_dfile_array[index]);
526               break;
527            }
528        }
529
530        /* if we haven't found a valid mirrored handle, retry the original
531         * datahandle.
532        */
533        if ( copies == meta->mirror_copies_count )
534        {
535           PVFS_handle_copy(msg->handle, ctx->original_datahandle);
536           ctx->retry_original = 0;
537           ctx->current_copies_count = 0;
538           sm_p->u.io.retry_count++;
539           PVFS_handle_copy(msg->req.u.small_io.handle,
540                            ctx->original_datahandle);
541           msg->svr_addr = 0;
542           ret=PINT_cached_config_map_to_server(&(msg->svr_addr)
543                                               ,msg->handle
544                                               ,msg->fs_id);
545           if (ret)
546           {
547              gossip_lerr("Unable to determine the server address "
548                          "for this handle (%llu)"
549                          ,llu(msg->handle));
550              js_p->error_code = ret;
551              return SM_ACTION_COMPLETE;
552           }
553           continue;
554        }/*end if we have to use the original*/
555
556        /* Otherwise, use the discovered mirrored handle */
557        PVFS_handle_copy(msg->req.u.small_io.handle, msg->handle);
558        msg->svr_addr = 0;
559        ret = PINT_cached_config_map_to_server(&(msg->svr_addr),
560                                               msg->handle,
561                                               msg->fs_id);
562        if (ret)
563        {
564           gossip_lerr("Unable to determine the server address "
565                       "for this handle (%llu)"
566                       ,llu(msg->handle));
567           js_p->error_code = ret;
568           return SM_ACTION_COMPLETE;
569        }
570
571        /* and setup for the next retry event */
572        ctx->current_copies_count++;
573        if (ctx->current_copies_count == meta->mirror_copies_count)
574        {
575           ctx->current_copies_count = 0;
576           ctx->retry_original = 1;
577           sm_p->u.io.retry_count++;
578        }
579    }/*end for each msgpair*/
580
581    /* Now, create a new msgpair array and populate from the above modified
582     * messages.
583    */
584    ret = PINT_msgpairarray_init(&new_mop,retry_msg_count);
585    if (ret)
586    {
587        gossip_lerr("Unable to initialize msgarray_op:new_op\n");
588        js_p->error_code = ret;
589        return SM_ACTION_COMPLETE;
590    }
591
592    /* populate the new msgarray with the modified messages */
593    for (i=0, j=0; i<mop->count && j<new_mop.count; i++)
594    {
595        msg = &msgarray[i];
596        server_nr = msg->req.u.small_io.server_nr;
597        ctx = &sm_p->u.io.small_io_ctx[server_nr];
598     
599        /* don't populate with completed messages */
600        if (ctx->msg_completed)
601           continue;
602
603        new_msg = &new_mop.msgarray[j];
604        j++;
605
606        new_msg->fs_id       = msg->fs_id;
607        PVFS_handle_copy(new_msg->handle, msg->handle);
608        new_msg->comp_fn     = msg->comp_fn;
609        new_msg->svr_addr    = msg->svr_addr;
610        new_msg->req         = msg->req;
611        new_msg->enc_type    = msg->enc_type;
612        new_msg->retry_flag  = msg->retry_flag;
613    }/*end for*/
614
615    /* Destroy the old msgarray and substitute with the new. Params are left
616     * in tact.
617    */
618    PINT_msgpairarray_destroy(mop);
619    mop->count    = new_mop.count;
620    mop->msgarray = new_mop.msgarray;
621    mop->msgpair  = new_mop.msgpair;
622
623    /* Push the msgarray_op and jump to msgpairarray.sm */
624    PINT_sm_push_frame(smcb,0,mop);
625    js_p->error_code=MIRROR_RETRY;
626    return SM_ACTION_COMPLETE;
627}/*end small_io_check_for_retries*/
628
629
630
631static int small_io_cleanup(
632    struct PINT_smcb *smcb, job_status_s *js_p)
633{
634    struct PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
635
636    PINT_msgpairarray_destroy(&sm_p->msgarray_op);
637
638    /*release the ctx array; this array is allocated whether or not the
639     *file to read is mirrored.
640    */
641    free(sm_p->u.io.small_io_ctx);
642
643    return SM_ACTION_COMPLETE;
644}
645
646/*
647 * Local variables:
648 *  mode: c
649 *  c-indent-level: 4
650 *  c-basic-offset: 4
651 * End:
652 *
653 * vim: ft=c ts=8 sts=4 sw=4 expandtab
654 */
Note: See TracBrowser for help on using the browser.