root/branches/windows-client/src/client/sysint/sys-getattr.sm @ 8638

Revision 8638, 46.9 KB (checked in by sampson, 2 years ago)

Testing client

Line 
1/*
2 * (C) 2003 Clemson University and The University of Chicago
3 *
4 * See COPYING in top-level directory.
5 */
6
7/** \file
8 *  \ingroup sysint
9 *
10 *  PVFS2 system interface routines for obtaining attributes of an object
11 *  (file or directory).
12 */
13#include <string.h>
14#include <assert.h>
15
16#include "client-state-machine.h"
17#include "pvfs2-debug.h"
18#include "job.h"
19#include "gossip.h"
20#include "str-utils.h"
21#include "pint-util.h"
22#include "pvfs2-util.h"
23#include "pint-cached-config.h"
24#include "PINT-reqproto-encode.h"
25#include "pvfs2-internal.h"
26#include "pvfs2-types-debug.h"
27
28/* pvfs2_client_getattr_sm
29 *
30 * The sm_p->msgpair structure is used to get the attributes of the
31 * object itself.  We convert the original attribute mask (in
32 * sm_p->u.getattr.attrmask) to ask for datafile and distribution info
33 * if the user asked for file size (PVFS_ATTR_SYS_SIZE).  This allows
34 * us to obtain this information (if the object turns out to be a
35 * metafile) so that we can later look up the datafile sizes and
36 * calculate the overall file size.
37 *
38 * The sm_p->msgpairarray is used to get datafile sizes, if it turns
39 * out that we need them.  This space will also need to be freed, if
40 * we grab these sizes.
41 */
42
43#ifdef WIN32
44static struct profiler getattr_prof;
45#else
46static struct profiler getattr_prof __attribute__((unused));
47#endif
48
49extern job_context_id pint_client_sm_context;
50
51enum
52{
53    GETATTR_ACACHE_MISS = 1,
54    GETATTR_NEED_DATAFILE_SIZES = 2,
55    GETATTR_IO_RETRY = 3
56};
57
58/* completion function prototypes */
59static int getattr_object_getattr_comp_fn(
60    void *v_p, struct PVFS_server_resp *resp_p, int index);
61static int getattr_datafile_getattr_comp_fn(
62    void *v_p, struct PVFS_server_resp *resp_p, int index);
63
64%%
65
nested machine pvfs2_client_datafile_getattr_sizes_sm
{
    state datafile_getattr_setup_msgpairarray
    {
        run getattr_datafile_getattr_setup_msgpairarray;
        success => datafile_getattr_xfer_msgpairarray;
        default => datafile_getattr_cleanup;
    }

    state datafile_getattr_xfer_msgpairarray
    {
        jump pvfs2_msgpairarray_sm;
        default => datafile_getattr_retry;
    }

    state datafile_getattr_retry
    {
        run getattr_datafile_getattr_retry;
        GETATTR_IO_RETRY => datafile_getattr_xfer_msgpairarray;
        default => datafile_getattr_cleanup;
    }

    state datafile_getattr_cleanup
    {
        run getattr_datafile_getattr_cleanup;
        default => return;
    }
}
94
95
nested machine pvfs2_client_getattr_sm
{
    state acache_lookup
    {
        run getattr_acache_lookup;
        GETATTR_ACACHE_MISS => object_getattr_setup_msgpair;
        GETATTR_NEED_DATAFILE_SIZES => datafile_get_sizes;
        default => cleanup;
    }

    state object_getattr_setup_msgpair
    {
        run getattr_object_getattr_setup_msgpair;
        success => object_getattr_xfer_msgpair;
        default => cleanup;
    }

    state object_getattr_xfer_msgpair
    {
        jump pvfs2_msgpairarray_sm;
        success => acache_insert;
        GETATTR_NEED_DATAFILE_SIZES => datafile_get_sizes;
        default => object_getattr_failure;
    }

    state acache_insert
    {
        run getattr_acache_insert;
        default => cleanup;
    }

    state object_getattr_failure
    {
        run getattr_object_getattr_failure;
        default => cleanup;
    }

    state datafile_get_sizes
    {
        jump pvfs2_client_datafile_getattr_sizes_sm;
        success => acache_insert;
        default => cleanup;
    }

    state cleanup
    {
        run getattr_cleanup;
        default => return;
    }
}
146
machine pvfs2_client_sysint_getattr_sm
{
    state dowork
    {
        jump pvfs2_client_getattr_sm;
        default => set_sys_response;
    }

    state set_sys_response
    {
        run getattr_set_sys_response;
        default => terminate;
    }
}
161   
162%%
163
164
165/** Initiate retrieval of object attributes.
166 */
167PVFS_error PVFS_isys_getattr(
168    PVFS_object_ref ref,
169    uint32_t attrmask,
170    const PVFS_credentials *credentials,
171    PVFS_sysresp_getattr *resp_p,
172    PVFS_sys_op_id *op_id,
173    PVFS_hint hints,
174    void *user_ptr)
175{
176    PVFS_error ret = -PVFS_EINVAL;
177    PINT_smcb *smcb = NULL;
178    PINT_client_sm *sm_p = NULL;
179
180    gossip_debug(GOSSIP_CLIENT_DEBUG, "PVFS_isys_getattr entered\n");
181
182    if ((ref.handle == PVFS_HANDLE_NULL) ||
183        (ref.fs_id == PVFS_FS_ID_NULL) || (resp_p == NULL))
184    {
185        gossip_err("invalid (NULL) required argument\n");
186        return ret;
187    }
188   
189    if (attrmask & ~(PVFS_ATTR_SYS_ALL))
190    {
191        gossip_err("invalid attrmask\n");
192        return ret;
193    }
194
195    PINT_smcb_alloc(&smcb, PVFS_SYS_GETATTR,
196            sizeof(struct PINT_client_sm),
197            client_op_state_get_machine,
198            client_state_machine_terminate,
199            pint_client_sm_context);
200    if (smcb == NULL)
201    {
202        return -PVFS_ENOMEM;
203    }
204    sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
205
206    PINT_init_msgarray_params(sm_p, ref.fs_id);
207    PINT_init_sysint_credentials(sm_p->cred_p, credentials);
208    sm_p->error_code = 0;
209    sm_p->object_ref = ref;
210    sm_p->u.getattr.getattr_resp_p = resp_p;
211    PVFS_hint_copy(hints, &sm_p->hints);
212    PVFS_hint_add(&sm_p->hints, PVFS_HINT_HANDLE_NAME, sizeof(PVFS_handle)
213                 ,&ref.handle);
214   
215    PINT_SM_GETATTR_STATE_FILL(
216        sm_p->getattr,
217        ref,
218        PVFS_util_sys_to_object_attr_mask(
219            attrmask),
220        PVFS_TYPE_NONE,
221        0);
222
223    return PINT_client_state_machine_post(
224        smcb,  op_id, user_ptr);
225}
226
227/** Retrieve object attributes.
228 */
229PVFS_error PVFS_sys_getattr(
230    PVFS_object_ref ref,
231    uint32_t attrmask,
232    const PVFS_credentials *credentials,
233    PVFS_sysresp_getattr *resp_p,
234    PVFS_hint hints)
235{
236    PVFS_error ret, error;
237    PVFS_sys_op_id op_id;
238
239    gossip_debug(GOSSIP_CLIENT_DEBUG, "PVFS_sys_getattr entered\n");
240
241    ret = PVFS_isys_getattr(ref, attrmask, credentials,
242                            resp_p, &op_id, hints, NULL);
243    if (ret)
244    {
245        PVFS_perror_gossip("PVFS_isys_getattr call", ret);
246        return ret;
247    }
248
249    if(op_id != -1)
250    {
251        /* did not complete immediately, so we wait */
252
253        ret = PVFS_sys_wait(op_id, "getattr", &error);
254        if (ret)
255        {
256            PVFS_perror_gossip("PVFS_sys_wait call", ret);
257        }
258        if(error)
259        {
260            ret = error;
261        }
262        PINT_sys_release(op_id);
263    }
264
265    return ret;
266}
267
268
269/**
270 * getattr_acache_lookup
271 * @ingroup client_sm_getattr
272 *
273 * This function is invoked as the first state action of the
274 * getattr-dowork state machine.  It performs a lookup into the
275 * attribute cache for the attribute in question, and returns
276 * result codes for a hit or a miss. 
277 *
278 * @param smcb This must be a valid client state machine handle, with
279 * the @ref getattr field containing valid values for the
280 * fsid/handle of the desired attribute (in object_ref), as well as the
281 * requested attribute mask (req_attrmask).
282 *
283 * @param js_p Contains the return code to be set by this function in the
284 * @ref error_code field.  This determines the next state action to jump
285 * to in the getattr-dowork state machine.  Possible values for js_p->error_code
286 * are:
287 * <ul>
288 * <li><b>GETATTR_ACACHE_MISS</b> - The requested attribute was not found
289 * in the attribute cache, or the attribute was found, but the attribute's
290 * mask did not include values required by the requested mask (req_attrmask).
291 * </li>
292 * <li><b>GETATTR_NEED_DATAFILE_SIZES</b> - The requested attribute was found
293 * in the attribute cache and the mask was sufficient, but the data file size
294 * was requested for this handle, so we need to get that next.
295 * </li>
296 * <li><b>default</b> - The requested attribute was found in the attribute
297 * cache and its mask values were sufficient.
298 * </li>
299 *
300 * @return This function should always return 1 unless an error occurred
301 * within the internals of the state machine.
302 */
303static PINT_sm_action getattr_acache_lookup(
304        struct PINT_smcb *smcb, job_status_s *js_p)
305{
306    struct PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
307    uint32_t trimmed_mask = 0;
308    int missing_attrs;
309    PVFS_object_ref object_ref;
310    int ret = -1;
311    int attr_status = -1;
312    int size_status = -1;
313
314    js_p->error_code = 0;
315
316    object_ref = sm_p->getattr.object_ref;
317
318    assert(object_ref.handle != PVFS_HANDLE_NULL);
319    assert(object_ref.fs_id != PVFS_FS_ID_NULL);
320
321    gossip_debug(GOSSIP_ACACHE_DEBUG, "%s: handle %llu fsid %d\n",
322      __func__, llu(object_ref.handle), object_ref.fs_id);
323
324   
325    /* The sys attr mask request is converted to object
326     * attr mask values for comparison with the cached
327     */
328    if(sm_p->getattr.req_attrmask & PVFS_ATTR_DATA_SIZE)
329    {
330        sm_p->getattr.req_attrmask |= PVFS_ATTR_META_ALL;
331    }
332
333    if(sm_p->getattr.flags & PINT_SM_GETATTR_BYPASS_CACHE)
334    {
335        gossip_debug(GOSSIP_ACACHE_DEBUG, "acache: forced acache miss: "
336                    " [%llu]\n",
337                      llu(object_ref.handle));
338        js_p->error_code = GETATTR_ACACHE_MISS;
339        return SM_ACTION_COMPLETE;
340    }
341
342    ret = PINT_acache_get_cached_entry(object_ref,
343        &sm_p->getattr.attr,
344        &attr_status,
345        &sm_p->getattr.size,
346        &size_status);
347    if(ret < 0 || attr_status < 0)
348    {
349        gossip_debug(GOSSIP_ACACHE_DEBUG, "acache: clean acache miss: "
350                    " [%llu]\n",
351                      llu(object_ref.handle));
352 
353        js_p->error_code = GETATTR_ACACHE_MISS;
354        return SM_ACTION_COMPLETE;
355    }
356
357    /* acache hit, check results */
358 
359    /* The sys attr mask request is converted to object
360     * attr mask values for comparison with the cached
361     * entry
362     */
363    trimmed_mask = sm_p->getattr.req_attrmask;
364
365    gossip_debug(GOSSIP_GETATTR_DEBUG,"request attrmask:\n");
366    PINT_attrmask_print(GOSSIP_GETATTR_DEBUG,sm_p->getattr.req_attrmask);
367
368    gossip_debug(GOSSIP_GETATTR_DEBUG,"trimmed attrmask:\n");
369    PINT_attrmask_print(GOSSIP_GETATTR_DEBUG,trimmed_mask);
370
371    gossip_debug(GOSSIP_GETATTR_DEBUG,"returned attrmask:\n");
372    PINT_attrmask_print(GOSSIP_GETATTR_DEBUG,sm_p->getattr.attr.mask);
373
374    /* the trimmed mask is used for making sure that we're only
375     * checking attr bits that make sense for the object type
376     * since the caller may have requested all attributes in
377     * the case where it doesn't know what type of object we're
378     * doing the getattr against.
379     */
380    if (sm_p->getattr.attr.objtype == PVFS_TYPE_METAFILE)
381    {
382        trimmed_mask &= (PVFS_ATTR_META_ALL |
383                         PVFS_ATTR_META_UNSTUFFED |
384                         PVFS_ATTR_DATA_SIZE |
385                         PVFS_ATTR_COMMON_ALL);
386    }
387    else if (sm_p->getattr.attr.objtype == PVFS_TYPE_SYMLINK)
388    {
389        trimmed_mask &= (PVFS_ATTR_SYMLNK_ALL | PVFS_ATTR_COMMON_ALL);
390    }
391    else if (sm_p->getattr.attr.objtype == PVFS_TYPE_DIRECTORY)
392    {
393        trimmed_mask &= (PVFS_ATTR_COMMON_ALL | PVFS_ATTR_DIR_ALL);
394    }
395 
396    /* trimmed_mask contains the list of attributes
397     * requested for a particular object,
398     * while sm_p->getattr.attr.mask contains
399     * the list of attributes cached for that object.
400     * The cached attributes can be used if requested
401     * is less than cached, i.e. all the attributes
402     * we need are already cached.  So we need to do
403     * a bitwise comparison of requested <= cached.
404     *
405     * xor of the two masks gives us the bits that are different,
406     * and-ing that result with the requested mask gives us the
407     * bits in the requested mask but not in the cached mask.
408     */
409    missing_attrs = ((trimmed_mask ^ sm_p->getattr.attr.mask) &
410                     trimmed_mask);
411
412    gossip_debug(GOSSIP_GETATTR_DEBUG,"missing attrmask BEFORE:\n");
413    PINT_attrmask_print(GOSSIP_GETATTR_DEBUG,missing_attrs);
414
415    if ( missing_attrs & PVFS_ATTR_META_MIRROR_DFILES )
416    {
417       /*Mirroring is optional, so remove mirror-dfiles*/
418         missing_attrs &= ~PVFS_ATTR_META_MIRROR_DFILES;   
419    }
420         
421    gossip_debug(GOSSIP_GETATTR_DEBUG,"missing attrmask AFTER:\n");
422    PINT_attrmask_print(GOSSIP_GETATTR_DEBUG,missing_attrs);
423
424    if((missing_attrs == PVFS_ATTR_DATA_SIZE && size_status == 0) ||
425        (missing_attrs == 0))
426    {
427        /* nothing's missing, a hit! */
428        gossip_debug(GOSSIP_ACACHE_DEBUG, "acache: acache hit "
429                     "[%llu]\n", llu(object_ref.handle));
430        js_p->error_code = 0;
431        return SM_ACTION_COMPLETE;
432    }
433 
434    /* check if the only thing missing is the file size, then
435     * we don't need to do the object getattr operation, we only
436     * need to do the datafile getattr operation, so we return
437     * the GETATTR_NEED_DATAFILE_SIZES error code which will make
438     * the getattr-dowork state machine jump to the datafile getattr
439     * operation state.
440     */
441    if(missing_attrs == PVFS_ATTR_DATA_SIZE)
442    {
443        if(!(sm_p->getattr.attr.mask & PVFS_ATTR_META_UNSTUFFED))
444        {
445            /* We are missing the size, and we don't know for sure if the
446             * file has been unstuffed.  In this case, act as though we
447             * missed on all atributes so that we can get fresh stuffed size
448             * or datafile information as needed.
449             */
450        }
451        else
452        {
453            /* if the file size is requested but the distribution info
454             * isn't and it hasn't been cached, then we need to
455             * get that first.
456             */
457
458            js_p->error_code = GETATTR_NEED_DATAFILE_SIZES;
459            gossip_debug(GOSSIP_ACACHE_DEBUG, "acache: acache hit, need sizes"
460                         "[%llu]\n", llu(object_ref.handle));
461            return SM_ACTION_COMPLETE;
462        }
463    }
464
465    /* we missed */
466    /* clean out the attributes we got from the cache; this will be
467     * overwritten when we request updated information from the server
468     */
469    PINT_free_object_attr(&sm_p->getattr.attr);
470    gossip_debug(GOSSIP_ACACHE_DEBUG, "acache: acache miss due to mask: "
471        " [%llu]\n",
472          llu(object_ref.handle));
473
474    js_p->error_code = GETATTR_ACACHE_MISS;
475    return SM_ACTION_COMPLETE;
476}
477
478
479static PINT_sm_action getattr_object_getattr_setup_msgpair(
480        struct PINT_smcb *smcb, job_status_s *js_p)
481{
482    struct PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
483    int ret = -PVFS_EINVAL;
484    PVFS_object_ref object_ref;
485    PINT_sm_msgpair_state *msg_p;
486
487    gossip_debug(GOSSIP_CLIENT_DEBUG, "(%p) %s\n", sm_p, __func__);
488
489    js_p->error_code = 0;
490
491    PINT_msgpair_init(&sm_p->msgarray_op);
492    msg_p = &sm_p->msgarray_op.msgpair;
493
494    object_ref = sm_p->getattr.object_ref;
495
496    assert(object_ref.fs_id != PVFS_FS_ID_NULL);
497    assert(object_ref.handle != PVFS_HANDLE_NULL);
498
499    /* setup the msgpair to do a getattr operation */
500    PINT_SERVREQ_GETATTR_FILL(
501        msg_p->req,
502        *sm_p->cred_p,
503        object_ref.fs_id,
504        object_ref.handle,
505        sm_p->getattr.req_attrmask,
506        sm_p->hints);
507   
508    msg_p->fs_id = object_ref.fs_id;
509    msg_p->handle = object_ref.handle;
510    msg_p->retry_flag = PVFS_MSGPAIR_RETRY;
511    msg_p->comp_fn = getattr_object_getattr_comp_fn;
512
513    ret = PINT_cached_config_map_to_server(
514        &msg_p->svr_addr, msg_p->handle,
515        msg_p->fs_id);
516    if (ret)
517    {
518        gossip_err("Failed to map meta server address\n");
519        js_p->error_code = ret;
520    }
521
522    /* TODO: attempting to free this frame causes heap fault.
523       Make a copy */
524    PINT_sm_push_frame(smcb, 0, &sm_p->msgarray_op);
525    /*
526    {
527        PINT_sm_msgarray_op *copy = malloc(sizeof(PINT_sm_msgarray_op));
528        memcpy(copy, &sm_p->msgarray_op, sizeof(PINT_sm_msgarray_op));
529        PINT_sm_push_frame(smcb, 0, copy);
530    }
531    */
532
533    return SM_ACTION_COMPLETE;
534}
535
536/*
537  copies data from getattr response into the user supplied sys_attr
538  structure.  returns 0 for directories and symlinks, and
539  GETATTR_NEED_DATAFILE_SIZES for a metafile (when appropriate)
540*/
541static int getattr_object_getattr_comp_fn(
542    void *v_p,
543    struct PVFS_server_resp *resp_p,
544    int index)
545{
546    PVFS_object_attr *attr = NULL;
547    PINT_smcb *smcb = v_p;
548    PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_MSGPAIR_PARENT_SM);
549
550    assert(resp_p->op == PVFS_SERV_GETATTR);
551
552    gossip_debug(GOSSIP_GETATTR_DEBUG,
553                 "getattr_object_getattr_comp_fn called\n");
554
555    if (resp_p->status != 0)
556    {
557        return resp_p->status;
558    }
559
560    /*
561     * If we've reached the callback for the getattr msgpair tranfer,
562     * then we can make a copy of the retrieved attribute for later
563     * caching.
564     */
565    PINT_copy_object_attr(&sm_p->getattr.attr,
566                          &resp_p->u.getattr.attr);
567
568    attr =  &sm_p->getattr.attr;
569
570    /* if the ref_type mask is set to a non-zero value (!PVFS_TYPE_NONE)
571     * a -PVFS_error will be triggered if the
572     * attributes received are not one of the the types specified.
573     * This is useful so that the client can know (in some cases) that it
574     * can avoid issuing an operation to the server since the server will
575     * just pass an error back anyway.
576     */
577    if(sm_p->getattr.ref_type &&
578       sm_p->getattr.ref_type != attr->objtype)
579    {
580        int ret;
581        gossip_debug(GOSSIP_CLIENT_DEBUG, "*** "
582                     "getattr_comp_fn: Object type mismatch.\n Possibly "
583                     "saving network roundtrip by returning an error\n");
584
585        if (sm_p->getattr.ref_type == PVFS_TYPE_DIRECTORY)
586        {
587            ret = -PVFS_ENOTDIR;
588        }
589        else
590        {
591            assert(sm_p->getattr.ref_type == PVFS_TYPE_METAFILE);
592            ret = ((attr->objtype == PVFS_TYPE_DIRECTORY) ?
593                   -PVFS_EISDIR : -PVFS_EBADF);
594        }
595        PVFS_perror_gossip("Object Type mismatch error", ret);
596        return ret;
597    }
598
599    /* do assertion checking of getattr response values, and
600     * check if file sizes are needed.  With NDEBUG defined, this block
601     * only checks if file sizes are needed.
602     */
603    switch (attr->objtype)
604    {
605        case PVFS_TYPE_METAFILE:
606            if (sm_p->msgarray_op.msgpair.req.u.getattr.attrmask &
607                PVFS_ATTR_META_DIST)
608            {
609                /* if we requested distribution attrs, did the distribution
610                 * get set and is the size valid?
611                 */
612                assert(attr->mask & PVFS_ATTR_META_DIST);
613                assert(attr->u.meta.dist && (attr->u.meta.dist_size > 0));
614            }
615            if (sm_p->msgarray_op.msgpair.req.u.getattr.attrmask &
616                PVFS_ATTR_META_MIRROR_DFILES)
617            {
618                if (attr->mask & PVFS_ATTR_META_MIRROR_DFILES)
619                {    assert(attr->u.meta.mirror_dfile_array &&
620                          (attr->u.meta.mirror_copies_count > 0));
621                     gossip_debug(GOSSIP_GETATTR_DEBUG,"%s: Mirror handles and "
622                                                       "copy count retrieved.\n"
623                                                     ,__func__);
624                }
625                else
626                {
627                   gossip_debug(GOSSIP_GETATTR_DEBUG,"%s: request attribute "
628                   "mask says to get the mirror dfiles, if they exist, \nbut "
629                   "the response attribute mask says they were not retrieved. "
630                   "This is okay.\n"
631                   ,__func__);
632                }
633            }
634            if (sm_p->msgarray_op.msgpair.req.u.getattr.attrmask &
635                PVFS_ATTR_META_DFILES)
636            {
637                /* if we requested the datafile handles for the file, did
638                 * the datafile array get populated?
639                 */
640                assert(attr->u.meta.dfile_array &&
641                       (attr->u.meta.dfile_count > 0));
642
643                gossip_debug(GOSSIP_GETATTR_DEBUG,
644                             "getattr_object_getattr_comp_fn: "
645                             "%d datafiles.\n", attr->u.meta.dfile_count);
646               
647                /* if we need the size, that should be the only time we're
648                 * going to have to do a full data file fetch.
649                 * (that's expensive)
650                 */
651                if (sm_p->getattr.req_attrmask & PVFS_ATTR_DATA_SIZE)
652                {
653                    /* is the file stuffed? */
654                    if(!(attr->mask & PVFS_ATTR_META_UNSTUFFED))
655                    {
656                        /* we can compute the size without doing any more
657                         * getattr requests
658                         */
659                        gossip_debug(GOSSIP_GETATTR_DEBUG,
660                            "getattr_object_getattr_comp_fn: "
661                            "detected stuffed file.\n");
662                        return(0);
663                    }
664                    /* if caller asked for the size, then we need
665                     * to jump to the datafile_getattr state, which
666                     * will retrieve the datafile sizes for us.
667                     */
668                    return GETATTR_NEED_DATAFILE_SIZES;
669                }
670            }
671            return 0;
672        case PVFS_TYPE_DIRECTORY:
673        {
674            gossip_debug(GOSSIP_CLIENT_DEBUG,
675                "getattr comp_fn [%p] "
676                "dfile_count = %d "
677                "dist_name_len = %d "
678                "dist_params_len = %d\n",
679                attr,
680                attr->u.dir.hint.dfile_count,
681                attr->u.dir.hint.dist_name_len,
682                attr->u.dir.hint.dist_params_len);
683            return 0;
684        }
685        case PVFS_TYPE_SYMLINK:
686            return 0;
687        case PVFS_TYPE_DATAFILE:
688            return 0;
689        case PVFS_TYPE_DIRDATA:
690            return 0;
691        case PVFS_TYPE_INTERNAL:
692            return 0;
693        default:
694            gossip_err("error: getattr_object_getattr_comp_fn: "
695                       "handle refers to invalid object type\n");
696    }
697    return -PVFS_EINVAL;
698}
699
700
701static PINT_sm_action getattr_object_getattr_failure(
702        struct PINT_smcb *smcb, job_status_s *js_p)
703{
704#ifdef WIN32
705    struct PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
706#else
707    struct PINT_client_sm *sm_p __attribute__((unused)) = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
708#endif
709
710    gossip_debug(GOSSIP_CLIENT_DEBUG
711                ,"(%p) getattr state: getattr_object_getattr_failure\n"
712                ,sm_p);
713
714    if ((js_p->error_code != -PVFS_ENOENT) &&
715        (js_p->error_code != -PVFS_EINVAL))
716    {
717        PVFS_perror_gossip("getattr_object_getattr_failure ",
718                           js_p->error_code);
719    }
720
721    return SM_ACTION_COMPLETE;
722}
723
724/* NOTE:  This nested state machine allocates and stores the results in getattr.size_array.  So,
725 * if you call this state machine directly, do not allocate space prior to calling it to avoid a
726 * nasty memory leak.
727*/
728static PINT_sm_action getattr_datafile_getattr_setup_msgpairarray(
729        struct PINT_smcb *smcb, job_status_s *js_p)
730{
731    struct PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
732    PINT_sm_getattr_state *getattr = &(sm_p->getattr);
733    int ret = -PVFS_EINVAL;
734    int i = 0;
735    PVFS_object_attr *attr = NULL;
736    PINT_sm_msgpair_state *msg_p = NULL;
737    uint64_t mirror_retry = (sm_p->getattr.attr.mask & PVFS_ATTR_META_MIRROR_DFILES);
738    PVFS_handle *handles;
739
740    gossip_debug(GOSSIP_MIRROR_DEBUG,"Executing getattr_datafile_getattr_setup_msgpairarray...\n");
741    gossip_debug(GOSSIP_MIRROR_DEBUG,"%s: Are we mirroring? %s\n",__func__
742                                    ,(mirror_retry ? "YES" : "NO"));
743    gossip_debug(GOSSIP_MIRROR_DEBUG,"%s: attr.mask:0x%08x \tmirror_retry:0x%08x\n"
744                ,__func__
745                ,sm_p->getattr.attr.mask
746                ,(unsigned int)mirror_retry);
747
748    js_p->error_code = 0;
749
750    attr = &sm_p->getattr.attr;
751    assert(attr);
752
753    PINT_msgpair_init(&sm_p->msgarray_op);
754    msg_p = &sm_p->msgarray_op.msgpair;
755
756    /*initialize the size_array, which will hold the size of each datahandle*/
757    PINT_SM_DATAFILE_SIZE_ARRAY_INIT(&getattr->size_array,attr->u.meta.dfile_count);
758
759    /* initialize mir_ctx_array: one context for each handle in the file */
760    getattr->mir_ctx_array = malloc(attr->u.meta.dfile_count *
761                                    sizeof(*getattr->mir_ctx_array));
762    if (!getattr->mir_ctx_array)
763    {
764        gossip_lerr("Unable to allocate mirror context array.\n");
765        js_p->error_code = -PVFS_ENOMEM;
766        return SM_ACTION_COMPLETE;
767    }
768    memset(getattr->mir_ctx_array,0,attr->u.meta.dfile_count *
769                                   sizeof(*getattr->mir_ctx_array));
770    getattr->mir_ctx_count = attr->u.meta.dfile_count;
771
772    for (i=0; i<getattr->mir_ctx_count; i++)
773    {
774       getattr->mir_ctx_array[i].original_datahandle = attr->u.meta.dfile_array[i];
775       getattr->mir_ctx_array[i].original_server_nr = i;
776    }
777
778    /* allocate handle array and populate */
779    handles = malloc(sizeof(PVFS_handle) * attr->u.meta.dfile_count);
780    if (!handles)
781    {
782        gossip_lerr("Unable to allocation local handles array.\n");
783        js_p->error_code = -PVFS_ENOMEM;
784        return SM_ACTION_COMPLETE;
785    }
786    memset(handles,0,sizeof(PVFS_handle) * attr->u.meta.dfile_count);
787    memcpy(handles,attr->u.meta.dfile_array,attr->u.meta.dfile_count *
788                                            sizeof(PVFS_handle));
789
790    /*allocate index-to-server array and populate*/
791    getattr->index_to_server = malloc(sizeof(uint32_t)*attr->u.meta.dfile_count);
792    if (!getattr->index_to_server)
793    {
794        gossip_lerr("Unable to allocate index-to-server array.\n");
795        js_p->error_code = -PVFS_ENOMEM;
796        return SM_ACTION_COMPLETE;
797    }
798    memset(getattr->index_to_server,0,sizeof(uint32_t)*attr->u.meta.dfile_count);
799    for (i=0; i<attr->u.meta.dfile_count; i++)
800    {
801        getattr->index_to_server[i] = (uint32_t)i;
802    }
803
804//START_PROFILER(getattr_prof);
805    PINT_SERVREQ_TREE_GET_FILE_SIZE_FILL(
806            msg_p->req,
807            *sm_p->cred_p,
808            sm_p->getattr.object_ref.fs_id,
809            0,
810            attr->u.meta.dfile_count,
811            handles,
812            (mirror_retry ? 1 : 0),
813            sm_p->hints);
814
815    msg_p->fs_id = sm_p->getattr.object_ref.fs_id;
816    msg_p->handle = handles[0];
817    msg_p->comp_fn = getattr_datafile_getattr_comp_fn;
818
819    ret = PINT_cached_config_map_to_server(
820        &msg_p->svr_addr, msg_p->handle, msg_p->fs_id);
821    if (ret)
822    {
823        gossip_lerr("Unable to map server address for this handle(%llu) and "
824                    "filesystem(%d)\n"
825                   ,llu(msg_p->handle)
826                   ,msg_p->fs_id);
827        js_p->error_code = ret;
828        return SM_ACTION_COMPLETE;
829    }
830
831    /* set retry flag based on mirroring option...if mirroring, we will handle
832     * retries from this machine; if not, msgpairarray will handle retries.
833    */
834    if (mirror_retry)
835    {
836        msg_p->retry_flag = PVFS_MSGPAIR_NO_RETRY;
837    }
838    else
839    {
840        msg_p->retry_flag = PVFS_MSGPAIR_RETRY;
841    }
842
843
844    PINT_sm_push_frame(smcb, 0, &sm_p->msgarray_op);
845    return SM_ACTION_COMPLETE;
846}
847
848static int getattr_datafile_getattr_comp_fn(
849    void *v_p, struct PVFS_server_resp *resp_p, int index)
850{
851    PINT_smcb *smcb = v_p;
852    PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_MSGPAIR_PARENT_SM);
853    PINT_sm_msgpair_state *msg = &(sm_p->msgarray_op.msgarray[index]);
854    struct PVFS_servreq_tree_get_file_size *tree =
855                                              &(msg->req.u.tree_get_file_size);
856
857    PINT_sm_getattr_state *getattr = &(sm_p->getattr);
858    PINT_client_getattr_mirror_ctx *ctx = NULL;
859    uint32_t server_nr = 0;
860    int i = 0;
861
862    if (resp_p->status)
863    {   /* tree request had a problem */
864        return resp_p->status;
865    }
866
867    assert(resp_p->op == PVFS_SERV_TREE_GET_FILE_SIZE);
868
869
870    /* if we are mirroring, then we need to check the error code returned from
871     * each server. If an error is found, mirroring will try to get the size
872     * from a different server.  Below, we are marking which handles completed
873     * successfully, which tells mirroring NOT to retry them.
874    */
875    if ( getattr->attr.mask & PVFS_ATTR_META_MIRROR_DFILES) 
876    {
877       for (i=0; i<resp_p->u.tree_get_file_size.handle_count; i++)
878       {
879          if (resp_p->u.tree_get_file_size.error[i] != 0)
880          {   /* error retrieving size for this handle..we will retry it. */
881             continue;
882          }
883          server_nr = getattr->index_to_server[i];
884          sm_p->getattr.size_array[server_nr] = resp_p->u.tree_get_file_size.size[i];
885          ctx = &(getattr->mir_ctx_array[server_nr]);
886          ctx->msg_completed = 1;
887
888          /*For completed messages, update the size array with the file size
889           *just retrieved.
890          */
891          getattr->size_array[server_nr] = resp_p->u.tree_get_file_size.size[i];
892          gossip_debug(GOSSIP_GETATTR_DEBUG,"%s: size[%d]:%lld \thandle:%llu\n"
893                                         ,__func__
894                                         ,i
895                                         ,llu(getattr->size_array[i])
896                                         ,llu(tree->handle_array[i]));
897       }/*end for*/
898    }
899    else
900    {
901     /* if we are NOT mirroring and an error is found for an individual handle,
902      * then we must invalidate the size array and return an error code.
903     */
904       for (i=0; i<resp_p->u.tree_get_file_size.handle_count; i++)
905       {
906          gossip_debug(GOSSIP_GETATTR_DEBUG,"%s: error[%d]:%d"
907                                               "\tsize[%d]:%d\n"
908                                           ,__func__
909                                           ,i
910                                           ,(int)resp_p->u.tree_get_file_size.error[i]
911                                           ,i
912                                           ,(int)resp_p->u.tree_get_file_size.size[i]);
913
914          if (resp_p->u.tree_get_file_size.error[i] != 0)
915          {
916              gossip_debug(GOSSIP_GETATTR_DEBUG,"%s: error[%d] is %d\n"
917                                               ,__func__
918                                               ,i
919                                               ,resp_p->u.tree_get_file_size.error[i]);
920              memset(getattr->size_array,0,sizeof(*getattr->size_array));
921              return (resp_p->u.tree_get_file_size.error[i]);
922          }
923
924          getattr->size_array[i] = resp_p->u.tree_get_file_size.size[i];
925       }/*end for*/
926
927    }/*end if*/
928
929    return(0);
930}/*end getattr_datafile_getattr_comp_fn*/
931
932static PINT_sm_action getattr_datafile_getattr_retry(
933        struct PINT_smcb *smcb, job_status_s *js_p)
934{
935    struct PINT_client_sm   *sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
936    struct PVFS_object_attr *attr = &(sm_p->getattr.attr);
937    PVFS_metafile_attr *meta = &(attr->u.meta);
938    PINT_sm_getattr_state *getattr = &(sm_p->getattr);
939    PINT_client_getattr_mirror_ctx *ctx = NULL;
940    PINT_sm_msgarray_op *mop = &(sm_p->msgarray_op);
941    PINT_sm_msgpair_state *msg = &(mop->msgarray[0]);
942    char *enc_req_bytes = NULL;
943    struct PVFS_servreq_tree_get_file_size *tree =
944                                              &(msg->req.u.tree_get_file_size);
945    uint32_t retry_msg_count = 0;
946    uint32_t index = 0;
947    uint32_t copies = 0;
948    uint32_t server_nr = 0;
949    int i   = 0;
950    int j   = 0;
951    int k   = 0;
952    int ret = 0;
953    uint32_t *tmp_server_nr;
954    PVFS_handle *tmp_handles;
955
956    gossip_debug(GOSSIP_CLIENT_DEBUG,
957                 "(%p) getattr state: "
958                 "getattr_datafile_getattr_retry\n", sm_p);
959
960    /*We only need to retry if we have mirrors; otherwise, msgpairarray
961     *has already handled the retries.
962    */
963    if (!(attr->mask & PVFS_ATTR_META_MIRROR_DFILES)) 
964    { 
965       /*we are NOT mirroring.*/
966       return SM_ACTION_COMPLETE;
967    }
968   
969    /* How many handles need to be retried? */
970    for (i=0; i<tree->num_data_files; i++)
971    {
972        server_nr = getattr->index_to_server[i];
973        ctx = &(getattr->mir_ctx_array[server_nr]);
974        if (ctx->msg_completed == 0)
975            retry_msg_count++;
976    }
977
978    /* no retries needed */
979    if (retry_msg_count == 0)
980    {
981        return SM_ACTION_COMPLETE;
982    }
983
984    /* do we have any retries available? */
985    if (getattr->retry_count >= mop->params.retry_limit)
986    {
987        /* at this point, we have msgpairs that need to be retried, but we
988         * we have met our retry limit.  so, we must invalidate the size array,
989         * since we don't have all of the necessary sizes AND return an error.
990        */
991        memset(getattr->size_array,0,sizeof(*getattr->size_array) * getattr->size);
992        js_p->error_code = (js_p->error_code ? js_p->error_code : -PVFS_ETIME);
993        gossip_err("%s: Ran out of retries(%d)\n",__func__
994                                                 ,getattr->retry_count);
995        return SM_ACTION_COMPLETE;
996    }
997
998    /*allocate temporary index-to-server array*/
999    tmp_server_nr = malloc(sizeof(uint32_t) * retry_msg_count);
1000    if (!tmp_server_nr)
1001    {
1002        gossip_lerr("Unable to allocate temporary index-to-server array.\n");
1003        js_p->error_code = -PVFS_ENOMEM;
1004        return SM_ACTION_COMPLETE;
1005    }
1006    memset(tmp_server_nr,0,sizeof(uint32_t) * retry_msg_count);
1007
1008    /*allocate temporary handle array*/
1009    tmp_handles = malloc(sizeof(PVFS_handle) * retry_msg_count);
1010    if (!tmp_handles)
1011    {
1012        gossip_lerr("Unable to allocate temporary handle array.\n");
1013        js_p->error_code = -PVFS_ENOMEM;
1014        return SM_ACTION_COMPLETE;
1015    }
1016    memset(tmp_handles,0,sizeof(PVFS_handle) * retry_msg_count);
1017
1018    /* okay. let's setup new handles to retry.
1019    */
1020    for (i=0,j=0; i<tree->num_data_files; i++)
1021    {
1022        server_nr = getattr->index_to_server[i];
1023        ctx = &(getattr->mir_ctx_array[server_nr]);
1024     
1025        /* don't process completed messages */
1026        if (ctx->msg_completed)
1027           continue;
1028
1029        /* for incomplete messages, cleanup memory, if necessary */
1030        enc_req_bytes = (char *)&(msg->encoded_req);
1031        for (k=0; k<sizeof(msg->encoded_req); k++)
1032        {
1033           if (enc_req_bytes[k] != '\0')
1034           {
1035              PINT_encode_release(&(msg->encoded_req),PINT_ENCODE_REQ);
1036              break;
1037           }
1038        }/*end for*/
1039
1040        if (msg->encoded_resp_p)
1041        {
1042            BMI_memfree(msg->svr_addr
1043                       ,msg->encoded_resp_p
1044                       ,msg->max_resp_sz
1045                       ,BMI_RECV);
1046        }
1047       
1048        /* Should we use the original datahandle? */
1049        if (ctx->retry_original)
1050        {
1051            ctx->retry_original = 0;
1052            tmp_handles[j]   = ctx->original_datahandle;
1053            tmp_server_nr[j] = ctx->original_server_nr;
1054            j++;
1055            continue;
1056        }/*end retry_original*/
1057
1058        /* otherwise, get next mirrored handle.  note:  if a mirrored handle is
1059         * zero, then this means that the creation of this mirrored object
1060         * failed for its particular server.  in this case, get the next valid
1061         * handle.  as a last resort, retry the original handle.
1062        */
1063        copies = ctx->current_copies_count;
1064        for (;copies < meta->mirror_copies_count; copies++)
1065        {
1066            index = (copies*meta->dfile_count) + server_nr;
1067            if (meta->mirror_dfile_array[index] != 0)
1068            {  /* we have found a valid mirrored handle */
1069               tmp_handles[j]   = meta->mirror_dfile_array[index];
1070               tmp_server_nr[j] = server_nr;
1071               j++;
1072               break;
1073            }
1074        }
1075
1076        /* if we haven't found a valid mirrored handle, retry the original
1077         * datahandle.
1078        */
1079        if ( copies == meta->mirror_copies_count )
1080        {
1081           tmp_handles[j]   = ctx->original_datahandle;
1082           tmp_server_nr[j] = ctx->original_server_nr;
1083           j++;
1084           ctx->retry_original = 0;
1085           ctx->current_copies_count = 0;
1086           getattr->retry_count++;
1087           continue;
1088        }/*end if we have to use the original*/
1089
1090        /* otherwise, setup for the next retry event */
1091        ctx->current_copies_count++;
1092        if (ctx->current_copies_count == meta->mirror_copies_count)
1093        {
1094           ctx->current_copies_count = 0;
1095           ctx->retry_original = 1;
1096           getattr->retry_count++;
1097        }
1098    }/*end for each handle in the old request*/
1099
1100    /*replace values in old tree request*/
1101    free(tree->handle_array);
1102    tree->handle_array = tmp_handles;
1103    tree->num_data_files = retry_msg_count;
1104   
1105    /*replace values in old message request*/
1106    msg->handle = tmp_handles[0];
1107    msg->svr_addr=0;
1108    ret = PINT_cached_config_map_to_server(&msg->svr_addr
1109                                          ,msg->handle
1110                                          ,msg->fs_id);
1111    if (ret)
1112    {
1113        gossip_lerr("Unable to determine server address for handle(%llu) and "
1114                    "file system(%d).\n"
1115                   ,llu(msg->handle)
1116                   ,msg->fs_id);
1117        js_p->error_code = -PVFS_EINVAL;
1118        return SM_ACTION_COMPLETE;
1119    }
1120
1121    /*save index-to-server array*/
1122    free(getattr->index_to_server);
1123    getattr->index_to_server = tmp_server_nr;
1124
1125    /* Push the msgarray_op and jump to msgpairarray.sm */
1126    PINT_sm_push_frame(smcb,0,mop);
1127    js_p->error_code=GETATTR_IO_RETRY;
1128
1129    return SM_ACTION_COMPLETE;
1130} /*end datafile_getattr_retry*/
1131
1132
1133
1134
1135
1136
1137static PINT_sm_action getattr_datafile_getattr_cleanup(
1138        struct PINT_smcb *smcb, job_status_s *js_p)
1139{
1140    struct PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
1141//FINISH_PROFILER("getattr", getattr_prof, 1);
1142    PINT_sm_getattr_state *getattr = &(sm_p->getattr);
1143
1144    /* cleanup handle_array created for the one tree request */
1145    PINT_sm_msgpair_state *msg_p = &(sm_p->msgarray_op.msgarray[0]);
1146    if (msg_p->req.u.tree_get_file_size.handle_array)
1147       free(msg_p->req.u.tree_get_file_size.handle_array);
1148   
1149    /* cleanup tree request */
1150    PINT_msgpairarray_destroy(&sm_p->msgarray_op);
1151
1152    /* cleanup memory that may have been used for mirrored retries.*/
1153    if (getattr->mir_ctx_array)
1154       free(getattr->mir_ctx_array);
1155    if (getattr->index_to_server)
1156       free(getattr->index_to_server);
1157
1158
1159    return SM_ACTION_COMPLETE;
1160}
1161
1162static PINT_sm_action getattr_acache_insert(
1163        struct PINT_smcb *smcb, job_status_s *js_p)
1164{   
1165    struct PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
1166    PVFS_size* tmp_size = NULL;
1167
1168    if( sm_p->getattr.attr.objtype == PVFS_TYPE_METAFILE ||
1169        sm_p->getattr.attr.objtype == PVFS_TYPE_DIRECTORY ||
1170        sm_p->getattr.attr.objtype == PVFS_TYPE_SYMLINK )
1171    {
1172        /* see if we have a size value to cache */
1173        if (sm_p->getattr.attr.objtype == PVFS_TYPE_METAFILE &&
1174            sm_p->getattr.req_attrmask & PVFS_ATTR_DATA_SIZE)
1175        {                                           
1176            if(!(sm_p->getattr.attr.mask & PVFS_ATTR_META_UNSTUFFED))
1177            {
1178                /* stuffed file case */
1179                sm_p->getattr.size = sm_p->getattr.attr.u.meta.stuffed_size;
1180                tmp_size = &sm_p->getattr.size;
1181                gossip_debug(GOSSIP_GETATTR_DEBUG, "getattr_acache_insert "
1182                  "calculated stuffed logical size of %lld\n", lld(*tmp_size));
1183            }
1184            else
1185            {
1186                /* compute size as requested */
1187                assert(sm_p->getattr.attr.u.meta.dist);
1188                assert(sm_p->getattr.attr.u.meta.dist->methods &&
1189                       sm_p->getattr.attr.u.meta.dist->methods->logical_file_size);
1190
1191                sm_p->getattr.size =
1192                    (sm_p->getattr.attr.u.meta.dist->methods->logical_file_size)(
1193                    sm_p->getattr.attr.u.meta.dist->params,
1194                    sm_p->getattr.attr.u.meta.dfile_count,
1195                    sm_p->getattr.size_array);
1196
1197                tmp_size = &sm_p->getattr.size;
1198                gossip_debug(GOSSIP_GETATTR_DEBUG,"getattr_acache_insert calculated"
1199                                                  " unstuffed logical size of %lld\n"
1200                                                 , lld(*tmp_size));
1201            }
1202        }
1203
1204        PINT_acache_update(sm_p->getattr.object_ref,
1205            &sm_p->getattr.attr,
1206            tmp_size);
1207
1208        gossip_debug(GOSSIP_CLIENT_DEBUG, "trying to add object "
1209                     "reference to acache\n");
1210    }
1211
1212    return SM_ACTION_COMPLETE;
1213}
1214
1215static PINT_sm_action getattr_cleanup(
1216        struct PINT_smcb *smcb, job_status_s *js_p)
1217{
1218    struct PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
1219    PINT_sm_getattr_state *getattr = &(sm_p->getattr);
1220
1221    gossip_debug(GOSSIP_CLIENT_DEBUG,
1222                 "(%p) getattr state: getattr_cleanup\n", sm_p);
1223
1224    gossip_debug(GOSSIP_GETATTR_DEBUG
1225                ,"%s: js_p->error_code:%d \tgetattr->attr.mask:0x%08x\n"
1226                ,__func__
1227                ,js_p->error_code
1228                ,getattr->attr.mask);
1229
1230    sm_p->error_code = js_p->error_code;
1231
1232    /* cleanup size array; is only allocated if datafile sizes are retrieved */
1233    if (getattr->size_array)
1234       free(getattr->size_array);
1235
1236    /* cleanup getattr when an error occurs */
1237    if (js_p->error_code)
1238    {
1239      if (getattr->attr.mask & PVFS_ATTR_META_DFILES)
1240      {
1241         if (getattr->attr.u.meta.dfile_array)
1242            free(getattr->attr.u.meta.dfile_array);
1243      }
1244
1245      if (getattr->attr.mask & PVFS_ATTR_META_MIRROR_DFILES)
1246      {
1247         if (getattr->attr.u.meta.mirror_dfile_array)
1248            free(getattr->attr.u.meta.mirror_dfile_array);
1249      }
1250
1251      if (getattr->attr.mask & PVFS_ATTR_META_DIST)
1252      {
1253         PINT_dist_free(getattr->attr.u.meta.dist);
1254      }
1255    }/*end if error*/
1256
1257    return SM_ACTION_COMPLETE;
1258}
1259
1260static PINT_sm_action getattr_set_sys_response(
1261        struct PINT_smcb *smcb, job_status_s *js_p)
1262{
1263    struct PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
1264    PVFS_sysresp_getattr * sysresp = NULL;
1265    PVFS_object_attr *attr = NULL;
1266
1267    if(js_p->error_code != 0)
1268    {
1269        PINT_SET_OP_COMPLETE;
1270        return SM_ACTION_TERMINATE;
1271    }
1272
1273    attr = &sm_p->getattr.attr;
1274    assert(attr);
1275   
1276    /* If we get to this state action,
1277     * the getattr state machine was invoked, so
1278     * we can assume that one of the PVFS_[i]sys_getattr functions
1279     * was called, and the response field must be filled in for the
1280     * user.
1281     */
1282
1283    sysresp = sm_p->u.getattr.getattr_resp_p;
1284   
1285    /*
1286     * if we retrieved a symlink target, copy it for the caller; this
1287     * target path will be handed all the way back up to the caller via
1288     * the PVFS_sys_attr object.  The caller of PVFS_[i]sys_getattr
1289     * must free it.
1290     */
1291    if(attr->objtype == PVFS_TYPE_SYMLINK &&
1292       attr->mask & PVFS_ATTR_SYMLNK_TARGET)
1293    {
1294        assert(attr->u.sym.target_path_len > 0);
1295        assert(attr->u.sym.target_path);
1296
1297        sysresp->attr.link_target = strdup(attr->u.sym.target_path);
1298        if (!sysresp->attr.link_target)
1299        {
1300           js_p->error_code = -PVFS_ENOMEM;
1301           PINT_SET_OP_COMPLETE;
1302           return SM_ACTION_TERMINATE;
1303        }
1304    }
1305
1306    if(attr->objtype == PVFS_TYPE_METAFILE)
1307    {
1308       /* Copy if there are any special object specific flags */
1309       sysresp->attr.flags = attr->u.meta.hint.flags;
1310       /* special case for when users ask for dfile count */
1311       if (sm_p->getattr.req_attrmask & PVFS_ATTR_META_DFILES)
1312       {
1313           sysresp->attr.dfile_count = attr->u.meta.dfile_count;
1314       }
1315       if (sm_p->getattr.req_attrmask & PVFS_ATTR_META_MIRROR_DFILES)
1316       {
1317         sysresp->attr.mirror_copies_count = attr->u.meta.mirror_copies_count;
1318       }
1319    }
1320    if (attr->objtype == PVFS_TYPE_DIRECTORY)
1321    {
1322        gossip_debug(GOSSIP_CLIENT_DEBUG, "dfile_count: %d\n",
1323            attr->u.dir.hint.dfile_count);
1324        gossip_debug(GOSSIP_CLIENT_DEBUG, "dist_name_len = %d, "
1325            "dist_params_len = %d\n",
1326            attr->u.dir.hint.dist_name_len, attr->u.dir.hint.dist_params_len);
1327        sysresp->attr.dfile_count = attr->u.dir.hint.dfile_count;
1328        /*
1329         * If we retrieved any extended attributes for the directory
1330         * in question, the caller's responsibility to free it up
1331         */
1332        if (attr->u.dir.hint.dist_name_len > 0 &&
1333            (sm_p->getattr.req_attrmask & PVFS_ATTR_DIR_HINT))
1334        {
1335            sysresp->attr.dist_name = strdup(attr->u.dir.hint.dist_name);
1336            if (!sysresp->attr.dist_name)
1337            {
1338                js_p->error_code = -PVFS_ENOMEM;
1339                PINT_SET_OP_COMPLETE;
1340                return SM_ACTION_TERMINATE;
1341            }
1342            gossip_debug(GOSSIP_CLIENT_DEBUG, "dist_name_hint: %s\n"
1343                                            , sysresp->attr.dist_name);
1344        }
1345        if (attr->u.dir.hint.dist_params_len > 0 &&
1346            (sm_p->getattr.req_attrmask & PVFS_ATTR_DIR_HINT))
1347        {
1348            sysresp->attr.dist_params = strdup(attr->u.dir.hint.dist_params);
1349            if (!sysresp->attr.dist_params)
1350            {
1351                free(sysresp->attr.dist_name);
1352                sysresp->attr.dist_name = NULL;
1353                js_p->error_code = -PVFS_ENOMEM;
1354                PINT_SET_OP_COMPLETE;
1355                return SM_ACTION_TERMINATE;
1356            }
1357            gossip_debug(GOSSIP_CLIENT_DEBUG, "dist_name_params: %s\n"
1358                                            , sysresp->attr.dist_params);
1359        }
1360    }
1361
1362    /* copy outgoing sys_attr fields from returned object_attr */
1363    sysresp->attr.owner = attr->owner;
1364    sysresp->attr.group = attr->group;
1365    sysresp->attr.perms = attr->perms;
1366    sysresp->attr.atime = attr->atime;
1367    sysresp->attr.mtime = attr->mtime;
1368    sysresp->attr.ctime = attr->ctime;
1369    sysresp->attr.mask  = PVFS_util_object_to_sys_attr_mask(attr->mask);
1370    sysresp->attr.size  = 0;
1371    sysresp->attr.objtype = attr->objtype;
1372
1373    if (js_p->error_code == 0)
1374    {
1375        /* convert outgoing attribute mask based on what we got */
1376        sysresp->attr.mask = PVFS_util_object_to_sys_attr_mask(
1377            sm_p->getattr.attr.mask);
1378
1379       if (sm_p->getattr.req_attrmask & PVFS_ATTR_DATA_SIZE)
1380        {
1381            if( attr->objtype == PVFS_TYPE_DATAFILE )
1382            {
1383                sysresp->attr.size = attr->u.data.size;
1384            }
1385            else
1386            {
1387                sysresp->attr.size = sm_p->getattr.size;
1388            }
1389
1390            sysresp->attr.mask |= PVFS_ATTR_SYS_SIZE;
1391        }
1392
1393        if(attr->mask & PVFS_ATTR_META_DIST)
1394        {
1395            /* we have enough information to set a block size */
1396            sysresp->attr.blksize = attr->u.meta.dist->methods->get_blksize(
1397                attr->u.meta.dist->params);
1398            sysresp->attr.mask |= PVFS_ATTR_SYS_BLKSIZE;
1399        }
1400
1401        /* if this is a symlink, add the link target */
1402        if (sm_p->getattr.req_attrmask & PVFS_ATTR_SYMLNK_TARGET)
1403        {
1404            sysresp->attr.mask |= PVFS_ATTR_SYS_LNK_TARGET;
1405        }
1406
1407        if(sm_p->getattr.req_attrmask & PVFS_ATTR_DIR_DIRENT_COUNT)
1408        {
1409            sysresp->attr.dirent_count = attr->u.dir.dirent_count;
1410            sysresp->attr.mask |= PVFS_ATTR_SYS_DIRENT_COUNT;
1411        }
1412    }
1413    else
1414    {
1415        /* in case of failure, blank out response */
1416        memset(sm_p->u.getattr.getattr_resp_p,
1417               0, sizeof(PVFS_sysresp_getattr));
1418    }
1419
1420    PINT_SM_GETATTR_STATE_CLEAR(sm_p->getattr);
1421
1422    PINT_SET_OP_COMPLETE;
1423    return SM_ACTION_TERMINATE;
1424}
1425
1426/*
1427 * Local variables:
1428 *  mode: c
1429 *  c-indent-level: 4
1430 *  c-basic-offset: 4
1431 * End:
1432 *
1433 * vim: ft=c ts=8 sts=4 sw=4 expandtab
1434 */
Note: See TracBrowser for help on using the browser.