root/branches/Orange-Elaine-Distr-Dir-Branch/src/client/sysint/sys-getattr.sm @ 8660

Revision 8660, 56.4 KB (checked in by shuangy, 2 years ago)

1. sys-getattr only contacts active dirdata servers. 2. Clean up mkdir. 3. Fix a memory leak in PINT_free_object_attr.

Line 
1/*
2 * (C) 2003 Clemson University and The University of Chicago
3 *
4 * See COPYING in top-level directory.
5 */
6
7/** \file
8 *  \ingroup sysint
9 *
10 *  PVFS2 system interface routines for obtaining attributes of an object
11 *  (file or directory).
12 */
13#include <string.h>
14#include <assert.h>
15
16#include "client-state-machine.h"
17#include "pvfs2-debug.h"
18#include "job.h"
19#include "gossip.h"
20#include "str-utils.h"
21#include "pint-util.h"
22#include "pvfs2-util.h"
23#include "pint-cached-config.h"
24#include "PINT-reqproto-encode.h"
25#include "pvfs2-internal.h"
26#include "pvfs2-types-debug.h"
27#include "dist-dir-utils.h"
28
29/* pvfs2_client_getattr_sm
30 *
31 * The sm_p->msgpair structure is used to get the attributes of the
32 * object itself.  We convert the original attribute mask (in
33 * sm_p->u.getattr.attrmask) to ask for datafile and distribution info
34 * if the user asked for file size (PVFS_ATTR_SYS_SIZE).  This allows
35 * us to obtain this information (if the object turns out to be a
36 * metafile) so that we can later look up the datafile sizes and
37 * calculate the overall file size.
38 *
39 * The sm_p->msgpairarray is used to get datafile sizes, if it turns
40 * out that we need them.  This space will also need to be freed, if
41 * we grab these sizes.
42 */
43
44static struct profiler getattr_prof __attribute__((unused));
45
46extern job_context_id pint_client_sm_context;
47
/* Positive (non-error) job status codes used to select transitions in the
 * getattr state machines declared below.
 */
enum
{
    /* attribute cache could not satisfy the request */
    GETATTR_ACACHE_MISS = 1,
    /* object attributes are known; datafile sizes must still be fetched */
    GETATTR_NEED_DATAFILE_SIZES = 2,
    /* re-run the datafile size transfer */
    GETATTR_IO_RETRY = 3,
    /* directory attributes must be completed from its dirdata objects */
    GETATTR_NEED_DIRDATA_ATTRS = 4,
};
55
/* Prototypes of the msgpair completion callbacks installed by the setup
 * state actions in this file (object, datafile and dirdata getattr).
 */
static int getattr_object_getattr_comp_fn(void *v_p,
                                          struct PVFS_server_resp *resp_p,
                                          int index);
static int getattr_datafile_getattr_comp_fn(void *v_p,
                                            struct PVFS_server_resp *resp_p,
                                            int index);
static int getattr_dirdata_getattr_comp_fn(void *v_p,
                                           struct PVFS_server_resp *resp_p,
                                           int index);
63
64%%
65
66nested machine pvfs2_client_datafile_getattr_sizes_sm
67{
68    state datafile_getattr_setup_msgpairarray
69    {
70        run getattr_datafile_getattr_setup_msgpairarray;
71        success => datafile_getattr_xfer_msgpairarray;
72        default => datafile_getattr_cleanup;
73    }
74   
75    state datafile_getattr_xfer_msgpairarray
76    {
77        jump pvfs2_msgpairarray_sm;
78        default => datafile_getattr_retry;
79    }
80
81    state datafile_getattr_retry
82    {
83        run getattr_datafile_getattr_retry;
84        GETATTR_IO_RETRY => datafile_getattr_xfer_msgpairarray;
85        default => datafile_getattr_cleanup;
86    }
87
88    state datafile_getattr_cleanup
89    {
90        run getattr_datafile_getattr_cleanup;
91        default => return;
92    }
93}
94
95
96nested machine pvfs2_client_dirdata_getattr_sm
97{
98    state dirdata_getattr_setup_msgpairarray
99    {
100        run getattr_dirdata_getattr_setup_msgpairarray;
101        success => dirdata_getattr_xfer_msgpairarray;
102        default => dirdata_getattr_cleanup;
103    }
104   
105    state dirdata_getattr_xfer_msgpairarray
106    {
107        jump pvfs2_msgpairarray_sm;
108        default => dirdata_getattr_cleanup;
109    }
110
111    state dirdata_getattr_cleanup
112    {
113        run getattr_dirdata_getattr_cleanup;
114        default => return;
115    }
116}
117
118
119nested machine pvfs2_client_getattr_sm
120{
121    state acache_lookup
122    {
123        run getattr_acache_lookup;
124        GETATTR_ACACHE_MISS => object_getattr_setup_msgpair;
125        GETATTR_NEED_DATAFILE_SIZES => datafile_get_sizes;
126        default => cleanup;
127    }
128
129    state object_getattr_setup_msgpair
130    {
131        run getattr_object_getattr_setup_msgpair;
132        success => object_getattr_xfer_msgpair;
133        default => cleanup;
134    }
135
136    state object_getattr_xfer_msgpair
137    {
138        jump pvfs2_msgpairarray_sm;
139        success => acache_insert;
140        GETATTR_NEED_DATAFILE_SIZES => datafile_get_sizes;
141        GETATTR_NEED_DIRDATA_ATTRS => dirdata_get_attrs;
142        default => object_getattr_failure;
143    }
144
145    state acache_insert
146    {
147        run getattr_acache_insert;
148        default => cleanup;
149    }
150
151    state object_getattr_failure
152    {
153        run getattr_object_getattr_failure;
154        default => cleanup;
155    }
156
157    state datafile_get_sizes
158    {
159        jump pvfs2_client_datafile_getattr_sizes_sm;
160        success => acache_insert;
161        default => cleanup;
162    }
163
164    state dirdata_get_attrs
165    {
166        jump pvfs2_client_dirdata_getattr_sm;
167        success => acache_insert;
168        default => cleanup;
169    }
170
171    state cleanup
172    {
173        run getattr_cleanup;
174        default => return;
175    }
176}
177
178machine pvfs2_client_sysint_getattr_sm
179{
180    state dowork
181    {
182        jump pvfs2_client_getattr_sm;
183        default => set_sys_response;
184    }
185
186    state set_sys_response
187    {
188        run getattr_set_sys_response;
189        default => terminate;
190    }
191}
192   
193%%
194
195
196/** Initiate retrieval of object attributes.
197 */
198PVFS_error PVFS_isys_getattr(
199    PVFS_object_ref ref,
200    uint32_t attrmask,
201    const PVFS_credentials *credentials,
202    PVFS_sysresp_getattr *resp_p,
203    PVFS_sys_op_id *op_id,
204    PVFS_hint hints,
205    void *user_ptr)
206{
207    PVFS_error ret = -PVFS_EINVAL;
208    PINT_smcb *smcb = NULL;
209    PINT_client_sm *sm_p = NULL;
210
211    gossip_debug(GOSSIP_CLIENT_DEBUG, "PVFS_isys_getattr entered\n");
212
213    if ((ref.handle == PVFS_HANDLE_NULL) ||
214        (ref.fs_id == PVFS_FS_ID_NULL) || (resp_p == NULL))
215    {
216        gossip_err("invalid (NULL) required argument\n");
217        return ret;
218    }
219   
220    if (attrmask & ~(PVFS_ATTR_SYS_ALL))
221    {
222        gossip_err("invalid attrmask\n");
223        return ret;
224    }
225
226    PINT_smcb_alloc(&smcb, PVFS_SYS_GETATTR,
227            sizeof(struct PINT_client_sm),
228            client_op_state_get_machine,
229            client_state_machine_terminate,
230            pint_client_sm_context);
231    if (smcb == NULL)
232    {
233        return -PVFS_ENOMEM;
234    }
235    sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
236
237    PINT_init_msgarray_params(sm_p, ref.fs_id);
238    PINT_init_sysint_credentials(sm_p->cred_p, credentials);
239    sm_p->error_code = 0;
240    sm_p->object_ref = ref;
241    sm_p->u.getattr.getattr_resp_p = resp_p;
242    PVFS_hint_copy(hints, &sm_p->hints);
243    PVFS_hint_add(&sm_p->hints, PVFS_HINT_HANDLE_NAME, sizeof(PVFS_handle)
244                 ,&ref.handle);
245   
246    PINT_SM_GETATTR_STATE_FILL(
247        sm_p->getattr,
248        ref,
249        PVFS_util_sys_to_object_attr_mask(
250            attrmask),
251        PVFS_TYPE_NONE,
252        0);
253
254    return PINT_client_state_machine_post(
255        smcb,  op_id, user_ptr);
256}
257
258/** Retrieve object attributes.
259 */
260PVFS_error PVFS_sys_getattr(
261    PVFS_object_ref ref,
262    uint32_t attrmask,
263    const PVFS_credentials *credentials,
264    PVFS_sysresp_getattr *resp_p,
265    PVFS_hint hints)
266{
267    PVFS_error ret, error;
268    PVFS_sys_op_id op_id;
269
270    gossip_debug(GOSSIP_CLIENT_DEBUG, "PVFS_sys_getattr entered\n");
271
272    ret = PVFS_isys_getattr(ref, attrmask, credentials,
273                            resp_p, &op_id, hints, NULL);
274    if (ret)
275    {
276        PVFS_perror_gossip("PVFS_isys_getattr call", ret);
277        return ret;
278    }
279
280    if(op_id != -1)
281    {
282        /* did not complete immediately, so we wait */
283
284        ret = PVFS_sys_wait(op_id, "getattr", &error);
285        if (ret)
286        {
287            PVFS_perror_gossip("PVFS_sys_wait call", ret);
288        }
289        if(error)
290        {
291            ret = error;
292        }
293        PINT_sys_release(op_id);
294    }
295
296    return ret;
297}
298
299
300/**
301 * getattr_acache_lookup
302 * @ingroup client_sm_getattr
303 *
304 * This function is invoked as the first state action of the
305 * getattr-dowork state machine.  It performs a lookup into the
306 * attribute cache for the attribute in question, and returns
307 * result codes for a hit or a miss. 
308 *
309 * @param smcb This must be a valid client state machine handle, with
310 * the @ref getattr field containing valid values for the
311 * fsid/handle of the desired attribute (in object_ref), as well as the
312 * requested attribute mask (req_attrmask).
313 *
314 * @param js_p Contains the return code to be set by this function in the
315 * @ref error_code field.  This determines the next state action to jump
316 * to in the getattr-dowork state machine.  Possible values for js_p->error_code
317 * are:
318 * <ul>
319 * <li><b>GETATTR_ACACHE_MISS</b> - The requested attribute was not found
320 * in the attribute cache, or the attribute was found, but the attribute's
321 * mask did not include values required by the requested mask (req_attrmask).
322 * </li>
323 * <li><b>GETATTR_NEED_DATAFILE_SIZES</b> - The requested attribute was found
324 * in the attribute cache and the mask was sufficient, but the data file size
325 * was requested for this handle, so we need to get that next.
326 * </li>
327 * <li><b>default</b> - The requested attribute was found in the attribute
328 * cache and its mask values were sufficient.
329 * </li>
330 *
331 * @return This function should always return 1 unless an error occurred
332 * within the internals of the state machine.
333 */
334static PINT_sm_action getattr_acache_lookup(
335        struct PINT_smcb *smcb, job_status_s *js_p)
336{
337    struct PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
338    uint32_t trimmed_mask = 0;
339    int missing_attrs;
340    PVFS_object_ref object_ref;
341    int ret = -1;
342    int attr_status = -1;
343    int size_status = -1;
344
345    js_p->error_code = 0;
346
347    object_ref = sm_p->getattr.object_ref;
348
349    assert(object_ref.handle != PVFS_HANDLE_NULL);
350    assert(object_ref.fs_id != PVFS_FS_ID_NULL);
351
352    gossip_debug(GOSSIP_ACACHE_DEBUG, "%s: handle %llu fsid %d\n",
353      __func__, llu(object_ref.handle), object_ref.fs_id);
354
355   
356    /* The sys attr mask request is converted to object
357     * attr mask values for comparison with the cached
358     */
359    if(sm_p->getattr.req_attrmask & PVFS_ATTR_DATA_SIZE)
360    {
361        sm_p->getattr.req_attrmask |= PVFS_ATTR_META_ALL;
362    }
363
364    /* if need dir timestamp or dirent_count, need dist_dir_struct */
365    if(sm_p->getattr.req_attrmask &
366            (PVFS_ATTR_DIR_DIRENT_COUNT |
367             PVFS_ATTR_COMMON_ATIME |
368             PVFS_ATTR_COMMON_CTIME |
369             PVFS_ATTR_COMMON_MTIME |
370             0 )
371      )
372    {
373        sm_p->getattr.req_attrmask |= PVFS_ATTR_DIR_DISTDIR_ATTR;
374    }
375
376    if(sm_p->getattr.flags & PINT_SM_GETATTR_BYPASS_CACHE)
377    {
378        gossip_debug(GOSSIP_ACACHE_DEBUG, "acache: forced acache miss: "
379                    " [%llu]\n",
380                      llu(object_ref.handle));
381        js_p->error_code = GETATTR_ACACHE_MISS;
382        return SM_ACTION_COMPLETE;
383    }
384
385    ret = PINT_acache_get_cached_entry(object_ref,
386        &sm_p->getattr.attr,
387        &attr_status,
388        &sm_p->getattr.size,
389        &size_status);
390    if(ret < 0 || attr_status < 0)
391    {
392        gossip_debug(GOSSIP_ACACHE_DEBUG, "acache: clean acache miss: "
393                    " [%llu]\n",
394                      llu(object_ref.handle));
395 
396        js_p->error_code = GETATTR_ACACHE_MISS;
397        return SM_ACTION_COMPLETE;
398    }
399
400    /* acache hit, check results */
401 
402    /* The sys attr mask request is converted to object
403     * attr mask values for comparison with the cached
404     * entry
405     */
406    trimmed_mask = sm_p->getattr.req_attrmask;
407
408    gossip_debug(GOSSIP_GETATTR_DEBUG,"request attrmask:\n");
409    PINT_attrmask_print(GOSSIP_GETATTR_DEBUG,sm_p->getattr.req_attrmask);
410
411    gossip_debug(GOSSIP_GETATTR_DEBUG,"trimmed attrmask:\n");
412    PINT_attrmask_print(GOSSIP_GETATTR_DEBUG,trimmed_mask);
413
414    gossip_debug(GOSSIP_GETATTR_DEBUG,"returned attrmask:\n");
415    PINT_attrmask_print(GOSSIP_GETATTR_DEBUG,sm_p->getattr.attr.mask);
416
417    /* the trimmed mask is used for making sure that we're only
418     * checking attr bits that make sense for the object type
419     * since the caller may have requested all attributes in
420     * the case where it doesn't know what type of object we're
421     * doing the getattr against.
422     */
423    if (sm_p->getattr.attr.objtype == PVFS_TYPE_METAFILE)
424    {
425        trimmed_mask &= (PVFS_ATTR_META_ALL |
426                         PVFS_ATTR_META_UNSTUFFED |
427                         PVFS_ATTR_DATA_SIZE |
428                         PVFS_ATTR_COMMON_ALL);
429    }
430    else if (sm_p->getattr.attr.objtype == PVFS_TYPE_SYMLINK)
431    {
432        trimmed_mask &= (PVFS_ATTR_SYMLNK_ALL | PVFS_ATTR_COMMON_ALL);
433    }
434    else if (sm_p->getattr.attr.objtype == PVFS_TYPE_DIRECTORY)
435    {
436        trimmed_mask &= (PVFS_ATTR_COMMON_ALL | PVFS_ATTR_DIR_ALL);
437    }
438 
439    /* trimmed_mask contains the list of attributes
440     * requested for a particular object,
441     * while sm_p->getattr.attr.mask contains
442     * the list of attributes cached for that object.
443     * The cached attributes can be used if requested
444     * is less than cached, i.e. all the attributes
445     * we need are already cached.  So we need to do
446     * a bitwise comparison of requested <= cached.
447     *
448     * xor of the two masks gives us the bits that are different,
449     * and-ing that result with the requested mask gives us the
450     * bits in the requested mask but not in the cached mask.
451     */
452    missing_attrs = ((trimmed_mask ^ sm_p->getattr.attr.mask) &
453                     trimmed_mask);
454
455    gossip_debug(GOSSIP_GETATTR_DEBUG,"missing attrmask BEFORE:\n");
456    PINT_attrmask_print(GOSSIP_GETATTR_DEBUG,missing_attrs);
457
458    if ( missing_attrs & PVFS_ATTR_META_MIRROR_DFILES )
459    {
460       /*Mirroring is optional, so remove mirror-dfiles*/
461         missing_attrs &= ~PVFS_ATTR_META_MIRROR_DFILES;   
462    }
463         
464    gossip_debug(GOSSIP_GETATTR_DEBUG,"missing attrmask AFTER:\n");
465    PINT_attrmask_print(GOSSIP_GETATTR_DEBUG,missing_attrs);
466
467    if((missing_attrs == PVFS_ATTR_DATA_SIZE && size_status == 0) ||
468        (missing_attrs == 0))
469    {
470        /* nothing's missing, a hit! */
471        gossip_debug(GOSSIP_ACACHE_DEBUG, "acache: acache hit "
472                     "[%llu]\n", llu(object_ref.handle));
473        js_p->error_code = 0;
474        return SM_ACTION_COMPLETE;
475    }
476 
477    /* check if the only thing missing is the file size, then
478     * we don't need to do the object getattr operation, we only
479     * need to do the datafile getattr operation, so we return
480     * the GETATTR_NEED_DATAFILE_SIZES error code which will make
481     * the getattr-dowork state machine jump to the datafile getattr
482     * operation state.
483     */
484    if(missing_attrs == PVFS_ATTR_DATA_SIZE)
485    {
486        if(!(sm_p->getattr.attr.mask & PVFS_ATTR_META_UNSTUFFED))
487        {
488            /* We are missing the size, and we don't know for sure if the
489             * file has been unstuffed.  In this case, act as though we
490             * missed on all atributes so that we can get fresh stuffed size
491             * or datafile information as needed.
492             */
493        }
494        else
495        {
496            /* if the file size is requested but the distribution info
497             * isn't and it hasn't been cached, then we need to
498             * get that first.
499             */
500
501            js_p->error_code = GETATTR_NEED_DATAFILE_SIZES;
502            gossip_debug(GOSSIP_ACACHE_DEBUG, "acache: acache hit, need sizes"
503                         "[%llu]\n", llu(object_ref.handle));
504            return SM_ACTION_COMPLETE;
505        }
506    }
507
508    /* we missed */
509    /* clean out the attributes we got from the cache; this will be
510     * overwritten when we request updated information from the server
511     */
512    PINT_free_object_attr(&sm_p->getattr.attr);
513    gossip_debug(GOSSIP_ACACHE_DEBUG, "acache: acache miss due to mask: "
514        " [%llu]\n",
515          llu(object_ref.handle));
516
517    js_p->error_code = GETATTR_ACACHE_MISS;
518    return SM_ACTION_COMPLETE;
519}
520
521
522static PINT_sm_action getattr_object_getattr_setup_msgpair(
523        struct PINT_smcb *smcb, job_status_s *js_p)
524{
525    struct PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
526    int ret = -PVFS_EINVAL;
527    PVFS_object_ref object_ref;
528    PINT_sm_msgpair_state *msg_p;
529
530    gossip_debug(GOSSIP_CLIENT_DEBUG, "(%p) %s\n", sm_p, __func__);
531
532    js_p->error_code = 0;
533
534    PINT_msgpair_init(&sm_p->msgarray_op);
535    msg_p = &sm_p->msgarray_op.msgpair;
536
537    object_ref = sm_p->getattr.object_ref;
538
539    assert(object_ref.fs_id != PVFS_FS_ID_NULL);
540    assert(object_ref.handle != PVFS_HANDLE_NULL);
541
542    /* setup the msgpair to do a getattr operation */
543    PINT_SERVREQ_GETATTR_FILL(
544        msg_p->req,
545        *sm_p->cred_p,
546        object_ref.fs_id,
547        object_ref.handle,
548        sm_p->getattr.req_attrmask,
549        sm_p->hints);
550   
551    msg_p->fs_id = object_ref.fs_id;
552    msg_p->handle = object_ref.handle;
553    msg_p->retry_flag = PVFS_MSGPAIR_RETRY;
554    msg_p->comp_fn = getattr_object_getattr_comp_fn;
555
556    ret = PINT_cached_config_map_to_server(
557        &msg_p->svr_addr, msg_p->handle,
558        msg_p->fs_id);
559    if (ret)
560    {
561        gossip_err("Failed to map meta server address\n");
562        js_p->error_code = ret;
563    }
564
565    PINT_sm_push_frame(smcb, 0, &sm_p->msgarray_op);
566    return SM_ACTION_COMPLETE;
567}
568
569/*
570  copies data from getattr response into the user supplied sys_attr
571  structure.  returns 0 for directories and symlinks, and
572  GETATTR_NEED_DATAFILE_SIZES for a metafile (when appropriate)
573*/
574static int getattr_object_getattr_comp_fn(
575    void *v_p,
576    struct PVFS_server_resp *resp_p,
577    int index)
578{
579    PVFS_object_attr *attr = NULL;
580    PINT_smcb *smcb = v_p;
581    PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_MSGPAIR_PARENT_SM);
582
583    assert(resp_p->op == PVFS_SERV_GETATTR);
584
585    gossip_debug(GOSSIP_GETATTR_DEBUG,
586                 "getattr_object_getattr_comp_fn called\n");
587
588    if (resp_p->status != 0)
589    {
590        return resp_p->status;
591    }
592
593    /*
594     * If we've reached the callback for the getattr msgpair tranfer,
595     * then we can make a copy of the retrieved attribute for later
596     * caching.
597     */
598    PINT_copy_object_attr(&sm_p->getattr.attr,
599                          &resp_p->u.getattr.attr);
600
601    attr =  &sm_p->getattr.attr;
602
603    /* if the ref_type mask is set to a non-zero value (!PVFS_TYPE_NONE)
604     * a -PVFS_error will be triggered if the
605     * attributes received are not one of the the types specified.
606     * This is useful so that the client can know (in some cases) that it
607     * can avoid issuing an operation to the server since the server will
608     * just pass an error back anyway.
609     */
610    if(sm_p->getattr.ref_type &&
611       sm_p->getattr.ref_type != attr->objtype)
612    {
613        int ret;
614        gossip_debug(GOSSIP_CLIENT_DEBUG, "*** "
615                     "getattr_comp_fn: Object type mismatch.\n Possibly "
616                     "saving network roundtrip by returning an error\n");
617
618        if (sm_p->getattr.ref_type == PVFS_TYPE_DIRECTORY)
619        {
620            ret = -PVFS_ENOTDIR;
621        }
622        else
623        {
624            assert(sm_p->getattr.ref_type == PVFS_TYPE_METAFILE);
625            ret = ((attr->objtype == PVFS_TYPE_DIRECTORY) ?
626                   -PVFS_EISDIR : -PVFS_EBADF);
627        }
628        PVFS_perror_gossip("Object Type mismatch error", ret);
629        return ret;
630    }
631
632    /* do assertion checking of getattr response values, and
633     * check if file sizes are needed.  With NDEBUG defined, this block
634     * only checks if file sizes are needed.
635     */
636    switch (attr->objtype)
637    {
638        case PVFS_TYPE_METAFILE:
639            if (sm_p->msgarray_op.msgpair.req.u.getattr.attrmask &
640                PVFS_ATTR_META_DIST)
641            {
642                /* if we requested distribution attrs, did the distribution
643                 * get set and is the size valid?
644                 */
645                assert(attr->mask & PVFS_ATTR_META_DIST);
646                assert(attr->u.meta.dist && (attr->u.meta.dist_size > 0));
647            }
648            if (sm_p->msgarray_op.msgpair.req.u.getattr.attrmask &
649                PVFS_ATTR_META_MIRROR_DFILES)
650            {
651                if (attr->mask & PVFS_ATTR_META_MIRROR_DFILES)
652                {    assert(attr->u.meta.mirror_dfile_array &&
653                          (attr->u.meta.mirror_copies_count > 0));
654                     gossip_debug(GOSSIP_GETATTR_DEBUG,"%s: Mirror handles and "
655                                                       "copy count retrieved.\n"
656                                                     ,__func__);
657                }
658                else
659                {
660                   gossip_debug(GOSSIP_GETATTR_DEBUG,"%s: request attribute "
661                   "mask says to get the mirror dfiles, if they exist, \nbut "
662                   "the response attribute mask says they were not retrieved. "
663                   "This is okay.\n"
664                   ,__func__);
665                }
666            }
667            if (sm_p->msgarray_op.msgpair.req.u.getattr.attrmask &
668                PVFS_ATTR_META_DFILES)
669            {
670                /* if we requested the datafile handles for the file, did
671                 * the datafile array get populated?
672                 */
673                assert(attr->u.meta.dfile_array &&
674                       (attr->u.meta.dfile_count > 0));
675
676                gossip_debug(GOSSIP_GETATTR_DEBUG,
677                             "getattr_object_getattr_comp_fn: "
678                             "%d datafiles.\n", attr->u.meta.dfile_count);
679               
680                /* if we need the size, that should be the only time we're
681                 * going to have to do a full data file fetch.
682                 * (that's expensive)
683                 */
684                if (sm_p->getattr.req_attrmask & PVFS_ATTR_DATA_SIZE)
685                {
686                    /* is the file stuffed? */
687                    if(!(attr->mask & PVFS_ATTR_META_UNSTUFFED))
688                    {
689                        /* we can compute the size without doing any more
690                         * getattr requests
691                         */
692                        gossip_debug(GOSSIP_GETATTR_DEBUG,
693                            "getattr_object_getattr_comp_fn: "
694                            "detected stuffed file.\n");
695                        return(0);
696                    }
697                    /* if caller asked for the size, then we need
698                     * to jump to the datafile_getattr state, which
699                     * will retrieve the datafile sizes for us.
700                     */
701                    return GETATTR_NEED_DATAFILE_SIZES;
702                }
703            }
704            return 0;
705        case PVFS_TYPE_DIRECTORY:
706        {
707            uint32_t dirent_file_count_i;
708            int tmp_i;
709            unsigned char *c;
710
711            /* dirent_count will be collected later */
712            gossip_debug(GOSSIP_CLIENT_DEBUG,
713                "getattr comp_fn [%p] "
714                "dfile_count = %d "
715                "dist_name_len = %d "
716                "dist_params_len = %d\n ",
717                attr,
718                attr->u.dir.hint.dfile_count,
719                attr->u.dir.hint.dist_name_len,
720                attr->u.dir.hint.dist_params_len);
721
722            if(attr->mask & PVFS_ATTR_DIR_DISTDIR_ATTR)
723            {
724
725                gossip_debug(GOSSIP_CLIENT_DEBUG,
726                        "sys-getattr: dist-dir-attr "
727                        "with tree_height=%d, num_servers=%d, bitmap_size=%d, "
728                        "split_size=%d, server_no=%d and branch_level=%d\n",
729                        attr->u.dir.dist_dir_attr.tree_height,
730                        attr->u.dir.dist_dir_attr.num_servers,
731                        attr->u.dir.dist_dir_attr.bitmap_size,
732                        attr->u.dir.dist_dir_attr.split_size,
733                        attr->u.dir.dist_dir_attr.server_no,
734                        attr->u.dir.dist_dir_attr.branch_level);
735
736                /* gossip bitmap */
737                gossip_debug(GOSSIP_CLIENT_DEBUG,
738                        "sys-getattr: dist_dir_bitmap as:\n");
739                for(tmp_i = attr->u.dir.dist_dir_attr.bitmap_size - 1;
740                        tmp_i >= 0 ; tmp_i--)
741                {
742                    c = (unsigned char *)(attr->u.dir.dist_dir_bitmap + tmp_i);
743                    gossip_debug(GOSSIP_MKDIR_DEBUG,
744                            " i=%d : %02x %02x %02x %02x\n",
745                            tmp_i, c[3], c[2], c[1], c[0]);
746                }
747                gossip_debug(GOSSIP_CLIENT_DEBUG, "\n");
748
749                /* gossip dirdata handles */
750                gossip_debug(GOSSIP_CLIENT_DEBUG,
751                        "sys-getattr: dirdata_handles as:\n");
752                for (dirent_file_count_i = 0;
753                        dirent_file_count_i <
754                        attr->u.dir.dist_dir_attr.num_servers;
755                        dirent_file_count_i++)
756                {
757                    gossip_debug(GOSSIP_CLIENT_DEBUG,
758                            "    dirdata_handle[%d] = %llu\n",
759                            dirent_file_count_i,
760                            llu(attr->u.dir.dirdata_handles[dirent_file_count_i]));
761                }
762            }
763
764            /* need to gather dirdata attr */
765            if(sm_p->getattr.req_attrmask &
766                    (PVFS_ATTR_DIR_DIRENT_COUNT |
767                     PVFS_ATTR_COMMON_ATIME |
768                     PVFS_ATTR_COMMON_CTIME |
769                     PVFS_ATTR_COMMON_MTIME |
770                     0 )
771              )
772            {
773                assert(attr->mask & PVFS_ATTR_DIR_DISTDIR_ATTR);
774
775                gossip_debug(GOSSIP_CLIENT_DEBUG,
776                        "  !!! need to contact dirdata to gather dirent_count and timestamps!\n");
777
778                return GETATTR_NEED_DIRDATA_ATTRS;
779
780            }
781
782            return 0;
783        }
784        case PVFS_TYPE_SYMLINK:
785            return 0;
786        case PVFS_TYPE_DATAFILE:
787            return 0;
788        case PVFS_TYPE_DIRDATA:
789            return 0;
790        case PVFS_TYPE_INTERNAL:
791            return 0;
792        default:
793            gossip_err("error: getattr_object_getattr_comp_fn: "
794                       "handle refers to invalid object type\n");
795    }
796    return -PVFS_EINVAL;
797}
798
799
800static PINT_sm_action getattr_object_getattr_failure(
801        struct PINT_smcb *smcb, job_status_s *js_p)
802{
803    struct PINT_client_sm *sm_p __attribute__((unused)) = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
804
805    gossip_debug(GOSSIP_CLIENT_DEBUG
806                ,"(%p) getattr state: getattr_object_getattr_failure\n"
807                ,sm_p);
808
809    if ((js_p->error_code != -PVFS_ENOENT) &&
810        (js_p->error_code != -PVFS_EINVAL))
811    {
812        PVFS_perror_gossip("getattr_object_getattr_failure ",
813                           js_p->error_code);
814    }
815
816    return SM_ACTION_COMPLETE;
817}
818
819/* NOTE:  This nested state machine allocates and stores the results in getattr.size_array.  So,
820 * if you call this state machine directly, do not allocate space prior to calling it to avoid a
821 * nasty memory leak.
822*/
823static PINT_sm_action getattr_datafile_getattr_setup_msgpairarray(
824        struct PINT_smcb *smcb, job_status_s *js_p)
825{
826    struct PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
827    PINT_sm_getattr_state *getattr = &(sm_p->getattr);
828    int ret = -PVFS_EINVAL;
829    int i = 0;
830    PVFS_object_attr *attr = NULL;
831    PINT_sm_msgpair_state *msg_p = NULL;
832    uint64_t mirror_retry = (sm_p->getattr.attr.mask & PVFS_ATTR_META_MIRROR_DFILES);
833
834    gossip_debug(GOSSIP_MIRROR_DEBUG,"Executing getattr_datafile_getattr_setup_msgpairarray...\n");
835    gossip_debug(GOSSIP_MIRROR_DEBUG,"%s: Are we mirroring? %s\n",__func__
836                                    ,(mirror_retry ? "YES" : "NO"));
837    gossip_debug(GOSSIP_MIRROR_DEBUG,"%s: attr.mask:0x%08x \tmirror_retry:0x%08x\n"
838                ,__func__
839                ,sm_p->getattr.attr.mask
840                ,(unsigned int)mirror_retry);
841
842    js_p->error_code = 0;
843
844    attr = &sm_p->getattr.attr;
845    assert(attr);
846
847    PINT_msgpair_init(&sm_p->msgarray_op);
848    msg_p = &sm_p->msgarray_op.msgpair;
849
850    /*initialize the size_array, which will hold the size of each datahandle*/
851    PINT_SM_DATAFILE_SIZE_ARRAY_INIT(&getattr->size_array,attr->u.meta.dfile_count);
852
853    /* initialize mir_ctx_array: one context for each handle in the file */
854    getattr->mir_ctx_array = malloc(attr->u.meta.dfile_count *
855                                    sizeof(*getattr->mir_ctx_array));
856    if (!getattr->mir_ctx_array)
857    {
858        gossip_lerr("Unable to allocate mirror context array.\n");
859        js_p->error_code = -PVFS_ENOMEM;
860        return SM_ACTION_COMPLETE;
861    }
862    memset(getattr->mir_ctx_array,0,attr->u.meta.dfile_count *
863                                   sizeof(*getattr->mir_ctx_array));
864    getattr->mir_ctx_count = attr->u.meta.dfile_count;
865
866    for (i=0; i<getattr->mir_ctx_count; i++)
867    {
868       getattr->mir_ctx_array[i].original_datahandle = attr->u.meta.dfile_array[i];
869       getattr->mir_ctx_array[i].original_server_nr = i;
870    }
871
872    /* allocate handle array and populate */
873    PVFS_handle *handles = malloc(sizeof(PVFS_handle) * attr->u.meta.dfile_count);
874    if (!handles)
875    {
876        gossip_lerr("Unable to allocation local handles array.\n");
877        js_p->error_code = -PVFS_ENOMEM;
878        return SM_ACTION_COMPLETE;
879    }
880    memset(handles,0,sizeof(PVFS_handle) * attr->u.meta.dfile_count);
881    memcpy(handles,attr->u.meta.dfile_array,attr->u.meta.dfile_count *
882                                            sizeof(PVFS_handle));
883
884    /*allocate index-to-server array and populate*/
885    getattr->index_to_server = malloc(sizeof(uint32_t)*attr->u.meta.dfile_count);
886    if (!getattr->index_to_server)
887    {
888        gossip_lerr("Unable to allocate index-to-server array.\n");
889        js_p->error_code = -PVFS_ENOMEM;
890        return SM_ACTION_COMPLETE;
891    }
892    memset(getattr->index_to_server,0,sizeof(uint32_t)*attr->u.meta.dfile_count);
893    for (i=0; i<attr->u.meta.dfile_count; i++)
894    {
895        getattr->index_to_server[i] = (uint32_t)i;
896    }
897
898//START_PROFILER(getattr_prof);
899    PINT_SERVREQ_TREE_GET_FILE_SIZE_FILL(
900            msg_p->req,
901            *sm_p->cred_p,
902            sm_p->getattr.object_ref.fs_id,
903            0,
904            attr->u.meta.dfile_count,
905            handles,
906            (mirror_retry ? 1 : 0),
907            sm_p->hints);
908
909    msg_p->fs_id = sm_p->getattr.object_ref.fs_id;
910    msg_p->handle = handles[0];
911    msg_p->comp_fn = getattr_datafile_getattr_comp_fn;
912
913    ret = PINT_cached_config_map_to_server(
914        &msg_p->svr_addr, msg_p->handle, msg_p->fs_id);
915    if (ret)
916    {
917        gossip_lerr("Unable to map server address for this handle(%llu) and "
918                    "filesystem(%d)\n"
919                   ,llu(msg_p->handle)
920                   ,msg_p->fs_id);
921        js_p->error_code = ret;
922        return SM_ACTION_COMPLETE;
923    }
924
925    /* set retry flag based on mirroring option...if mirroring, we will handle
926     * retries from this machine; if not, msgpairarray will handle retries.
927    */
928    if (mirror_retry)
929    {
930        msg_p->retry_flag = PVFS_MSGPAIR_NO_RETRY;
931    }
932    else
933    {
934        msg_p->retry_flag = PVFS_MSGPAIR_RETRY;
935    }
936
937
938    PINT_sm_push_frame(smcb, 0, &sm_p->msgarray_op);
939    return SM_ACTION_COMPLETE;
940}
941
942static int getattr_datafile_getattr_comp_fn(
943    void *v_p, struct PVFS_server_resp *resp_p, int index)
944{
945    PINT_smcb *smcb = v_p;
946    PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_MSGPAIR_PARENT_SM);
947    PINT_sm_msgpair_state *msg = &(sm_p->msgarray_op.msgarray[index]);
948    struct PVFS_servreq_tree_get_file_size *tree =
949                                              &(msg->req.u.tree_get_file_size);
950
951    PINT_sm_getattr_state *getattr = &(sm_p->getattr);
952    PINT_client_getattr_mirror_ctx *ctx = NULL;
953    uint32_t server_nr = 0;
954    int i = 0;
955
956    if (resp_p->status)
957    {   /* tree request had a problem */
958        return resp_p->status;
959    }
960
961    assert(resp_p->op == PVFS_SERV_TREE_GET_FILE_SIZE);
962
963
964    /* if we are mirroring, then we need to check the error code returned from
965     * each server. If an error is found, mirroring will try to get the size
966     * from a different server.  Below, we are marking which handles completed
967     * successfully, which tells mirroring NOT to retry them.
968    */
969    if ( getattr->attr.mask & PVFS_ATTR_META_MIRROR_DFILES) 
970    {
971       for (i=0; i<resp_p->u.tree_get_file_size.handle_count; i++)
972       {
973          if (resp_p->u.tree_get_file_size.error[i] != 0)
974          {   /* error retrieving size for this handle..we will retry it. */
975             continue;
976          }
977          server_nr = getattr->index_to_server[i];
978          sm_p->getattr.size_array[server_nr] = resp_p->u.tree_get_file_size.size[i];
979          ctx = &(getattr->mir_ctx_array[server_nr]);
980          ctx->msg_completed = 1;
981
982          /*For completed messages, update the size array with the file size
983           *just retrieved.
984          */
985          getattr->size_array[server_nr] = resp_p->u.tree_get_file_size.size[i];
986          gossip_debug(GOSSIP_GETATTR_DEBUG,"%s: size[%d]:%lld \thandle:%llu\n"
987                                         ,__func__
988                                         ,i
989                                         ,llu(getattr->size_array[i])
990                                         ,llu(tree->handle_array[i]));
991       }/*end for*/
992    }
993    else
994    {
995     /* if we are NOT mirroring and an error is found for an individual handle,
996      * then we must invalidate the size array and return an error code.
997     */
998       for (i=0; i<resp_p->u.tree_get_file_size.handle_count; i++)
999       {
1000          gossip_debug(GOSSIP_GETATTR_DEBUG,"%s: error[%d]:%d"
1001                                               "\tsize[%d]:%d\n"
1002                                           ,__func__
1003                                           ,i
1004                                           ,(int)resp_p->u.tree_get_file_size.error[i]
1005                                           ,i
1006                                           ,(int)resp_p->u.tree_get_file_size.size[i]);
1007
1008          if (resp_p->u.tree_get_file_size.error[i] != 0)
1009          {
1010              gossip_debug(GOSSIP_GETATTR_DEBUG,"%s: error[%d] is %d\n"
1011                                               ,__func__
1012                                               ,i
1013                                               ,resp_p->u.tree_get_file_size.error[i]);
1014              memset(getattr->size_array,0,sizeof(*getattr->size_array));
1015              return (resp_p->u.tree_get_file_size.error[i]);
1016          }
1017
1018          getattr->size_array[i] = resp_p->u.tree_get_file_size.size[i];
1019       }/*end for*/
1020
1021    }/*end if*/
1022
1023    return(0);
1024}/*end getattr_datafile_getattr_comp_fn*/
1025
1026static PINT_sm_action getattr_datafile_getattr_retry(
1027        struct PINT_smcb *smcb, job_status_s *js_p)
1028{
1029    struct PINT_client_sm   *sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
1030    struct PVFS_object_attr *attr = &(sm_p->getattr.attr);
1031    PVFS_metafile_attr *meta = &(attr->u.meta);
1032    PINT_sm_getattr_state *getattr = &(sm_p->getattr);
1033    PINT_client_getattr_mirror_ctx *ctx = NULL;
1034    PINT_sm_msgarray_op *mop = &(sm_p->msgarray_op);
1035    PINT_sm_msgpair_state *msg = &(mop->msgarray[0]);
1036    char *enc_req_bytes = NULL;
1037    struct PVFS_servreq_tree_get_file_size *tree =
1038                                              &(msg->req.u.tree_get_file_size);
1039    uint32_t retry_msg_count = 0;
1040    uint32_t index = 0;
1041    uint32_t copies = 0;
1042    uint32_t server_nr = 0;
1043    int i   = 0;
1044    int j   = 0;
1045    int k   = 0;
1046    int ret = 0;
1047
1048    gossip_debug(GOSSIP_CLIENT_DEBUG,
1049                 "(%p) getattr state: "
1050                 "getattr_datafile_getattr_retry\n", sm_p);
1051
1052    /*We only need to retry if we have mirrors; otherwise, msgpairarray
1053     *has already handled the retries.
1054    */
1055    if (!(attr->mask & PVFS_ATTR_META_MIRROR_DFILES)) 
1056    { 
1057       /*we are NOT mirroring.*/
1058       return SM_ACTION_COMPLETE;
1059    }
1060   
1061    /* How many handles need to be retried? */
1062    for (i=0; i<tree->num_data_files; i++)
1063    {
1064        server_nr = getattr->index_to_server[i];
1065        ctx = &(getattr->mir_ctx_array[server_nr]);
1066        if (ctx->msg_completed == 0)
1067            retry_msg_count++;
1068    }
1069
1070    /* no retries needed */
1071    if (retry_msg_count == 0)
1072    {
1073        return SM_ACTION_COMPLETE;
1074    }
1075
1076    /* do we have any retries available? */
1077    if (getattr->retry_count >= mop->params.retry_limit)
1078    {
1079        /* at this point, we have msgpairs that need to be retried, but we
1080         * we have met our retry limit.  so, we must invalidate the size array,
1081         * since we don't have all of the necessary sizes AND return an error.
1082        */
1083        memset(getattr->size_array,0,sizeof(*getattr->size_array) * getattr->size);
1084        js_p->error_code = (js_p->error_code ? js_p->error_code : -PVFS_ETIME);
1085        gossip_err("%s: Ran out of retries(%d)\n",__func__
1086                                                 ,getattr->retry_count);
1087        return SM_ACTION_COMPLETE;
1088    }
1089
1090    /*allocate temporary index-to-server array*/
1091    uint32_t *tmp_server_nr = malloc(sizeof(uint32_t) * retry_msg_count);
1092    if (!tmp_server_nr)
1093    {
1094        gossip_lerr("Unable to allocate temporary index-to-server array.\n");
1095        js_p->error_code = -PVFS_ENOMEM;
1096        return SM_ACTION_COMPLETE;
1097    }
1098    memset(tmp_server_nr,0,sizeof(uint32_t) * retry_msg_count);
1099
1100    /*allocate temporary handle array*/
1101    PVFS_handle *tmp_handles = malloc(sizeof(PVFS_handle) * retry_msg_count);
1102    if (!tmp_handles)
1103    {
1104        gossip_lerr("Unable to allocate temporary handle array.\n");
1105        js_p->error_code = -PVFS_ENOMEM;
1106        return SM_ACTION_COMPLETE;
1107    }
1108    memset(tmp_handles,0,sizeof(PVFS_handle) * retry_msg_count);
1109
1110    /* okay. let's setup new handles to retry.
1111    */
1112    for (i=0,j=0; i<tree->num_data_files; i++)
1113    {
1114        server_nr = getattr->index_to_server[i];
1115        ctx = &(getattr->mir_ctx_array[server_nr]);
1116     
1117        /* don't process completed messages */
1118        if (ctx->msg_completed)
1119           continue;
1120
1121        /* for incomplete messages, cleanup memory, if necessary */
1122        enc_req_bytes = (char *)&(msg->encoded_req);
1123        for (k=0; k<sizeof(msg->encoded_req); k++)
1124        {
1125           if (enc_req_bytes[k] != '\0')
1126           {
1127              PINT_encode_release(&(msg->encoded_req),PINT_ENCODE_REQ);
1128              break;
1129           }
1130        }/*end for*/
1131
1132        if (msg->encoded_resp_p)
1133        {
1134            BMI_memfree(msg->svr_addr
1135                       ,msg->encoded_resp_p
1136                       ,msg->max_resp_sz
1137                       ,BMI_RECV);
1138        }
1139       
1140        /* Should we use the original datahandle? */
1141        if (ctx->retry_original)
1142        {
1143            ctx->retry_original = 0;
1144            tmp_handles[j]   = ctx->original_datahandle;
1145            tmp_server_nr[j] = ctx->original_server_nr;
1146            j++;
1147            continue;
1148        }/*end retry_original*/
1149
1150        /* otherwise, get next mirrored handle.  note:  if a mirrored handle is
1151         * zero, then this means that the creation of this mirrored object
1152         * failed for its particular server.  in this case, get the next valid
1153         * handle.  as a last resort, retry the original handle.
1154        */
1155        copies = ctx->current_copies_count;
1156        for (;copies < meta->mirror_copies_count; copies++)
1157        {
1158            index = (copies*meta->dfile_count) + server_nr;
1159            if (meta->mirror_dfile_array[index] != 0)
1160            {  /* we have found a valid mirrored handle */
1161               tmp_handles[j]   = meta->mirror_dfile_array[index];
1162               tmp_server_nr[j] = server_nr;
1163               j++;
1164               break;
1165            }
1166        }
1167
1168        /* if we haven't found a valid mirrored handle, retry the original
1169         * datahandle.
1170        */
1171        if ( copies == meta->mirror_copies_count )
1172        {
1173           tmp_handles[j]   = ctx->original_datahandle;
1174           tmp_server_nr[j] = ctx->original_server_nr;
1175           j++;
1176           ctx->retry_original = 0;
1177           ctx->current_copies_count = 0;
1178           getattr->retry_count++;
1179           continue;
1180        }/*end if we have to use the original*/
1181
1182        /* otherwise, setup for the next retry event */
1183        ctx->current_copies_count++;
1184        if (ctx->current_copies_count == meta->mirror_copies_count)
1185        {
1186           ctx->current_copies_count = 0;
1187           ctx->retry_original = 1;
1188           getattr->retry_count++;
1189        }
1190    }/*end for each handle in the old request*/
1191
1192    /*replace values in old tree request*/
1193    free(tree->handle_array);
1194    tree->handle_array = tmp_handles;
1195    tree->num_data_files = retry_msg_count;
1196   
1197    /*replace values in old message request*/
1198    msg->handle = tmp_handles[0];
1199    msg->svr_addr=0;
1200    ret = PINT_cached_config_map_to_server(&msg->svr_addr
1201                                          ,msg->handle
1202                                          ,msg->fs_id);
1203    if (ret)
1204    {
1205        gossip_lerr("Unable to determine server address for handle(%llu) and "
1206                    "file system(%d).\n"
1207                   ,llu(msg->handle)
1208                   ,msg->fs_id);
1209        js_p->error_code = -PVFS_EINVAL;
1210        return SM_ACTION_COMPLETE;
1211    }
1212
1213    /*save index-to-server array*/
1214    free(getattr->index_to_server);
1215    getattr->index_to_server = tmp_server_nr;
1216
1217    /* Push the msgarray_op and jump to msgpairarray.sm */
1218    PINT_sm_push_frame(smcb,0,mop);
1219    js_p->error_code=GETATTR_IO_RETRY;
1220
1221    return SM_ACTION_COMPLETE;
1222} /*end datafile_getattr_retry*/
1223
1224
1225
1226
1227
1228
1229static PINT_sm_action getattr_datafile_getattr_cleanup(
1230        struct PINT_smcb *smcb, job_status_s *js_p)
1231{
1232    struct PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
1233//FINISH_PROFILER("getattr", getattr_prof, 1);
1234    PINT_sm_getattr_state *getattr = &(sm_p->getattr);
1235
1236    /* cleanup handle_array created for the one tree request */
1237    PINT_sm_msgpair_state *msg_p = &(sm_p->msgarray_op.msgarray[0]);
1238    if (msg_p->req.u.tree_get_file_size.handle_array)
1239       free(msg_p->req.u.tree_get_file_size.handle_array);
1240   
1241    /* cleanup tree request */
1242    PINT_msgpairarray_destroy(&sm_p->msgarray_op);
1243
1244    /* cleanup memory that may have been used for mirrored retries.*/
1245    if (getattr->mir_ctx_array)
1246       free(getattr->mir_ctx_array);
1247    if (getattr->index_to_server)
1248       free(getattr->index_to_server);
1249
1250
1251    return SM_ACTION_COMPLETE;
1252}
1253
1254
1255static PINT_sm_action getattr_dirdata_getattr_setup_msgpairarray(
1256        struct PINT_smcb *smcb, job_status_s *js_p)
1257{
1258    struct PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
1259    int ret = -PVFS_EINVAL;
1260    int i = 0;
1261    PVFS_object_attr *attr = NULL;
1262    PINT_sm_msgpair_state *msg_p = NULL;
1263    int tmp_index, cur_index;
1264
1265    js_p->error_code = 0;
1266
1267    attr = &sm_p->getattr.attr;
1268    assert(attr);
1269
1270    assert(attr->mask & PVFS_ATTR_DIR_DISTDIR_ATTR);
1271
1272    /* initialize size_array, used to store the dirent_count of every dirdata handle */
1273    PINT_SM_DATAFILE_SIZE_ARRAY_INIT(&sm_p->getattr.size_array,
1274            attr->u.dir.dist_dir_attr.num_servers);
1275
1276    sm_p->getattr.active_dirdata_index = (int *)malloc(
1277            sizeof(*sm_p->getattr.active_dirdata_index) *
1278            attr->u.dir.dist_dir_attr.num_servers);
1279
1280    if(!sm_p->getattr.active_dirdata_index)
1281    {
1282        js_p->error_code = -PVFS_ENOMEM;
1283        return SM_ACTION_COMPLETE;
1284    }
1285
1286
1287    /* find out active dirdata handles */
1288    tmp_index = 0;
1289    for(i=0; i<attr->u.dir.dist_dir_attr.num_servers; i++)
1290    {
1291        if(PINT_is_dist_dir_bucket_active(&attr->u.dir.dist_dir_attr,
1292                    attr->u.dir.dist_dir_bitmap, i))
1293        {
1294            sm_p->getattr.active_dirdata_index[tmp_index] = i;
1295            tmp_index++;
1296        }
1297    }
1298    assert(tmp_index > 0);
1299
1300    /* initialize msgpair array */
1301    ret = PINT_msgpairarray_init(&sm_p->msgarray_op,
1302            tmp_index);
1303    if(ret != 0)
1304    {
1305        js_p->error_code = ret;
1306        return SM_ACTION_COMPLETE;
1307    }
1308
1309    /* prepare to post the getattr send/recv pairs for all dirdata*/
1310    foreach_msgpair(&sm_p->msgarray_op, msg_p, i)
1311    {
1312        cur_index = sm_p->getattr.active_dirdata_index[i];
1313
1314        gossip_debug(GOSSIP_CLIENT_DEBUG,
1315                     "getattr: posting dirdata getattr[%d] ([%d]:%lld,%d)\n",
1316                     i, cur_index,
1317                     llu(attr->u.dir.dirdata_handles[cur_index]),
1318                     sm_p->getattr.object_ref.fs_id);
1319
1320        PINT_SERVREQ_GETATTR_FILL(
1321            msg_p->req,
1322            *sm_p->cred_p,
1323            sm_p->getattr.object_ref.fs_id,
1324            attr->u.dir.dirdata_handles[cur_index],
1325            PVFS_ATTR_COMMON_ALL|PVFS_ATTR_DIR_DIRENT_COUNT, /* only interested in timestamp and dirent_count */
1326            sm_p->hints);
1327
1328        /* fill in msgpair structure components */
1329        msg_p->fs_id = sm_p->getattr.object_ref.fs_id;
1330        msg_p->handle = attr->u.dir.dirdata_handles[cur_index];
1331        msg_p->retry_flag = PVFS_MSGPAIR_RETRY;
1332        msg_p->comp_fn = getattr_dirdata_getattr_comp_fn;
1333
1334        ret = PINT_cached_config_map_to_server(
1335            &msg_p->svr_addr, msg_p->handle, msg_p->fs_id);
1336
1337        if (ret)
1338        {
1339            gossip_err("Failed to map meta server address\n");
1340            js_p->error_code = ret;
1341            break;
1342        }
1343    }
1344
1345    PINT_sm_push_frame(smcb, 0, &sm_p->msgarray_op);
1346    return SM_ACTION_COMPLETE;
1347
1348}
1349
1350
1351static int getattr_dirdata_getattr_comp_fn(
1352    void *v_p, struct PVFS_server_resp *resp_p, int index)
1353{
1354    PINT_smcb *smcb = v_p;
1355    PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_MSGPAIR_PARENT_SM);
1356    PVFS_object_attr *attr_p = NULL;
1357    PVFS_object_attr *resp_attr_p = NULL;
1358    int cur_index;
1359
1360    gossip_debug(GOSSIP_GETATTR_DEBUG,
1361                 "getattr_dirdata_getattr_comp_fn called\n");
1362    gossip_debug(GOSSIP_CLIENT_DEBUG, "dirdata getattr[%d] got response %d\n",
1363                 index, resp_p->status);
1364
1365    if (resp_p->status)
1366    {   
1367        return resp_p->status;
1368    }
1369
1370    assert(resp_p->op == PVFS_SERV_GETATTR);
1371
1372    attr_p = &sm_p->getattr.attr;
1373    resp_attr_p = &resp_p->u.getattr.attr;
1374    cur_index = sm_p->getattr.active_dirdata_index[index];
1375
1376    assert(resp_attr_p->objtype == PVFS_TYPE_DIRDATA);
1377    /* also assert timestamp? */
1378    assert(resp_attr_p->mask & PVFS_ATTR_DIR_DIRENT_COUNT);
1379    assert( (cur_index >= 0) &&
1380            (cur_index < attr_p->u.dir.dist_dir_attr.num_servers));
1381
1382    gossip_debug(GOSSIP_CLIENT_DEBUG, "dirdata getattr[%d] (#[%d]) returns attrs: "
1383            "[dirent_count=%llu, atime = %llu, mtime = %llu, ctime = %llu]\n",
1384            index, cur_index,
1385            llu(resp_attr_p->u.dir.dirent_count),
1386            llu(resp_attr_p->atime),
1387            llu(resp_attr_p->mtime),
1388            llu(resp_attr_p->ctime));
1389
1390    /* update timestamp and dirent_count */
1391    sm_p->getattr.size_array[cur_index] = resp_attr_p->u.dir.dirent_count;
1392    attr_p->u.dir.dirent_count += resp_attr_p->u.dir.dirent_count;
1393    if(attr_p->atime < resp_attr_p->atime)
1394    {
1395        attr_p->atime = resp_attr_p->atime;
1396    }
1397    if(attr_p->ctime < resp_attr_p->ctime)
1398    {
1399        attr_p->ctime = resp_attr_p->ctime;
1400    }
1401    if(attr_p->mtime < resp_attr_p->mtime)
1402    {
1403        attr_p->mtime = resp_attr_p->mtime;
1404    }
1405
1406    gossip_debug(GOSSIP_CLIENT_DEBUG, "updated directory attrs: "
1407            "[dirent_count=%llu, atime = %llu, mtime = %llu, ctime = %llu]\n",
1408            llu(attr_p->u.dir.dirent_count),
1409            llu(attr_p->atime),
1410            llu(attr_p->mtime),
1411            llu(attr_p->ctime));
1412
1413    return 0;
1414}
1415
1416
1417static PINT_sm_action getattr_dirdata_getattr_cleanup(
1418        struct PINT_smcb *smcb, job_status_s *js_p)
1419{
1420    struct PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
1421   
1422    /* cleanup tree request */
1423    PINT_msgpairarray_destroy(&sm_p->msgarray_op);
1424
1425    if(sm_p->getattr.active_dirdata_index)
1426    {
1427        free(sm_p->getattr.active_dirdata_index);
1428    }
1429
1430    return SM_ACTION_COMPLETE;
1431}
1432
1433
1434
1435
1436static PINT_sm_action getattr_acache_insert(
1437        struct PINT_smcb *smcb, job_status_s *js_p)
1438{   
1439    struct PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
1440    PVFS_size* tmp_size = NULL;
1441
1442    if( sm_p->getattr.attr.objtype == PVFS_TYPE_METAFILE ||
1443        sm_p->getattr.attr.objtype == PVFS_TYPE_DIRECTORY ||
1444        sm_p->getattr.attr.objtype == PVFS_TYPE_SYMLINK )
1445    {
1446        /* see if we have a size value to cache */
1447        if (sm_p->getattr.attr.objtype == PVFS_TYPE_METAFILE &&
1448            sm_p->getattr.req_attrmask & PVFS_ATTR_DATA_SIZE)
1449        {                                           
1450            if(!(sm_p->getattr.attr.mask & PVFS_ATTR_META_UNSTUFFED))
1451            {
1452                /* stuffed file case */
1453                sm_p->getattr.size = sm_p->getattr.attr.u.meta.stuffed_size;
1454                tmp_size = &sm_p->getattr.size;
1455                gossip_debug(GOSSIP_GETATTR_DEBUG, "getattr_acache_insert "
1456                  "calculated stuffed logical size of %lld\n", lld(*tmp_size));
1457            }
1458            else
1459            {
1460                /* compute size as requested */
1461                assert(sm_p->getattr.attr.u.meta.dist);
1462                assert(sm_p->getattr.attr.u.meta.dist->methods &&
1463                       sm_p->getattr.attr.u.meta.dist->methods->logical_file_size);
1464
1465                sm_p->getattr.size =
1466                    (sm_p->getattr.attr.u.meta.dist->methods->logical_file_size)(
1467                    sm_p->getattr.attr.u.meta.dist->params,
1468                    sm_p->getattr.attr.u.meta.dfile_count,
1469                    sm_p->getattr.size_array);
1470
1471                tmp_size = &sm_p->getattr.size;
1472                gossip_debug(GOSSIP_GETATTR_DEBUG,"getattr_acache_insert calculated"
1473                                                  " unstuffed logical size of %lld\n"
1474                                                 , lld(*tmp_size));
1475            }
1476        }
1477
1478        PINT_acache_update(sm_p->getattr.object_ref,
1479            &sm_p->getattr.attr,
1480            tmp_size);
1481
1482        gossip_debug(GOSSIP_CLIENT_DEBUG, "trying to add object "
1483                     "reference to acache\n");
1484    }
1485
1486    return SM_ACTION_COMPLETE;
1487}
1488
1489static PINT_sm_action getattr_cleanup(
1490        struct PINT_smcb *smcb, job_status_s *js_p)
1491{
1492    struct PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
1493    PINT_sm_getattr_state *getattr = &(sm_p->getattr);
1494
1495    gossip_debug(GOSSIP_CLIENT_DEBUG,
1496                 "(%p) getattr state: getattr_cleanup\n", sm_p);
1497
1498    gossip_debug(GOSSIP_GETATTR_DEBUG
1499                ,"%s: js_p->error_code:%d \tgetattr->attr.mask:0x%08x\n"
1500                ,__func__
1501                ,js_p->error_code
1502                ,getattr->attr.mask);
1503
1504    sm_p->error_code = js_p->error_code;
1505
1506    /* cleanup size array if not asked to keep; is allocated if datafile sizes or dirent_count are retrieved */
1507    /* keep_size_array is initialized outside the state machine, now only used in sys-readdir.sm */
1508    if (!getattr->keep_size_array && getattr->size_array)
1509    {
1510        gossip_debug(GOSSIP_CLIENT_DEBUG,
1511                 "getattr_cleanup: freeint getattr->size_array. \n");
1512        free(getattr->size_array);
1513        getattr->size_array = NULL;
1514    }
1515
1516    /* cleanup getattr when an error occurs */
1517    if (js_p->error_code)
1518    {
1519      if (getattr->attr.mask & PVFS_ATTR_META_DFILES)
1520      {
1521         if (getattr->attr.u.meta.dfile_array)
1522            free(getattr->attr.u.meta.dfile_array);
1523      }
1524
1525      if (getattr->attr.mask & PVFS_ATTR_META_MIRROR_DFILES)
1526      {
1527         if (getattr->attr.u.meta.mirror_dfile_array)
1528            free(getattr->attr.u.meta.mirror_dfile_array);
1529      }
1530
1531      if (getattr->attr.mask & PVFS_ATTR_META_DIST)
1532      {
1533         PINT_dist_free(getattr->attr.u.meta.dist);
1534      }
1535    }/*end if error*/
1536
1537    return SM_ACTION_COMPLETE;
1538}
1539
1540static PINT_sm_action getattr_set_sys_response(
1541        struct PINT_smcb *smcb, job_status_s *js_p)
1542{
1543    struct PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
1544    PVFS_sysresp_getattr * sysresp = NULL;
1545    PVFS_object_attr *attr = NULL;
1546
1547    if(js_p->error_code != 0)
1548    {
1549        PINT_SET_OP_COMPLETE;
1550        return SM_ACTION_TERMINATE;
1551    }
1552
1553    attr = &sm_p->getattr.attr;
1554    assert(attr);
1555   
1556    /* If we get to this state action,
1557     * the getattr state machine was invoked, so
1558     * we can assume that one of the PVFS_[i]sys_getattr functions
1559     * was called, and the response field must be filled in for the
1560     * user.
1561     */
1562
1563    sysresp = sm_p->u.getattr.getattr_resp_p;
1564   
1565    /*
1566     * if we retrieved a symlink target, copy it for the caller; this
1567     * target path will be handed all the way back up to the caller via
1568     * the PVFS_sys_attr object.  The caller of PVFS_[i]sys_getattr
1569     * must free it.
1570     */
1571    if(attr->objtype == PVFS_TYPE_SYMLINK &&
1572       attr->mask & PVFS_ATTR_SYMLNK_TARGET)
1573    {
1574        assert(attr->u.sym.target_path_len > 0);
1575        assert(attr->u.sym.target_path);
1576
1577        sysresp->attr.link_target = strdup(attr->u.sym.target_path);
1578        if (!sysresp->attr.link_target)
1579        {
1580           js_p->error_code = -PVFS_ENOMEM;
1581           PINT_SET_OP_COMPLETE;
1582           return SM_ACTION_TERMINATE;
1583        }
1584    }
1585
1586    if(attr->objtype == PVFS_TYPE_METAFILE)
1587    {
1588       /* Copy if there are any special object specific flags */
1589       sysresp->attr.flags = attr->u.meta.hint.flags;
1590       /* special case for when users ask for dfile count */
1591       if (sm_p->getattr.req_attrmask & PVFS_ATTR_META_DFILES)
1592       {
1593           sysresp->attr.dfile_count = attr->u.meta.dfile_count;
1594       }
1595       if (sm_p->getattr.req_attrmask & PVFS_ATTR_META_MIRROR_DFILES)
1596       {
1597         sysresp->attr.mirror_copies_count = attr->u.meta.mirror_copies_count;
1598       }
1599    }
1600    if (attr->objtype == PVFS_TYPE_DIRECTORY)
1601    {
1602        gossip_debug(GOSSIP_CLIENT_DEBUG, "dfile_count: %d\n",
1603            attr->u.dir.hint.dfile_count);
1604        gossip_debug(GOSSIP_CLIENT_DEBUG, "dist_name_len = %d, "
1605            "dist_params_len = %d\n",
1606            attr->u.dir.hint.dist_name_len, attr->u.dir.hint.dist_params_len);
1607        sysresp->attr.dfile_count = attr->u.dir.hint.dfile_count;
1608
1609
1610       if (sm_p->getattr.req_attrmask & PVFS_ATTR_DIR_DISTDIR_ATTR)
1611       {
1612           sysresp->attr.dirdata_count =
1613               attr->u.dir.dist_dir_attr.num_servers;
1614       }
1615
1616        /*
1617         * If we retrieved any extended attributes for the directory
1618         * in question, the caller's responsibility to free it up
1619         */
1620        if (attr->u.dir.hint.dist_name_len > 0 &&
1621            (sm_p->getattr.req_attrmask & PVFS_ATTR_DIR_HINT))
1622        {
1623            sysresp->attr.dist_name = strdup(attr->u.dir.hint.dist_name);
1624            if (!sysresp->attr.dist_name)
1625            {
1626                js_p->error_code = -PVFS_ENOMEM;
1627                PINT_SET_OP_COMPLETE;
1628                return SM_ACTION_TERMINATE;
1629            }
1630            gossip_debug(GOSSIP_CLIENT_DEBUG, "dist_name_hint: %s\n"
1631                                            , sysresp->attr.dist_name);
1632        }
1633        if (attr->u.dir.hint.dist_params_len > 0 &&
1634            (sm_p->getattr.req_attrmask & PVFS_ATTR_DIR_HINT))
1635        {
1636            sysresp->attr.dist_params = strdup(attr->u.dir.hint.dist_params);
1637            if (!sysresp->attr.dist_params)
1638            {
1639                free(sysresp->attr.dist_name);
1640                sysresp->attr.dist_name = NULL;
1641                js_p->error_code = -PVFS_ENOMEM;
1642                PINT_SET_OP_COMPLETE;
1643                return SM_ACTION_TERMINATE;
1644            }
1645            gossip_debug(GOSSIP_CLIENT_DEBUG, "dist_name_params: %s\n"
1646                                            , sysresp->attr.dist_params);
1647        }
1648    }
1649
1650    /* copy outgoing sys_attr fields from returned object_attr */
1651    sysresp->attr.owner = attr->owner;
1652    sysresp->attr.group = attr->group;
1653    sysresp->attr.perms = attr->perms;
1654    sysresp->attr.atime = attr->atime;
1655    sysresp->attr.mtime = attr->mtime;
1656    sysresp->attr.ctime = attr->ctime;
1657    sysresp->attr.mask  = PVFS_util_object_to_sys_attr_mask(attr->mask);
1658    sysresp->attr.size  = 0;
1659    sysresp->attr.objtype = attr->objtype;
1660
1661    if (js_p->error_code == 0)
1662    {
1663        /* convert outgoing attribute mask based on what we got */
1664        sysresp->attr.mask = PVFS_util_object_to_sys_attr_mask(
1665            sm_p->getattr.attr.mask);
1666
1667       if (sm_p->getattr.req_attrmask & PVFS_ATTR_DATA_SIZE)
1668        {
1669            if( attr->objtype == PVFS_TYPE_DATAFILE )
1670            {
1671                sysresp->attr.size = attr->u.data.size;
1672            }
1673            else
1674            {
1675                sysresp->attr.size = sm_p->getattr.size;
1676            }
1677
1678            sysresp->attr.mask |= PVFS_ATTR_SYS_SIZE;
1679        }
1680
1681        if(attr->mask & PVFS_ATTR_META_DIST)
1682        {
1683            /* we have enough information to set a block size */
1684            sysresp->attr.blksize = attr->u.meta.dist->methods->get_blksize(
1685                attr->u.meta.dist->params);
1686            sysresp->attr.mask |= PVFS_ATTR_SYS_BLKSIZE;
1687        }
1688
1689        /* if this is a symlink, add the link target */
1690        if (sm_p->getattr.req_attrmask & PVFS_ATTR_SYMLNK_TARGET)
1691        {
1692            sysresp->attr.mask |= PVFS_ATTR_SYS_LNK_TARGET;
1693        }
1694
1695        if(sm_p->getattr.req_attrmask & PVFS_ATTR_DIR_DIRENT_COUNT)
1696        {
1697            sysresp->attr.dirent_count = attr->u.dir.dirent_count;
1698            sysresp->attr.mask |= PVFS_ATTR_SYS_DIRENT_COUNT;
1699        }
1700
1701        if (sm_p->getattr.req_attrmask & PVFS_ATTR_DIR_DISTDIR_ATTR)
1702        {
1703/* TODO: Do we need this? */
1704        }
1705    }
1706    else
1707    {
1708        /* in case of failure, blank out response */
1709        memset(sm_p->u.getattr.getattr_resp_p,
1710               0, sizeof(PVFS_sysresp_getattr));
1711    }
1712
1713    PINT_SM_GETATTR_STATE_CLEAR(sm_p->getattr);
1714
1715    PINT_SET_OP_COMPLETE;
1716    return SM_ACTION_TERMINATE;
1717}
1718
1719/*
1720 * Local variables:
1721 *  mode: c
1722 *  c-indent-level: 4
1723 *  c-basic-offset: 4
1724 * End:
1725 *
1726 * vim: ft=c ts=8 sts=4 sw=4 expandtab
1727 */
Note: See TracBrowser for help on using the browser.