root/branches/windows-client/src/client/sysint/sys-getattr.sm @ 8633

Revision 8633, 46.9 KB (checked in by sampson, 3 years ago)

Initial client testing

Line 
1/*
2 * (C) 2003 Clemson University and The University of Chicago
3 *
4 * See COPYING in top-level directory.
5 */
6
7/** \file
8 *  \ingroup sysint
9 *
10 *  PVFS2 system interface routines for obtaining attributes of an object
11 *  (file or directory).
12 */
13#include <string.h>
14#include <assert.h>
15
16#include "client-state-machine.h"
17#include "pvfs2-debug.h"
18#include "job.h"
19#include "gossip.h"
20#include "str-utils.h"
21#include "pint-util.h"
22#include "pvfs2-util.h"
23#include "pint-cached-config.h"
24#include "PINT-reqproto-encode.h"
25#include "pvfs2-internal.h"
26#include "pvfs2-types-debug.h"
27
28/* pvfs2_client_getattr_sm
29 *
30 * The sm_p->msgpair structure is used to get the attributes of the
31 * object itself.  We convert the original attribute mask (in
32 * sm_p->u.getattr.attrmask) to ask for datafile and distribution info
33 * if the user asked for file size (PVFS_ATTR_SYS_SIZE).  This allows
34 * us to obtain this information (if the object turns out to be a
35 * metafile) so that we can later look up the datafile sizes and
36 * calculate the overall file size.
37 *
38 * The sm_p->msgpairarray is used to get datafile sizes, if it turns
39 * out that we need them.  This space will also need to be freed, if
40 * we grab these sizes.
41 */
42
43#ifdef WIN32
44static struct profiler getattr_prof;
45#else
46static struct profiler getattr_prof __attribute__((unused));
47#endif
48
49extern job_context_id pint_client_sm_context;
50
51enum
52{
53    GETATTR_ACACHE_MISS = 1,
54    GETATTR_NEED_DATAFILE_SIZES = 2,
55    GETATTR_IO_RETRY = 3
56};
57
58/* completion function prototypes */
59static int getattr_object_getattr_comp_fn(
60    void *v_p, struct PVFS_server_resp *resp_p, int index);
61static int getattr_datafile_getattr_comp_fn(
62    void *v_p, struct PVFS_server_resp *resp_p, int index);
63
64%%
65
66nested machine pvfs2_client_datafile_getattr_sizes_sm
67{
68    state datafile_getattr_setup_msgpairarray
69    {
70        run getattr_datafile_getattr_setup_msgpairarray;
71        success => datafile_getattr_xfer_msgpairarray;
72        default => datafile_getattr_cleanup;
73    }
74   
75    state datafile_getattr_xfer_msgpairarray
76    {
77        jump pvfs2_msgpairarray_sm;
78        default => datafile_getattr_retry;
79    }
80
81    state datafile_getattr_retry
82    {
83        run getattr_datafile_getattr_retry;
84        GETATTR_IO_RETRY => datafile_getattr_xfer_msgpairarray;
85        default => datafile_getattr_cleanup;
86    }
87
88    state datafile_getattr_cleanup
89    {
90        run getattr_datafile_getattr_cleanup;
91        default => return;
92    }
93}
94
95
96nested machine pvfs2_client_getattr_sm
97{
98    state acache_lookup
99    {
100        run getattr_acache_lookup;
101        GETATTR_ACACHE_MISS => object_getattr_setup_msgpair;
102        GETATTR_NEED_DATAFILE_SIZES => datafile_get_sizes;
103        default => cleanup;
104    }
105
106    state object_getattr_setup_msgpair
107    {
108        run getattr_object_getattr_setup_msgpair;
109        success => object_getattr_xfer_msgpair;
110        default => cleanup;
111    }
112
113    state object_getattr_xfer_msgpair
114    {
115        jump pvfs2_msgpairarray_sm;
116        success => acache_insert;
117        GETATTR_NEED_DATAFILE_SIZES => datafile_get_sizes;
118        default => object_getattr_failure;
119    }
120
121    state acache_insert
122    {
123        run getattr_acache_insert;
124        default => cleanup;
125    }
126
127    state object_getattr_failure
128    {
129        run getattr_object_getattr_failure;
130        default => cleanup;
131    }
132
133    state datafile_get_sizes
134    {
135        jump pvfs2_client_datafile_getattr_sizes_sm;
136        success => acache_insert;
137        default => cleanup;
138    }
139
140    state cleanup
141    {
142        run getattr_cleanup;
143        default => return;
144    }
145}
146
147machine pvfs2_client_sysint_getattr_sm
148{
149    state dowork
150    {
151        jump pvfs2_client_getattr_sm;
152        default => set_sys_response;
153    }
154
155    state set_sys_response
156    {
157        run getattr_set_sys_response;
158        default => terminate;
159    }
160}
161   
162%%
163
164
165/** Initiate retrieval of object attributes.
166 */
167PVFS_error PVFS_isys_getattr(
168    PVFS_object_ref ref,
169    uint32_t attrmask,
170    const PVFS_credentials *credentials,
171    PVFS_sysresp_getattr *resp_p,
172    PVFS_sys_op_id *op_id,
173    PVFS_hint hints,
174    void *user_ptr)
175{
176    PVFS_error ret = -PVFS_EINVAL;
177    PINT_smcb *smcb = NULL;
178    PINT_client_sm *sm_p = NULL;
179
180    gossip_debug(GOSSIP_CLIENT_DEBUG, "PVFS_isys_getattr entered\n");
181
182    if ((ref.handle == PVFS_HANDLE_NULL) ||
183        (ref.fs_id == PVFS_FS_ID_NULL) || (resp_p == NULL))
184    {
185        gossip_err("invalid (NULL) required argument\n");
186        return ret;
187    }
188   
189    if (attrmask & ~(PVFS_ATTR_SYS_ALL))
190    {
191        gossip_err("invalid attrmask\n");
192        return ret;
193    }
194
195    PINT_smcb_alloc(&smcb, PVFS_SYS_GETATTR,
196            sizeof(struct PINT_client_sm),
197            client_op_state_get_machine,
198            client_state_machine_terminate,
199            pint_client_sm_context);
200    if (smcb == NULL)
201    {
202        return -PVFS_ENOMEM;
203    }
204    sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
205
206    PINT_init_msgarray_params(sm_p, ref.fs_id);
207    PINT_init_sysint_credentials(sm_p->cred_p, credentials);
208    sm_p->error_code = 0;
209    sm_p->object_ref = ref;
210    sm_p->u.getattr.getattr_resp_p = resp_p;
211    PVFS_hint_copy(hints, &sm_p->hints);
212    PVFS_hint_add(&sm_p->hints, PVFS_HINT_HANDLE_NAME, sizeof(PVFS_handle)
213                 ,&ref.handle);
214   
215    PINT_SM_GETATTR_STATE_FILL(
216        sm_p->getattr,
217        ref,
218        PVFS_util_sys_to_object_attr_mask(
219            attrmask),
220        PVFS_TYPE_NONE,
221        0);
222
223    return PINT_client_state_machine_post(
224        smcb,  op_id, user_ptr);
225}
226
227/** Retrieve object attributes.
228 */
229PVFS_error PVFS_sys_getattr(
230    PVFS_object_ref ref,
231    uint32_t attrmask,
232    const PVFS_credentials *credentials,
233    PVFS_sysresp_getattr *resp_p,
234    PVFS_hint hints)
235{
236    PVFS_error ret, error;
237    PVFS_sys_op_id op_id;
238
239    gossip_debug(GOSSIP_CLIENT_DEBUG, "PVFS_sys_getattr entered\n");
240
241    ret = PVFS_isys_getattr(ref, attrmask, credentials,
242                            resp_p, &op_id, hints, NULL);
243    if (ret)
244    {
245        PVFS_perror_gossip("PVFS_isys_getattr call", ret);
246        return ret;
247    }
248
249    if(op_id != -1)
250    {
251        /* did not complete immediately, so we wait */
252
253        ret = PVFS_sys_wait(op_id, "getattr", &error);
254        if (ret)
255        {
256            PVFS_perror_gossip("PVFS_sys_wait call", ret);
257        }
258        if(error)
259        {
260            ret = error;
261        }
262        PINT_sys_release(op_id);
263    }
264
265    return ret;
266}
267
268
269/**
270 * getattr_acache_lookup
271 * @ingroup client_sm_getattr
272 *
273 * This function is invoked as the first state action of the
274 * getattr-dowork state machine.  It performs a lookup into the
275 * attribute cache for the attribute in question, and returns
276 * result codes for a hit or a miss. 
277 *
278 * @param smcb This must be a valid client state machine handle, with
279 * the @ref getattr field containing valid values for the
280 * fsid/handle of the desired attribute (in object_ref), as well as the
281 * requested attribute mask (req_attrmask).
282 *
283 * @param js_p Contains the return code to be set by this function in the
284 * @ref error_code field.  This determines the next state action to jump
285 * to in the getattr-dowork state machine.  Possible values for js_p->error_code
286 * are:
287 * <ul>
288 * <li><b>GETATTR_ACACHE_MISS</b> - The requested attribute was not found
289 * in the attribute cache, or the attribute was found, but the attribute's
290 * mask did not include values required by the requested mask (req_attrmask).
291 * </li>
292 * <li><b>GETATTR_NEED_DATAFILE_SIZES</b> - The requested attribute was found
293 * in the attribute cache and the mask was sufficient, but the data file size
294 * was requested for this handle, so we need to get that next.
295 * </li>
296 * <li><b>default</b> - The requested attribute was found in the attribute
297 * cache and its mask values were sufficient.
298 * </li>
299 *
300 * @return This function should always return 1 unless an error occurred
301 * within the internals of the state machine.
302 */
303static PINT_sm_action getattr_acache_lookup(
304        struct PINT_smcb *smcb, job_status_s *js_p)
305{
306    struct PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
307    uint32_t trimmed_mask = 0;
308    int missing_attrs;
309    PVFS_object_ref object_ref;
310    int ret = -1;
311    int attr_status = -1;
312    int size_status = -1;
313
314    js_p->error_code = 0;
315
316    object_ref = sm_p->getattr.object_ref;
317
318    assert(object_ref.handle != PVFS_HANDLE_NULL);
319    assert(object_ref.fs_id != PVFS_FS_ID_NULL);
320
321    gossip_debug(GOSSIP_ACACHE_DEBUG, "%s: handle %llu fsid %d\n",
322      __func__, llu(object_ref.handle), object_ref.fs_id);
323
324   
325    /* The sys attr mask request is converted to object
326     * attr mask values for comparison with the cached
327     */
328    if(sm_p->getattr.req_attrmask & PVFS_ATTR_DATA_SIZE)
329    {
330        sm_p->getattr.req_attrmask |= PVFS_ATTR_META_ALL;
331    }
332
333    if(sm_p->getattr.flags & PINT_SM_GETATTR_BYPASS_CACHE)
334    {
335        gossip_debug(GOSSIP_ACACHE_DEBUG, "acache: forced acache miss: "
336                    " [%llu]\n",
337                      llu(object_ref.handle));
338        js_p->error_code = GETATTR_ACACHE_MISS;
339        return SM_ACTION_COMPLETE;
340    }
341
342    ret = PINT_acache_get_cached_entry(object_ref,
343        &sm_p->getattr.attr,
344        &attr_status,
345        &sm_p->getattr.size,
346        &size_status);
347    if(ret < 0 || attr_status < 0)
348    {
349        gossip_debug(GOSSIP_ACACHE_DEBUG, "acache: clean acache miss: "
350                    " [%llu]\n",
351                      llu(object_ref.handle));
352 
353        js_p->error_code = GETATTR_ACACHE_MISS;
354        return SM_ACTION_COMPLETE;
355    }
356
357    /* acache hit, check results */
358 
359    /* The sys attr mask request is converted to object
360     * attr mask values for comparison with the cached
361     * entry
362     */
363    trimmed_mask = sm_p->getattr.req_attrmask;
364
365    gossip_debug(GOSSIP_GETATTR_DEBUG,"request attrmask:\n");
366    PINT_attrmask_print(GOSSIP_GETATTR_DEBUG,sm_p->getattr.req_attrmask);
367
368    gossip_debug(GOSSIP_GETATTR_DEBUG,"trimmed attrmask:\n");
369    PINT_attrmask_print(GOSSIP_GETATTR_DEBUG,trimmed_mask);
370
371    gossip_debug(GOSSIP_GETATTR_DEBUG,"returned attrmask:\n");
372    PINT_attrmask_print(GOSSIP_GETATTR_DEBUG,sm_p->getattr.attr.mask);
373
374    /* the trimmed mask is used for making sure that we're only
375     * checking attr bits that make sense for the object type
376     * since the caller may have requested all attributes in
377     * the case where it doesn't know what type of object we're
378     * doing the getattr against.
379     */
380    if (sm_p->getattr.attr.objtype == PVFS_TYPE_METAFILE)
381    {
382        trimmed_mask &= (PVFS_ATTR_META_ALL |
383                         PVFS_ATTR_META_UNSTUFFED |
384                         PVFS_ATTR_DATA_SIZE |
385                         PVFS_ATTR_COMMON_ALL);
386    }
387    else if (sm_p->getattr.attr.objtype == PVFS_TYPE_SYMLINK)
388    {
389        trimmed_mask &= (PVFS_ATTR_SYMLNK_ALL | PVFS_ATTR_COMMON_ALL);
390    }
391    else if (sm_p->getattr.attr.objtype == PVFS_TYPE_DIRECTORY)
392    {
393        trimmed_mask &= (PVFS_ATTR_COMMON_ALL | PVFS_ATTR_DIR_ALL);
394    }
395 
396    /* trimmed_mask contains the list of attributes
397     * requested for a particular object,
398     * while sm_p->getattr.attr.mask contains
399     * the list of attributes cached for that object.
400     * The cached attributes can be used if requested
401     * is less than cached, i.e. all the attributes
402     * we need are already cached.  So we need to do
403     * a bitwise comparison of requested <= cached.
404     *
405     * xor of the two masks gives us the bits that are different,
406     * and-ing that result with the requested mask gives us the
407     * bits in the requested mask but not in the cached mask.
408     */
409    missing_attrs = ((trimmed_mask ^ sm_p->getattr.attr.mask) &
410                     trimmed_mask);
411
412    gossip_debug(GOSSIP_GETATTR_DEBUG,"missing attrmask BEFORE:\n");
413    PINT_attrmask_print(GOSSIP_GETATTR_DEBUG,missing_attrs);
414
415    if ( missing_attrs & PVFS_ATTR_META_MIRROR_DFILES )
416    {
417       /*Mirroring is optional, so remove mirror-dfiles*/
418         missing_attrs &= ~PVFS_ATTR_META_MIRROR_DFILES;   
419    }
420         
421    gossip_debug(GOSSIP_GETATTR_DEBUG,"missing attrmask AFTER:\n");
422    PINT_attrmask_print(GOSSIP_GETATTR_DEBUG,missing_attrs);
423
424    if((missing_attrs == PVFS_ATTR_DATA_SIZE && size_status == 0) ||
425        (missing_attrs == 0))
426    {
427        /* nothing's missing, a hit! */
428        gossip_debug(GOSSIP_ACACHE_DEBUG, "acache: acache hit "
429                     "[%llu]\n", llu(object_ref.handle));
430        js_p->error_code = 0;
431        return SM_ACTION_COMPLETE;
432    }
433 
434    /* check if the only thing missing is the file size, then
435     * we don't need to do the object getattr operation, we only
436     * need to do the datafile getattr operation, so we return
437     * the GETATTR_NEED_DATAFILE_SIZES error code which will make
438     * the getattr-dowork state machine jump to the datafile getattr
439     * operation state.
440     */
441    if(missing_attrs == PVFS_ATTR_DATA_SIZE)
442    {
443        if(!(sm_p->getattr.attr.mask & PVFS_ATTR_META_UNSTUFFED))
444        {
445            /* We are missing the size, and we don't know for sure if the
446             * file has been unstuffed.  In this case, act as though we
447             * missed on all atributes so that we can get fresh stuffed size
448             * or datafile information as needed.
449             */
450        }
451        else
452        {
453            /* if the file size is requested but the distribution info
454             * isn't and it hasn't been cached, then we need to
455             * get that first.
456             */
457
458            js_p->error_code = GETATTR_NEED_DATAFILE_SIZES;
459            gossip_debug(GOSSIP_ACACHE_DEBUG, "acache: acache hit, need sizes"
460                         "[%llu]\n", llu(object_ref.handle));
461            return SM_ACTION_COMPLETE;
462        }
463    }
464
465    /* we missed */
466    /* clean out the attributes we got from the cache; this will be
467     * overwritten when we request updated information from the server
468     */
469    PINT_free_object_attr(&sm_p->getattr.attr);
470    gossip_debug(GOSSIP_ACACHE_DEBUG, "acache: acache miss due to mask: "
471        " [%llu]\n",
472          llu(object_ref.handle));
473
474    js_p->error_code = GETATTR_ACACHE_MISS;
475    return SM_ACTION_COMPLETE;
476}
477
478
479static PINT_sm_action getattr_object_getattr_setup_msgpair(
480        struct PINT_smcb *smcb, job_status_s *js_p)
481{
482    struct PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
483    int ret = -PVFS_EINVAL;
484    PVFS_object_ref object_ref;
485    PINT_sm_msgpair_state *msg_p;
486
487    gossip_debug(GOSSIP_CLIENT_DEBUG, "(%p) %s\n", sm_p, __func__);
488
489    js_p->error_code = 0;
490
491    PINT_msgpair_init(&sm_p->msgarray_op);
492    msg_p = &sm_p->msgarray_op.msgpair;
493
494    object_ref = sm_p->getattr.object_ref;
495
496    assert(object_ref.fs_id != PVFS_FS_ID_NULL);
497    assert(object_ref.handle != PVFS_HANDLE_NULL);
498
499    /* setup the msgpair to do a getattr operation */
500    PINT_SERVREQ_GETATTR_FILL(
501        msg_p->req,
502        *sm_p->cred_p,
503        object_ref.fs_id,
504        object_ref.handle,
505        sm_p->getattr.req_attrmask,
506        sm_p->hints);
507   
508    msg_p->fs_id = object_ref.fs_id;
509    msg_p->handle = object_ref.handle;
510    msg_p->retry_flag = PVFS_MSGPAIR_RETRY;
511    msg_p->comp_fn = getattr_object_getattr_comp_fn;
512
513    ret = PINT_cached_config_map_to_server(
514        &msg_p->svr_addr, msg_p->handle,
515        msg_p->fs_id);
516    if (ret)
517    {
518        gossip_err("Failed to map meta server address\n");
519        js_p->error_code = ret;
520    }
521
522    /* TODO: attempting to free this frame causes heap fault.
523       Make a copy
524    PINT_sm_push_frame(smcb, 0, &sm_p->msgarray_op); */
525    {
526        PINT_sm_msgarray_op *copy = malloc(sizeof(PINT_sm_msgarray_op));
527        memcpy(copy, &sm_p->msgarray_op, sizeof(PINT_sm_msgarray_op));
528        PINT_sm_push_frame(smcb, 0, copy);
529    }
530
531    return SM_ACTION_COMPLETE;
532}
533
534/*
535  copies data from getattr response into the user supplied sys_attr
536  structure.  returns 0 for directories and symlinks, and
537  GETATTR_NEED_DATAFILE_SIZES for a metafile (when appropriate)
538*/
539static int getattr_object_getattr_comp_fn(
540    void *v_p,
541    struct PVFS_server_resp *resp_p,
542    int index)
543{
544    PVFS_object_attr *attr = NULL;
545    PINT_smcb *smcb = v_p;
546    PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_MSGPAIR_PARENT_SM);
547
548    assert(resp_p->op == PVFS_SERV_GETATTR);
549
550    gossip_debug(GOSSIP_GETATTR_DEBUG,
551                 "getattr_object_getattr_comp_fn called\n");
552
553    if (resp_p->status != 0)
554    {
555        return resp_p->status;
556    }
557
558    /*
559     * If we've reached the callback for the getattr msgpair tranfer,
560     * then we can make a copy of the retrieved attribute for later
561     * caching.
562     */
563    PINT_copy_object_attr(&sm_p->getattr.attr,
564                          &resp_p->u.getattr.attr);
565
566    attr =  &sm_p->getattr.attr;
567
568    /* if the ref_type mask is set to a non-zero value (!PVFS_TYPE_NONE)
569     * a -PVFS_error will be triggered if the
570     * attributes received are not one of the the types specified.
571     * This is useful so that the client can know (in some cases) that it
572     * can avoid issuing an operation to the server since the server will
573     * just pass an error back anyway.
574     */
575    if(sm_p->getattr.ref_type &&
576       sm_p->getattr.ref_type != attr->objtype)
577    {
578        int ret;
579        gossip_debug(GOSSIP_CLIENT_DEBUG, "*** "
580                     "getattr_comp_fn: Object type mismatch.\n Possibly "
581                     "saving network roundtrip by returning an error\n");
582
583        if (sm_p->getattr.ref_type == PVFS_TYPE_DIRECTORY)
584        {
585            ret = -PVFS_ENOTDIR;
586        }
587        else
588        {
589            assert(sm_p->getattr.ref_type == PVFS_TYPE_METAFILE);
590            ret = ((attr->objtype == PVFS_TYPE_DIRECTORY) ?
591                   -PVFS_EISDIR : -PVFS_EBADF);
592        }
593        PVFS_perror_gossip("Object Type mismatch error", ret);
594        return ret;
595    }
596
597    /* do assertion checking of getattr response values, and
598     * check if file sizes are needed.  With NDEBUG defined, this block
599     * only checks if file sizes are needed.
600     */
601    switch (attr->objtype)
602    {
603        case PVFS_TYPE_METAFILE:
604            if (sm_p->msgarray_op.msgpair.req.u.getattr.attrmask &
605                PVFS_ATTR_META_DIST)
606            {
607                /* if we requested distribution attrs, did the distribution
608                 * get set and is the size valid?
609                 */
610                assert(attr->mask & PVFS_ATTR_META_DIST);
611                assert(attr->u.meta.dist && (attr->u.meta.dist_size > 0));
612            }
613            if (sm_p->msgarray_op.msgpair.req.u.getattr.attrmask &
614                PVFS_ATTR_META_MIRROR_DFILES)
615            {
616                if (attr->mask & PVFS_ATTR_META_MIRROR_DFILES)
617                {    assert(attr->u.meta.mirror_dfile_array &&
618                          (attr->u.meta.mirror_copies_count > 0));
619                     gossip_debug(GOSSIP_GETATTR_DEBUG,"%s: Mirror handles and "
620                                                       "copy count retrieved.\n"
621                                                     ,__func__);
622                }
623                else
624                {
625                   gossip_debug(GOSSIP_GETATTR_DEBUG,"%s: request attribute "
626                   "mask says to get the mirror dfiles, if they exist, \nbut "
627                   "the response attribute mask says they were not retrieved. "
628                   "This is okay.\n"
629                   ,__func__);
630                }
631            }
632            if (sm_p->msgarray_op.msgpair.req.u.getattr.attrmask &
633                PVFS_ATTR_META_DFILES)
634            {
635                /* if we requested the datafile handles for the file, did
636                 * the datafile array get populated?
637                 */
638                assert(attr->u.meta.dfile_array &&
639                       (attr->u.meta.dfile_count > 0));
640
641                gossip_debug(GOSSIP_GETATTR_DEBUG,
642                             "getattr_object_getattr_comp_fn: "
643                             "%d datafiles.\n", attr->u.meta.dfile_count);
644               
645                /* if we need the size, that should be the only time we're
646                 * going to have to do a full data file fetch.
647                 * (that's expensive)
648                 */
649                if (sm_p->getattr.req_attrmask & PVFS_ATTR_DATA_SIZE)
650                {
651                    /* is the file stuffed? */
652                    if(!(attr->mask & PVFS_ATTR_META_UNSTUFFED))
653                    {
654                        /* we can compute the size without doing any more
655                         * getattr requests
656                         */
657                        gossip_debug(GOSSIP_GETATTR_DEBUG,
658                            "getattr_object_getattr_comp_fn: "
659                            "detected stuffed file.\n");
660                        return(0);
661                    }
662                    /* if caller asked for the size, then we need
663                     * to jump to the datafile_getattr state, which
664                     * will retrieve the datafile sizes for us.
665                     */
666                    return GETATTR_NEED_DATAFILE_SIZES;
667                }
668            }
669            return 0;
670        case PVFS_TYPE_DIRECTORY:
671        {
672            gossip_debug(GOSSIP_CLIENT_DEBUG,
673                "getattr comp_fn [%p] "
674                "dfile_count = %d "
675                "dist_name_len = %d "
676                "dist_params_len = %d\n",
677                attr,
678                attr->u.dir.hint.dfile_count,
679                attr->u.dir.hint.dist_name_len,
680                attr->u.dir.hint.dist_params_len);
681            return 0;
682        }
683        case PVFS_TYPE_SYMLINK:
684            return 0;
685        case PVFS_TYPE_DATAFILE:
686            return 0;
687        case PVFS_TYPE_DIRDATA:
688            return 0;
689        case PVFS_TYPE_INTERNAL:
690            return 0;
691        default:
692            gossip_err("error: getattr_object_getattr_comp_fn: "
693                       "handle refers to invalid object type\n");
694    }
695    return -PVFS_EINVAL;
696}
697
698
699static PINT_sm_action getattr_object_getattr_failure(
700        struct PINT_smcb *smcb, job_status_s *js_p)
701{
702#ifdef WIN32
703    struct PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
704#else
705    struct PINT_client_sm *sm_p __attribute__((unused)) = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
706#endif
707
708    gossip_debug(GOSSIP_CLIENT_DEBUG
709                ,"(%p) getattr state: getattr_object_getattr_failure\n"
710                ,sm_p);
711
712    if ((js_p->error_code != -PVFS_ENOENT) &&
713        (js_p->error_code != -PVFS_EINVAL))
714    {
715        PVFS_perror_gossip("getattr_object_getattr_failure ",
716                           js_p->error_code);
717    }
718
719    return SM_ACTION_COMPLETE;
720}
721
722/* NOTE:  This nested state machine allocates and stores the results in getattr.size_array.  So,
723 * if you call this state machine directly, do not allocate space prior to calling it to avoid a
724 * nasty memory leak.
725*/
726static PINT_sm_action getattr_datafile_getattr_setup_msgpairarray(
727        struct PINT_smcb *smcb, job_status_s *js_p)
728{
729    struct PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
730    PINT_sm_getattr_state *getattr = &(sm_p->getattr);
731    int ret = -PVFS_EINVAL;
732    int i = 0;
733    PVFS_object_attr *attr = NULL;
734    PINT_sm_msgpair_state *msg_p = NULL;
735    uint64_t mirror_retry = (sm_p->getattr.attr.mask & PVFS_ATTR_META_MIRROR_DFILES);
736    PVFS_handle *handles;
737
738    gossip_debug(GOSSIP_MIRROR_DEBUG,"Executing getattr_datafile_getattr_setup_msgpairarray...\n");
739    gossip_debug(GOSSIP_MIRROR_DEBUG,"%s: Are we mirroring? %s\n",__func__
740                                    ,(mirror_retry ? "YES" : "NO"));
741    gossip_debug(GOSSIP_MIRROR_DEBUG,"%s: attr.mask:0x%08x \tmirror_retry:0x%08x\n"
742                ,__func__
743                ,sm_p->getattr.attr.mask
744                ,(unsigned int)mirror_retry);
745
746    js_p->error_code = 0;
747
748    attr = &sm_p->getattr.attr;
749    assert(attr);
750
751    PINT_msgpair_init(&sm_p->msgarray_op);
752    msg_p = &sm_p->msgarray_op.msgpair;
753
754    /*initialize the size_array, which will hold the size of each datahandle*/
755    PINT_SM_DATAFILE_SIZE_ARRAY_INIT(&getattr->size_array,attr->u.meta.dfile_count);
756
757    /* initialize mir_ctx_array: one context for each handle in the file */
758    getattr->mir_ctx_array = malloc(attr->u.meta.dfile_count *
759                                    sizeof(*getattr->mir_ctx_array));
760    if (!getattr->mir_ctx_array)
761    {
762        gossip_lerr("Unable to allocate mirror context array.\n");
763        js_p->error_code = -PVFS_ENOMEM;
764        return SM_ACTION_COMPLETE;
765    }
766    memset(getattr->mir_ctx_array,0,attr->u.meta.dfile_count *
767                                   sizeof(*getattr->mir_ctx_array));
768    getattr->mir_ctx_count = attr->u.meta.dfile_count;
769
770    for (i=0; i<getattr->mir_ctx_count; i++)
771    {
772       getattr->mir_ctx_array[i].original_datahandle = attr->u.meta.dfile_array[i];
773       getattr->mir_ctx_array[i].original_server_nr = i;
774    }
775
776    /* allocate handle array and populate */
777    handles = malloc(sizeof(PVFS_handle) * attr->u.meta.dfile_count);
778    if (!handles)
779    {
780        gossip_lerr("Unable to allocation local handles array.\n");
781        js_p->error_code = -PVFS_ENOMEM;
782        return SM_ACTION_COMPLETE;
783    }
784    memset(handles,0,sizeof(PVFS_handle) * attr->u.meta.dfile_count);
785    memcpy(handles,attr->u.meta.dfile_array,attr->u.meta.dfile_count *
786                                            sizeof(PVFS_handle));
787
788    /*allocate index-to-server array and populate*/
789    getattr->index_to_server = malloc(sizeof(uint32_t)*attr->u.meta.dfile_count);
790    if (!getattr->index_to_server)
791    {
792        gossip_lerr("Unable to allocate index-to-server array.\n");
793        js_p->error_code = -PVFS_ENOMEM;
794        return SM_ACTION_COMPLETE;
795    }
796    memset(getattr->index_to_server,0,sizeof(uint32_t)*attr->u.meta.dfile_count);
797    for (i=0; i<attr->u.meta.dfile_count; i++)
798    {
799        getattr->index_to_server[i] = (uint32_t)i;
800    }
801
802//START_PROFILER(getattr_prof);
803    PINT_SERVREQ_TREE_GET_FILE_SIZE_FILL(
804            msg_p->req,
805            *sm_p->cred_p,
806            sm_p->getattr.object_ref.fs_id,
807            0,
808            attr->u.meta.dfile_count,
809            handles,
810            (mirror_retry ? 1 : 0),
811            sm_p->hints);
812
813    msg_p->fs_id = sm_p->getattr.object_ref.fs_id;
814    msg_p->handle = handles[0];
815    msg_p->comp_fn = getattr_datafile_getattr_comp_fn;
816
817    ret = PINT_cached_config_map_to_server(
818        &msg_p->svr_addr, msg_p->handle, msg_p->fs_id);
819    if (ret)
820    {
821        gossip_lerr("Unable to map server address for this handle(%llu) and "
822                    "filesystem(%d)\n"
823                   ,llu(msg_p->handle)
824                   ,msg_p->fs_id);
825        js_p->error_code = ret;
826        return SM_ACTION_COMPLETE;
827    }
828
829    /* set retry flag based on mirroring option...if mirroring, we will handle
830     * retries from this machine; if not, msgpairarray will handle retries.
831    */
832    if (mirror_retry)
833    {
834        msg_p->retry_flag = PVFS_MSGPAIR_NO_RETRY;
835    }
836    else
837    {
838        msg_p->retry_flag = PVFS_MSGPAIR_RETRY;
839    }
840
841
842    PINT_sm_push_frame(smcb, 0, &sm_p->msgarray_op);
843    return SM_ACTION_COMPLETE;
844}
845
846static int getattr_datafile_getattr_comp_fn(
847    void *v_p, struct PVFS_server_resp *resp_p, int index)
848{
849    PINT_smcb *smcb = v_p;
850    PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_MSGPAIR_PARENT_SM);
851    PINT_sm_msgpair_state *msg = &(sm_p->msgarray_op.msgarray[index]);
852    struct PVFS_servreq_tree_get_file_size *tree =
853                                              &(msg->req.u.tree_get_file_size);
854
855    PINT_sm_getattr_state *getattr = &(sm_p->getattr);
856    PINT_client_getattr_mirror_ctx *ctx = NULL;
857    uint32_t server_nr = 0;
858    int i = 0;
859
860    if (resp_p->status)
861    {   /* tree request had a problem */
862        return resp_p->status;
863    }
864
865    assert(resp_p->op == PVFS_SERV_TREE_GET_FILE_SIZE);
866
867
868    /* if we are mirroring, then we need to check the error code returned from
869     * each server. If an error is found, mirroring will try to get the size
870     * from a different server.  Below, we are marking which handles completed
871     * successfully, which tells mirroring NOT to retry them.
872    */
873    if ( getattr->attr.mask & PVFS_ATTR_META_MIRROR_DFILES) 
874    {
875       for (i=0; i<resp_p->u.tree_get_file_size.handle_count; i++)
876       {
877          if (resp_p->u.tree_get_file_size.error[i] != 0)
878          {   /* error retrieving size for this handle..we will retry it. */
879             continue;
880          }
881          server_nr = getattr->index_to_server[i];
882          sm_p->getattr.size_array[server_nr] = resp_p->u.tree_get_file_size.size[i];
883          ctx = &(getattr->mir_ctx_array[server_nr]);
884          ctx->msg_completed = 1;
885
886          /*For completed messages, update the size array with the file size
887           *just retrieved.
888          */
889          getattr->size_array[server_nr] = resp_p->u.tree_get_file_size.size[i];
890          gossip_debug(GOSSIP_GETATTR_DEBUG,"%s: size[%d]:%lld \thandle:%llu\n"
891                                         ,__func__
892                                         ,i
893                                         ,llu(getattr->size_array[i])
894                                         ,llu(tree->handle_array[i]));
895       }/*end for*/
896    }
897    else
898    {
899     /* if we are NOT mirroring and an error is found for an individual handle,
900      * then we must invalidate the size array and return an error code.
901     */
902       for (i=0; i<resp_p->u.tree_get_file_size.handle_count; i++)
903       {
904          gossip_debug(GOSSIP_GETATTR_DEBUG,"%s: error[%d]:%d"
905                                               "\tsize[%d]:%d\n"
906                                           ,__func__
907                                           ,i
908                                           ,(int)resp_p->u.tree_get_file_size.error[i]
909                                           ,i
910                                           ,(int)resp_p->u.tree_get_file_size.size[i]);
911
912          if (resp_p->u.tree_get_file_size.error[i] != 0)
913          {
914              gossip_debug(GOSSIP_GETATTR_DEBUG,"%s: error[%d] is %d\n"
915                                               ,__func__
916                                               ,i
917                                               ,resp_p->u.tree_get_file_size.error[i]);
918              memset(getattr->size_array,0,sizeof(*getattr->size_array));
919              return (resp_p->u.tree_get_file_size.error[i]);
920          }
921
922          getattr->size_array[i] = resp_p->u.tree_get_file_size.size[i];
923       }/*end for*/
924
925    }/*end if*/
926
927    return(0);
928}/*end getattr_datafile_getattr_comp_fn*/
929
930static PINT_sm_action getattr_datafile_getattr_retry(
931        struct PINT_smcb *smcb, job_status_s *js_p)
932{
933    struct PINT_client_sm   *sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
934    struct PVFS_object_attr *attr = &(sm_p->getattr.attr);
935    PVFS_metafile_attr *meta = &(attr->u.meta);
936    PINT_sm_getattr_state *getattr = &(sm_p->getattr);
937    PINT_client_getattr_mirror_ctx *ctx = NULL;
938    PINT_sm_msgarray_op *mop = &(sm_p->msgarray_op);
939    PINT_sm_msgpair_state *msg = &(mop->msgarray[0]);
940    char *enc_req_bytes = NULL;
941    struct PVFS_servreq_tree_get_file_size *tree =
942                                              &(msg->req.u.tree_get_file_size);
943    uint32_t retry_msg_count = 0;
944    uint32_t index = 0;
945    uint32_t copies = 0;
946    uint32_t server_nr = 0;
947    int i   = 0;
948    int j   = 0;
949    int k   = 0;
950    int ret = 0;
951    uint32_t *tmp_server_nr;
952    PVFS_handle *tmp_handles;
953
954    gossip_debug(GOSSIP_CLIENT_DEBUG,
955                 "(%p) getattr state: "
956                 "getattr_datafile_getattr_retry\n", sm_p);
957
958    /*We only need to retry if we have mirrors; otherwise, msgpairarray
959     *has already handled the retries.
960    */
961    if (!(attr->mask & PVFS_ATTR_META_MIRROR_DFILES)) 
962    { 
963       /*we are NOT mirroring.*/
964       return SM_ACTION_COMPLETE;
965    }
966   
967    /* How many handles need to be retried? */
968    for (i=0; i<tree->num_data_files; i++)
969    {
970        server_nr = getattr->index_to_server[i];
971        ctx = &(getattr->mir_ctx_array[server_nr]);
972        if (ctx->msg_completed == 0)
973            retry_msg_count++;
974    }
975
976    /* no retries needed */
977    if (retry_msg_count == 0)
978    {
979        return SM_ACTION_COMPLETE;
980    }
981
982    /* do we have any retries available? */
983    if (getattr->retry_count >= mop->params.retry_limit)
984    {
985        /* at this point, we have msgpairs that need to be retried, but we
986         * we have met our retry limit.  so, we must invalidate the size array,
987         * since we don't have all of the necessary sizes AND return an error.
988        */
989        memset(getattr->size_array,0,sizeof(*getattr->size_array) * getattr->size);
990        js_p->error_code = (js_p->error_code ? js_p->error_code : -PVFS_ETIME);
991        gossip_err("%s: Ran out of retries(%d)\n",__func__
992                                                 ,getattr->retry_count);
993        return SM_ACTION_COMPLETE;
994    }
995
996    /*allocate temporary index-to-server array*/
997    tmp_server_nr = malloc(sizeof(uint32_t) * retry_msg_count);
998    if (!tmp_server_nr)
999    {
1000        gossip_lerr("Unable to allocate temporary index-to-server array.\n");
1001        js_p->error_code = -PVFS_ENOMEM;
1002        return SM_ACTION_COMPLETE;
1003    }
1004    memset(tmp_server_nr,0,sizeof(uint32_t) * retry_msg_count);
1005
1006    /*allocate temporary handle array*/
1007    tmp_handles = malloc(sizeof(PVFS_handle) * retry_msg_count);
1008    if (!tmp_handles)
1009    {
1010        gossip_lerr("Unable to allocate temporary handle array.\n");
1011        js_p->error_code = -PVFS_ENOMEM;
1012        return SM_ACTION_COMPLETE;
1013    }
1014    memset(tmp_handles,0,sizeof(PVFS_handle) * retry_msg_count);
1015
1016    /* okay. let's setup new handles to retry.
1017    */
1018    for (i=0,j=0; i<tree->num_data_files; i++)
1019    {
1020        server_nr = getattr->index_to_server[i];
1021        ctx = &(getattr->mir_ctx_array[server_nr]);
1022     
1023        /* don't process completed messages */
1024        if (ctx->msg_completed)
1025           continue;
1026
1027        /* for incomplete messages, cleanup memory, if necessary */
1028        enc_req_bytes = (char *)&(msg->encoded_req);
1029        for (k=0; k<sizeof(msg->encoded_req); k++)
1030        {
1031           if (enc_req_bytes[k] != '\0')
1032           {
1033              PINT_encode_release(&(msg->encoded_req),PINT_ENCODE_REQ);
1034              break;
1035           }
1036        }/*end for*/
1037
1038        if (msg->encoded_resp_p)
1039        {
1040            BMI_memfree(msg->svr_addr
1041                       ,msg->encoded_resp_p
1042                       ,msg->max_resp_sz
1043                       ,BMI_RECV);
1044        }
1045       
1046        /* Should we use the original datahandle? */
1047        if (ctx->retry_original)
1048        {
1049            ctx->retry_original = 0;
1050            tmp_handles[j]   = ctx->original_datahandle;
1051            tmp_server_nr[j] = ctx->original_server_nr;
1052            j++;
1053            continue;
1054        }/*end retry_original*/
1055
1056        /* otherwise, get next mirrored handle.  note:  if a mirrored handle is
1057         * zero, then this means that the creation of this mirrored object
1058         * failed for its particular server.  in this case, get the next valid
1059         * handle.  as a last resort, retry the original handle.
1060        */
1061        copies = ctx->current_copies_count;
1062        for (;copies < meta->mirror_copies_count; copies++)
1063        {
1064            index = (copies*meta->dfile_count) + server_nr;
1065            if (meta->mirror_dfile_array[index] != 0)
1066            {  /* we have found a valid mirrored handle */
1067               tmp_handles[j]   = meta->mirror_dfile_array[index];
1068               tmp_server_nr[j] = server_nr;
1069               j++;
1070               break;
1071            }
1072        }
1073
1074        /* if we haven't found a valid mirrored handle, retry the original
1075         * datahandle.
1076        */
1077        if ( copies == meta->mirror_copies_count )
1078        {
1079           tmp_handles[j]   = ctx->original_datahandle;
1080           tmp_server_nr[j] = ctx->original_server_nr;
1081           j++;
1082           ctx->retry_original = 0;
1083           ctx->current_copies_count = 0;
1084           getattr->retry_count++;
1085           continue;
1086        }/*end if we have to use the original*/
1087
1088        /* otherwise, setup for the next retry event */
1089        ctx->current_copies_count++;
1090        if (ctx->current_copies_count == meta->mirror_copies_count)
1091        {
1092           ctx->current_copies_count = 0;
1093           ctx->retry_original = 1;
1094           getattr->retry_count++;
1095        }
1096    }/*end for each handle in the old request*/
1097
1098    /*replace values in old tree request*/
1099    free(tree->handle_array);
1100    tree->handle_array = tmp_handles;
1101    tree->num_data_files = retry_msg_count;
1102   
1103    /*replace values in old message request*/
1104    msg->handle = tmp_handles[0];
1105    msg->svr_addr=0;
1106    ret = PINT_cached_config_map_to_server(&msg->svr_addr
1107                                          ,msg->handle
1108                                          ,msg->fs_id);
1109    if (ret)
1110    {
1111        gossip_lerr("Unable to determine server address for handle(%llu) and "
1112                    "file system(%d).\n"
1113                   ,llu(msg->handle)
1114                   ,msg->fs_id);
1115        js_p->error_code = -PVFS_EINVAL;
1116        return SM_ACTION_COMPLETE;
1117    }
1118
1119    /*save index-to-server array*/
1120    free(getattr->index_to_server);
1121    getattr->index_to_server = tmp_server_nr;
1122
1123    /* Push the msgarray_op and jump to msgpairarray.sm */
1124    PINT_sm_push_frame(smcb,0,mop);
1125    js_p->error_code=GETATTR_IO_RETRY;
1126
1127    return SM_ACTION_COMPLETE;
1128} /*end datafile_getattr_retry*/
1129
1130
1131
1132
1133
1134
1135static PINT_sm_action getattr_datafile_getattr_cleanup(
1136        struct PINT_smcb *smcb, job_status_s *js_p)
1137{
1138    struct PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
1139//FINISH_PROFILER("getattr", getattr_prof, 1);
1140    PINT_sm_getattr_state *getattr = &(sm_p->getattr);
1141
1142    /* cleanup handle_array created for the one tree request */
1143    PINT_sm_msgpair_state *msg_p = &(sm_p->msgarray_op.msgarray[0]);
1144    if (msg_p->req.u.tree_get_file_size.handle_array)
1145       free(msg_p->req.u.tree_get_file_size.handle_array);
1146   
1147    /* cleanup tree request */
1148    PINT_msgpairarray_destroy(&sm_p->msgarray_op);
1149
1150    /* cleanup memory that may have been used for mirrored retries.*/
1151    if (getattr->mir_ctx_array)
1152       free(getattr->mir_ctx_array);
1153    if (getattr->index_to_server)
1154       free(getattr->index_to_server);
1155
1156
1157    return SM_ACTION_COMPLETE;
1158}
1159
1160static PINT_sm_action getattr_acache_insert(
1161        struct PINT_smcb *smcb, job_status_s *js_p)
1162{   
1163    struct PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
1164    PVFS_size* tmp_size = NULL;
1165
1166    if( sm_p->getattr.attr.objtype == PVFS_TYPE_METAFILE ||
1167        sm_p->getattr.attr.objtype == PVFS_TYPE_DIRECTORY ||
1168        sm_p->getattr.attr.objtype == PVFS_TYPE_SYMLINK )
1169    {
1170        /* see if we have a size value to cache */
1171        if (sm_p->getattr.attr.objtype == PVFS_TYPE_METAFILE &&
1172            sm_p->getattr.req_attrmask & PVFS_ATTR_DATA_SIZE)
1173        {                                           
1174            if(!(sm_p->getattr.attr.mask & PVFS_ATTR_META_UNSTUFFED))
1175            {
1176                /* stuffed file case */
1177                sm_p->getattr.size = sm_p->getattr.attr.u.meta.stuffed_size;
1178                tmp_size = &sm_p->getattr.size;
1179                gossip_debug(GOSSIP_GETATTR_DEBUG, "getattr_acache_insert "
1180                  "calculated stuffed logical size of %lld\n", lld(*tmp_size));
1181            }
1182            else
1183            {
1184                /* compute size as requested */
1185                assert(sm_p->getattr.attr.u.meta.dist);
1186                assert(sm_p->getattr.attr.u.meta.dist->methods &&
1187                       sm_p->getattr.attr.u.meta.dist->methods->logical_file_size);
1188
1189                sm_p->getattr.size =
1190                    (sm_p->getattr.attr.u.meta.dist->methods->logical_file_size)(
1191                    sm_p->getattr.attr.u.meta.dist->params,
1192                    sm_p->getattr.attr.u.meta.dfile_count,
1193                    sm_p->getattr.size_array);
1194
1195                tmp_size = &sm_p->getattr.size;
1196                gossip_debug(GOSSIP_GETATTR_DEBUG,"getattr_acache_insert calculated"
1197                                                  " unstuffed logical size of %lld\n"
1198                                                 , lld(*tmp_size));
1199            }
1200        }
1201
1202        PINT_acache_update(sm_p->getattr.object_ref,
1203            &sm_p->getattr.attr,
1204            tmp_size);
1205
1206        gossip_debug(GOSSIP_CLIENT_DEBUG, "trying to add object "
1207                     "reference to acache\n");
1208    }
1209
1210    return SM_ACTION_COMPLETE;
1211}
1212
1213static PINT_sm_action getattr_cleanup(
1214        struct PINT_smcb *smcb, job_status_s *js_p)
1215{
1216    struct PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
1217    PINT_sm_getattr_state *getattr = &(sm_p->getattr);
1218
1219    gossip_debug(GOSSIP_CLIENT_DEBUG,
1220                 "(%p) getattr state: getattr_cleanup\n", sm_p);
1221
1222    gossip_debug(GOSSIP_GETATTR_DEBUG
1223                ,"%s: js_p->error_code:%d \tgetattr->attr.mask:0x%08x\n"
1224                ,__func__
1225                ,js_p->error_code
1226                ,getattr->attr.mask);
1227
1228    sm_p->error_code = js_p->error_code;
1229
1230    /* cleanup size array; is only allocated if datafile sizes are retrieved */
1231    if (getattr->size_array)
1232       free(getattr->size_array);
1233
1234    /* cleanup getattr when an error occurs */
1235    if (js_p->error_code)
1236    {
1237      if (getattr->attr.mask & PVFS_ATTR_META_DFILES)
1238      {
1239         if (getattr->attr.u.meta.dfile_array)
1240            free(getattr->attr.u.meta.dfile_array);
1241      }
1242
1243      if (getattr->attr.mask & PVFS_ATTR_META_MIRROR_DFILES)
1244      {
1245         if (getattr->attr.u.meta.mirror_dfile_array)
1246            free(getattr->attr.u.meta.mirror_dfile_array);
1247      }
1248
1249      if (getattr->attr.mask & PVFS_ATTR_META_DIST)
1250      {
1251         PINT_dist_free(getattr->attr.u.meta.dist);
1252      }
1253    }/*end if error*/
1254
1255    return SM_ACTION_COMPLETE;
1256}
1257
1258static PINT_sm_action getattr_set_sys_response(
1259        struct PINT_smcb *smcb, job_status_s *js_p)
1260{
1261    struct PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
1262    PVFS_sysresp_getattr * sysresp = NULL;
1263    PVFS_object_attr *attr = NULL;
1264
1265    if(js_p->error_code != 0)
1266    {
1267        PINT_SET_OP_COMPLETE;
1268        return SM_ACTION_TERMINATE;
1269    }
1270
1271    attr = &sm_p->getattr.attr;
1272    assert(attr);
1273   
1274    /* If we get to this state action,
1275     * the getattr state machine was invoked, so
1276     * we can assume that one of the PVFS_[i]sys_getattr functions
1277     * was called, and the response field must be filled in for the
1278     * user.
1279     */
1280
1281    sysresp = sm_p->u.getattr.getattr_resp_p;
1282   
1283    /*
1284     * if we retrieved a symlink target, copy it for the caller; this
1285     * target path will be handed all the way back up to the caller via
1286     * the PVFS_sys_attr object.  The caller of PVFS_[i]sys_getattr
1287     * must free it.
1288     */
1289    if(attr->objtype == PVFS_TYPE_SYMLINK &&
1290       attr->mask & PVFS_ATTR_SYMLNK_TARGET)
1291    {
1292        assert(attr->u.sym.target_path_len > 0);
1293        assert(attr->u.sym.target_path);
1294
1295        sysresp->attr.link_target = strdup(attr->u.sym.target_path);
1296        if (!sysresp->attr.link_target)
1297        {
1298           js_p->error_code = -PVFS_ENOMEM;
1299           PINT_SET_OP_COMPLETE;
1300           return SM_ACTION_TERMINATE;
1301        }
1302    }
1303
1304    if(attr->objtype == PVFS_TYPE_METAFILE)
1305    {
1306       /* Copy if there are any special object specific flags */
1307       sysresp->attr.flags = attr->u.meta.hint.flags;
1308       /* special case for when users ask for dfile count */
1309       if (sm_p->getattr.req_attrmask & PVFS_ATTR_META_DFILES)
1310       {
1311           sysresp->attr.dfile_count = attr->u.meta.dfile_count;
1312       }
1313       if (sm_p->getattr.req_attrmask & PVFS_ATTR_META_MIRROR_DFILES)
1314       {
1315         sysresp->attr.mirror_copies_count = attr->u.meta.mirror_copies_count;
1316       }
1317    }
1318    if (attr->objtype == PVFS_TYPE_DIRECTORY)
1319    {
1320        gossip_debug(GOSSIP_CLIENT_DEBUG, "dfile_count: %d\n",
1321            attr->u.dir.hint.dfile_count);
1322        gossip_debug(GOSSIP_CLIENT_DEBUG, "dist_name_len = %d, "
1323            "dist_params_len = %d\n",
1324            attr->u.dir.hint.dist_name_len, attr->u.dir.hint.dist_params_len);
1325        sysresp->attr.dfile_count = attr->u.dir.hint.dfile_count;
1326        /*
1327         * If we retrieved any extended attributes for the directory
1328         * in question, the caller's responsibility to free it up
1329         */
1330        if (attr->u.dir.hint.dist_name_len > 0 &&
1331            (sm_p->getattr.req_attrmask & PVFS_ATTR_DIR_HINT))
1332        {
1333            sysresp->attr.dist_name = strdup(attr->u.dir.hint.dist_name);
1334            if (!sysresp->attr.dist_name)
1335            {
1336                js_p->error_code = -PVFS_ENOMEM;
1337                PINT_SET_OP_COMPLETE;
1338                return SM_ACTION_TERMINATE;
1339            }
1340            gossip_debug(GOSSIP_CLIENT_DEBUG, "dist_name_hint: %s\n"
1341                                            , sysresp->attr.dist_name);
1342        }
1343        if (attr->u.dir.hint.dist_params_len > 0 &&
1344            (sm_p->getattr.req_attrmask & PVFS_ATTR_DIR_HINT))
1345        {
1346            sysresp->attr.dist_params = strdup(attr->u.dir.hint.dist_params);
1347            if (!sysresp->attr.dist_params)
1348            {
1349                free(sysresp->attr.dist_name);
1350                sysresp->attr.dist_name = NULL;
1351                js_p->error_code = -PVFS_ENOMEM;
1352                PINT_SET_OP_COMPLETE;
1353                return SM_ACTION_TERMINATE;
1354            }
1355            gossip_debug(GOSSIP_CLIENT_DEBUG, "dist_name_params: %s\n"
1356                                            , sysresp->attr.dist_params);
1357        }
1358    }
1359
1360    /* copy outgoing sys_attr fields from returned object_attr */
1361    sysresp->attr.owner = attr->owner;
1362    sysresp->attr.group = attr->group;
1363    sysresp->attr.perms = attr->perms;
1364    sysresp->attr.atime = attr->atime;
1365    sysresp->attr.mtime = attr->mtime;
1366    sysresp->attr.ctime = attr->ctime;
1367    sysresp->attr.mask  = PVFS_util_object_to_sys_attr_mask(attr->mask);
1368    sysresp->attr.size  = 0;
1369    sysresp->attr.objtype = attr->objtype;
1370
1371    if (js_p->error_code == 0)
1372    {
1373        /* convert outgoing attribute mask based on what we got */
1374        sysresp->attr.mask = PVFS_util_object_to_sys_attr_mask(
1375            sm_p->getattr.attr.mask);
1376
1377       if (sm_p->getattr.req_attrmask & PVFS_ATTR_DATA_SIZE)
1378        {
1379            if( attr->objtype == PVFS_TYPE_DATAFILE )
1380            {
1381                sysresp->attr.size = attr->u.data.size;
1382            }
1383            else
1384            {
1385                sysresp->attr.size = sm_p->getattr.size;
1386            }
1387
1388            sysresp->attr.mask |= PVFS_ATTR_SYS_SIZE;
1389        }
1390
1391        if(attr->mask & PVFS_ATTR_META_DIST)
1392        {
1393            /* we have enough information to set a block size */
1394            sysresp->attr.blksize = attr->u.meta.dist->methods->get_blksize(
1395                attr->u.meta.dist->params);
1396            sysresp->attr.mask |= PVFS_ATTR_SYS_BLKSIZE;
1397        }
1398
1399        /* if this is a symlink, add the link target */
1400        if (sm_p->getattr.req_attrmask & PVFS_ATTR_SYMLNK_TARGET)
1401        {
1402            sysresp->attr.mask |= PVFS_ATTR_SYS_LNK_TARGET;
1403        }
1404
1405        if(sm_p->getattr.req_attrmask & PVFS_ATTR_DIR_DIRENT_COUNT)
1406        {
1407            sysresp->attr.dirent_count = attr->u.dir.dirent_count;
1408            sysresp->attr.mask |= PVFS_ATTR_SYS_DIRENT_COUNT;
1409        }
1410    }
1411    else
1412    {
1413        /* in case of failure, blank out response */
1414        memset(sm_p->u.getattr.getattr_resp_p,
1415               0, sizeof(PVFS_sysresp_getattr));
1416    }
1417
1418    PINT_SM_GETATTR_STATE_CLEAR(sm_p->getattr);
1419
1420    PINT_SET_OP_COMPLETE;
1421    return SM_ACTION_TERMINATE;
1422}
1423
1424/*
1425 * Local variables:
1426 *  mode: c
1427 *  c-indent-level: 4
1428 *  c-basic-offset: 4
1429 * End:
1430 *
1431 * vim: ft=c ts=8 sts=4 sw=4 expandtab
1432 */
Note: See TracBrowser for help on using the browser.