root/branches/Orange-Elaine-Distr-Dir-Branch/src/server/get-attr.sm @ 8478

Revision 8478, 63.6 KB (checked in by elaine, 3 years ago)

*** empty log message ***

Line 
1/*
2 * (C) 2001 Clemson University and The University of Chicago
3 *
4 * See COPYING in top-level directory.
5 *
6 * Changes by Acxiom Corporation to add dirent_count field to attributes
7 * Copyright © Acxiom Corporation, 2005.
8 */
9
10/* pvfs2_get_attr_sm
11 *
12 * This state machine handles incoming server getattr operations.  These
13 * are the operations sent by PVFS_sys_getattr() among others.
14 *
15 * The pvfs2_prelude_sm is responsible for reading the actual metadata
16 * to begin with, because it does this as part of the permission checking
17 * process.
18 */
19
20#include <string.h>
21#include <assert.h>
22
23#include "server-config.h"
24#include "pvfs2-server.h"
25#include "pvfs2-attr.h"
26#include "pvfs2-types.h"
27#include "pvfs2-types-debug.h"
28#include "pvfs2-util.h"
29#include "pint-util.h"
30#include "pvfs2-internal.h"
31#include "pint-cached-config.h"
32
/* 64-bit value with every bit set (the largest uint64_t). */
static uint64_t UINT64_HIGH = ~(uint64_t)0;
34
35PINT_server_trove_keys_s Trove_Special_Keys[] =
36{
37    {"user.pvfs2.dist_name"    , SPECIAL_DIST_NAME_KEYLEN},
38    {"user.pvfs2.dist_params"  , SPECIAL_DIST_PARAMS_KEYLEN},
39    {"user.pvfs2.num_dfiles"   , SPECIAL_NUM_DFILES_KEYLEN},
40    {"user.pvfs2.meta_hint"    , SPECIAL_METAFILE_HINT_KEYLEN},
41    {"user.pvfs2.mirror.copies", SPECIAL_MIRROR_COPIES_KEYLEN},
42    {"user.pvfs2.mirror.handles", SPECIAL_MIRROR_HANDLES_KEYLEN},
43    {"user.pvfs2.mirror.status" , SPECIAL_MIRROR_STATUS_KEYLEN},
44};
45
/*
 * Positive values that state actions store in js_p->error_code to pick a
 * named transition in the machines below (anything else takes the
 * "success" or "default" transition).
 */
enum
{
    /* object is a metafile / more metafile data has to be read */
    STATE_METAFILE  = 7,
    /* object is a symlink: go read its target */
    STATE_SYMLINK   = 9,
    /* object is a directory and directory-specific data was requested */
    STATE_DIR       = 10,
    /* get_dirent_count: go straight to get_dir_hint */
    STATE_DIR_HINT  = 11,
    /* get_dir_hint: nothing left to do, build the response */
    STATE_DONE      = 12,
    /* mirror states: jump ahead to read_metafile_distribution_if_required */
    SKIP_NEXT_STATE = 13
};
55
/*
 * Forward declaration of a file-local helper; its definition is not in this
 * section of the file.
 */
static void free_nested_getattr_data(
    struct PINT_server_op *s_op);
57
58%%
59
60nested machine pvfs2_get_attr_work_sm
61{
62    state verify_attribs
63    {
64        run getattr_verify_attribs;
65        STATE_SYMLINK => read_symlink_target;
66        STATE_METAFILE => read_metafile_hint;
67        STATE_DIR => get_num_dirdata_handles;
68        default => setup_resp;
69    }
70
71    state read_symlink_target
72    {
73        run getattr_read_symlink_target;
74        default => setup_resp;
75    }
76
77    state read_metafile_hint
78    {
79        run getattr_read_metafile_hint;
80        default => interpret_metafile_hint;
81    }
82
83    state interpret_metafile_hint
84    {
85        run getattr_interpret_metafile_hint;
86        STATE_METAFILE => read_metafile_datafile_handles_if_required;
87        default => setup_resp;
88    }
89
90    state read_metafile_datafile_handles_if_required
91    {
92        run getattr_read_metafile_datafile_handles_if_required;
93        success => datafile_handles_safety_check;
94        default => setup_resp;
95    }
96
97    state datafile_handles_safety_check
98    {
99        run getattr_datafile_handles_safety_check;
100        success => read_mirrored_copies_count_if_required;
101        default => setup_resp;
102    }
103
104    state read_mirrored_copies_count_if_required
105    {
106        run getattr_read_mirrored_copies_count_if_required;
107        SKIP_NEXT_STATE => read_metafile_distribution_if_required;
108        default => read_mirrored_handles_if_required;
109    }
110
111    state read_mirrored_handles_if_required
112    {
113        run getattr_read_mirrored_handles_if_required;
114        SKIP_NEXT_STATE => read_metafile_distribution_if_required;
115        default => mirrored_handles_safety_check;
116    }
117 
118    state mirrored_handles_safety_check
119    {
120        run getattr_mirrored_handles_safety_check;
121        success => read_metafile_distribution_if_required;
122        default => setup_resp;
123    }
124
125    state read_metafile_distribution_if_required
126    {
127        run getattr_read_metafile_distribution_if_required;
128        default => interpret_metafile_distribution;
129    }
130
131    state interpret_metafile_distribution
132    {
133        run interpret_metafile_distribution;
134        success => detect_stuffed;
135        default => setup_resp;
136    }
137
138    state detect_stuffed
139    {
140        run getattr_detect_stuffed;
141        default => read_stuffed_size;
142    }
143
144    state read_stuffed_size
145    {
146        run getattr_read_stuffed_size;
147        success => interpret_stuffed_size;
148        default => setup_resp;
149    }
150
151    state interpret_stuffed_size
152    {
153        run getattr_interpret_stuffed_size;
154        default => setup_resp;
155    }
156
157    state get_num_dirdata_handles
158    {
159        run getattr_get_num_dirdata_handles;
160        default => get_dirdata_handles;
161/*
162        success => get_dirdata_handles;
163        default => setup_resp;
164*/
165    }
166
167    state get_dirdata_handles
168    {
169        run getattr_get_dirdata_handles;
170        success => get_dirent_count;
171        default => setup_resp;
172    }
173
174    state get_dirent_count
175    {
176        run getattr_get_dirent_count;
177        STATE_DIR_HINT => get_dir_hint;
178        default => interpret_dirent_count;
179    }
180
181    state interpret_dirent_count
182    {
183        run getattr_interpret_dirent_count;
184        default => get_dir_hint;
185    }
186
187    state get_dir_hint
188    {
189        run getattr_get_dir_hint;
190        STATE_DONE => setup_resp;
191        default => interpret_dir_hint;
192    }
193
194    state interpret_dir_hint
195    {
196        run getattr_interpret_dir_hint;
197        default => setup_resp;
198    }
199
200    state setup_resp
201    {
202        run getattr_setup_resp;
203        default => return;
204    }
205}
206
207nested machine pvfs2_get_attr_with_prelude_sm
208{
209    state init
210    {
211        run getattr_with_prelude_init;
212        default => prelude;
213    }
214
215    state prelude
216    {
217        jump pvfs2_prelude_sm;
218        success => setup_op;
219        default => return;
220    }
221
222    state setup_op
223    {
224        run getattr_setup_op;
225        default => do_work;
226    }
227
228    state do_work
229    {
230        jump pvfs2_get_attr_work_sm;
231        default => return;
232    }
233}
234
235machine pvfs2_get_attr_sm
236{
237    state work
238    {
239        jump pvfs2_get_attr_with_prelude_sm;
240        default => final_response;
241    }
242
243    state final_response
244    {
245        jump pvfs2_final_response_sm;
246        default => cleanup;
247    }
248
249    state cleanup
250    {
251        run getattr_cleanup;
252        default => terminate;
253    }
254}
255
256%%
257
258/* getattr_verify_attribs()
259 *
260 * We initialize the attribute mask that will be returned in this
261 * function.  This mask can be augmented in some of the other states.
262 */
263static PINT_sm_action getattr_verify_attribs(
264        struct PINT_smcb *smcb, job_status_s *js_p)
265{
266    struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
267    PVFS_object_attr *resp_attr = NULL;
268
269    js_p->error_code = 0;
270
271    /*
272      explicitly copy basic attributes structure (read in from the
273      prelude.sm for the matching dspace) into response to be sent
274      back to the client.  this is mostly for readability here to be
275      sure we know which fields are valid in the response at this
276      point.
277    */
278    resp_attr = &s_op->resp.u.getattr.attr;
279    memset(resp_attr, 0, sizeof(PVFS_object_attr));
280
281    resp_attr->owner = s_op->attr.owner;
282    resp_attr->group = s_op->attr.group;
283    resp_attr->perms = s_op->attr.perms;
284    resp_attr->atime = s_op->attr.atime;
285
286    resp_attr->mtime = PINT_util_mkversion_time(s_op->attr.mtime);
287    if (resp_attr->mtime == 0)
288    {
289        /*
290          this is a compatibility hack to allow existing storage
291          spaces to be automagically converted to this versioned time
292          on-disk format slowly over time and doing the right thing in
293          the meantime
294        */
295        resp_attr->mtime = s_op->attr.mtime;
296
297        gossip_debug(GOSSIP_GETATTR_DEBUG, " No version found!  Using "
298                     "mtime %llu\n", llu(resp_attr->mtime));
299    }
300    else
301    {
302        gossip_debug(
303            GOSSIP_GETATTR_DEBUG, " VERSION is %llu, mtime is %llu\n",
304            llu(s_op->attr.mtime), llu(resp_attr->mtime));
305    }
306
307    resp_attr->ctime = s_op->attr.ctime;
308    resp_attr->mask = s_op->attr.mask;
309    resp_attr->objtype = s_op->attr.objtype;
310    resp_attr->u.meta.dfile_count = s_op->attr.u.meta.dfile_count;
311    resp_attr->u.meta.dist_size = s_op->attr.u.meta.dist_size;
312
313#if 0
314    gossip_debug(
315        GOSSIP_GETATTR_DEBUG,
316        "+  _DSPACE_ retrieved attrs: [owner = %d, group = %d\n\t"
317        "perms = %o, type = %d, atime = %llu, mtime = %llu\n\t"
318        "ctime = %llu, dfile_count = %d, dist_size = %d]\n",
319        resp_attr->owner, resp_attr->group, resp_attr->perms,
320        resp_attr->objtype, llu(resp_attr->atime),
321        llu(resp_attr->mtime), llu(resp_attr->ctime),
322        (int)resp_attr->u.meta.dfile_count,
323        (int)resp_attr->u.meta.dist_size);
324#endif
325
326    /*
327      weed out the attr mask of the response based on what the client
328      request asked for.  also, check if we need to retrieve more
329      information before returning the response to the client (by
330      guiding the state machine to get it).
331
332      we can safely do this now that we have the type of the object
333      (read in from the dspace, not stored in the resp_attr), and we
334      have the original client request attr mask
335      (s_op->u.getattr.attrmask).
336    */
337    switch(resp_attr->objtype)
338    {
339        case PVFS_TYPE_METAFILE:
340            PINT_ACCESS_DEBUG(s_op, GOSSIP_ACCESS_DEBUG, "type: metafile\n");
341            gossip_debug(GOSSIP_GETATTR_DEBUG,
342                         "  Req handle %llu refers to a metafile\n",
343                         llu(s_op->u.getattr.handle));
344
345            if (s_op->u.getattr.attrmask & PVFS_ATTR_META_DFILES)
346            {
347                gossip_debug(GOSSIP_GETATTR_DEBUG,
348                             " dspace has dfile_count of %d\n",
349                             resp_attr->u.meta.dfile_count);
350                resp_attr->mask |= PVFS_ATTR_META_DFILES;
351            }
352            else
353            {
354                gossip_debug(GOSSIP_GETATTR_DEBUG, " client doesn't want "
355                             "dfile info, clearing response attr mask\n");
356                resp_attr->mask &= ~PVFS_ATTR_META_DFILES;
357            }
358
359            if (s_op->u.getattr.attrmask & PVFS_ATTR_META_DIST)
360            {
361                gossip_debug(GOSSIP_GETATTR_DEBUG,
362                             " dspace has dist size of %d\n",
363                             resp_attr->u.meta.dist_size);
364
365                resp_attr->mask |= PVFS_ATTR_META_DIST;
366            }
367            else
368            {
369                gossip_debug(GOSSIP_GETATTR_DEBUG, " client doesn't want "
370                             "dist info, clearing response attr mask\n");
371
372                resp_attr->mask &= ~PVFS_ATTR_META_DIST;
373            }
374
375            if (s_op->u.getattr.attrmask & PVFS_ATTR_META_MIRROR_DFILES)
376            {
377               gossip_debug(GOSSIP_GETATTR_DEBUG,"client wants mirrored "
378                                                 "handles.\n");
379               resp_attr->mask |= PVFS_ATTR_META_MIRROR_DFILES;
380               resp_attr->u.meta.mirror_copies_count = 0;
381               resp_attr->u.meta.mirror_dfile_array  = NULL;
382            }
383            else
384            {
385               gossip_debug(GOSSIP_GETATTR_DEBUG,"client doesn't want "
386                                                 "mirrored handles.\n");
387               resp_attr->mask &= ~(PVFS_ATTR_META_MIRROR_DFILES);
388            }
389            js_p->error_code = STATE_METAFILE;
390            break;
391        case PVFS_TYPE_DATAFILE:
392            PINT_ACCESS_DEBUG(s_op, GOSSIP_ACCESS_DEBUG, "type: datafile\n");
393            /*
394              note: the prelude already retrieved the size for us, so
395              there's no special action that needs to be taken if we have
396              a datafile here (other than adjusting our mask to include
397              the data information and copying the retrieved size from the
398              ds_attribute the prelude used)
399            */
400            resp_attr->u.data.size = s_op->ds_attr.u.datafile.b_size;
401            resp_attr->mask |= PVFS_ATTR_DATA_ALL;
402
403            gossip_debug(GOSSIP_GETATTR_DEBUG, "  handle %llu refers to "
404                         "a datafile (size = %lld).\n",
405                         llu(s_op->u.getattr.handle),
406                         lld(resp_attr->u.data.size));
407            break;
408        case PVFS_TYPE_DIRECTORY:
409            PINT_ACCESS_DEBUG(s_op, GOSSIP_ACCESS_DEBUG, "type: directory\n");
410            if (s_op->u.getattr.attrmask & PVFS_ATTR_DIR_DIRENT_COUNT)
411            {
412                gossip_debug(GOSSIP_GETATTR_DEBUG,
413                             " getattr: dirent_count needed.\n");
414                assert(resp_attr->mask & PVFS_ATTR_COMMON_ALL);
415                resp_attr->mask |= PVFS_ATTR_DIR_DIRENT_COUNT;
416                js_p->error_code = STATE_DIR;
417            }
418            else
419            {
420                gossip_debug(GOSSIP_GETATTR_DEBUG,
421                             " getattr: dirent_count not needed.\n");
422                js_p->error_code = 0;
423                assert(resp_attr->mask & PVFS_ATTR_COMMON_ALL);
424            }
425            if (s_op->u.getattr.attrmask & PVFS_ATTR_DIR_DIRENT_FILES)
426            {
427                gossip_debug(GOSSIP_GETATTR_DEBUG,
428                             " getattr: dirent_handles needed.\n");
429                assert(resp_attr->mask & PVFS_ATTR_COMMON_ALL);
430                resp_attr->mask |= PVFS_ATTR_DIR_DIRENT_FILES;
431                js_p->error_code = STATE_DIR;
432            }
433            else
434            {
435                gossip_debug(GOSSIP_GETATTR_DEBUG,
436                             " getattr: dirent_handles not needed.\n");
437                js_p->error_code = 0;
438                assert(resp_attr->mask & PVFS_ATTR_COMMON_ALL);
439            }
440            if (s_op->u.getattr.attrmask & PVFS_ATTR_DIR_HINT)
441            {
442                gossip_debug(GOSSIP_GETATTR_DEBUG,
443                            " getattr: dfile_count needed.\n");
444                assert(resp_attr->mask & PVFS_ATTR_COMMON_ALL);
445                resp_attr->mask |= PVFS_ATTR_DIR_HINT;
446                js_p->error_code = STATE_DIR;
447            }
448            else
449            {
450                gossip_debug(GOSSIP_GETATTR_DEBUG,
451                            " getattr: dfile_count not needed\n");
452                assert(resp_attr->mask & PVFS_ATTR_COMMON_ALL);
453            }
454            break;
455        case PVFS_TYPE_DIRDATA:
456            PINT_ACCESS_DEBUG(s_op, GOSSIP_ACCESS_DEBUG, "type: dirdata\n");
457            gossip_debug(
458                GOSSIP_GETATTR_DEBUG, "  handle %llu refers to "
459                "a dirdata object. doing nothing special\n",
460                llu(s_op->u.getattr.handle));
461            assert(resp_attr->mask & PVFS_ATTR_COMMON_ALL);
462            break;
463        case PVFS_TYPE_SYMLINK:
464            PINT_ACCESS_DEBUG(s_op, GOSSIP_ACCESS_DEBUG, "type: symlink\n");
465            gossip_debug(
466                GOSSIP_GETATTR_DEBUG, "  handle %llu refers to a symlink.\n",
467                llu(s_op->u.getattr.handle));
468
469            /*
470              we'll definitely have to fetch the symlink target in this
471              case, as the prelude will never retrieve it for us
472            */
473            js_p->error_code = STATE_SYMLINK;
474            break;
475        case PVFS_TYPE_INTERNAL:
476            PINT_ACCESS_DEBUG(s_op, GOSSIP_ACCESS_DEBUG, "type: symlink\n");
477            /* nothing interesting to add; this is meaningless to a client */
478            break;
479        default:
480            /* if we don't understand the object type, then it probably indicates
481             * a bug or some data corruption.  All trove objects should have a
482             * type set.
483             */
484            gossip_err(
485                "Error: got unknown type when verifying attributes for "
486                "handle %llu.\n",
487                llu(s_op->u.getattr.handle));
488            js_p->error_code = -PVFS_ENXIO;
489            break;
490    }
491
492    return SM_ACTION_COMPLETE;
493}
494
495static PINT_sm_action getattr_read_symlink_target(
496        struct PINT_smcb *smcb, job_status_s *js_p)
497{
498    struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
499    int ret;
500    job_id_t i;
501
502    /* if we don't need to fill in the symlink target, skip it */
503    if (!(s_op->u.getattr.attrmask & PVFS_ATTR_SYMLNK_TARGET))
504    {
505        gossip_debug(GOSSIP_GETATTR_DEBUG, "skipping symlink target read\n");
506        js_p->error_code = 0;
507        return SM_ACTION_COMPLETE;
508    }
509   
510    s_op->key.buffer    = Trove_Common_Keys[SYMLINK_TARGET_KEY].key;
511    s_op->key.buffer_sz = Trove_Common_Keys[SYMLINK_TARGET_KEY].size;
512
513    /*
514      optimistically add mask value to indicate the symlink target is
515      filled (error_code is checked in next state)
516    */
517    s_op->resp.u.getattr.attr.mask |= PVFS_ATTR_SYMLNK_TARGET;
518
519    s_op->resp.u.getattr.attr.u.sym.target_path_len = PVFS_NAME_MAX;
520    s_op->resp.u.getattr.attr.u.sym.target_path =
521        malloc(s_op->resp.u.getattr.attr.u.sym.target_path_len);
522    if (!s_op->resp.u.getattr.attr.u.sym.target_path)
523    {
524        js_p->error_code = -PVFS_ENOMEM;
525        return SM_ACTION_COMPLETE;
526    }
527
528    if(s_op->free_val)
529    {
530        free(s_op->val.buffer);
531    }
532    s_op->val.buffer = s_op->resp.u.getattr.attr.u.sym.target_path;
533    s_op->val.buffer_sz = s_op->resp.u.getattr.attr.u.sym.target_path_len;
534    /* this will get cleaned up with attr structure */
535    s_op->free_val = 0;
536
537    ret = job_trove_keyval_read(
538        s_op->u.getattr.fs_id, s_op->u.getattr.handle,
539        &s_op->key, &s_op->val,
540        0,
541        NULL, smcb, 0, js_p,
542        &i, server_job_context, s_op->req->hints);
543
544
545    return ret;
546}
547
548static PINT_sm_action getattr_interpret_metafile_hint(
549    PINT_smcb *smcb, job_status_s *js_p)
550{
551    struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
552    PVFS_object_attr *resp_attr = NULL;
553    PVFS_metafile_attr *meta = &(s_op->resp.u.getattr.attr.u.meta);
554    resp_attr = &s_op->resp.u.getattr.attr;
555
556    assert(resp_attr->objtype == PVFS_TYPE_METAFILE);
557
558    if (js_p->error_code == 0 || js_p->error_code == -TROVE_ENOENT)
559    {
560        if (js_p->error_code == 0)
561        {
562            memcpy(&(meta->hint), s_op->val.buffer, sizeof(meta->hint));
563        }
564        if ((resp_attr->mask & PVFS_ATTR_META_DFILES) ||
565            (resp_attr->mask & PVFS_ATTR_META_DIST)   ||
566            (resp_attr->mask & PVFS_ATTR_META_MIRROR_DFILES))
567        {
568            gossip_debug(GOSSIP_GETATTR_DEBUG, " * client wants extra "
569                         "meta info, about to retrieve it now\n");
570            js_p->error_code = STATE_METAFILE;
571            if ( (resp_attr->mask  & PVFS_ATTR_META_MIRROR_DFILES) &&
572                !(meta->hint.flags & PVFS_MIRROR_FL) )
573                  resp_attr->mask &= ~(PVFS_ATTR_META_MIRROR_DFILES);
574        }
575        else
576        {
577            gossip_debug(GOSSIP_GETATTR_DEBUG, " * client doesn't want "
578                         "extra meta info, preparing response now\n");
579            js_p->error_code = 0;
580        }
581    } else {
582        /*If we hit an error the DIST & DFILES are no longer valid*/
583        resp_attr->mask &= ~PVFS_ATTR_META_DIST;
584        resp_attr->mask &= ~PVFS_ATTR_META_DFILES;
585        resp_attr->mask &= ~PVFS_ATTR_META_MIRROR_DFILES;
586    }
587    return SM_ACTION_COMPLETE;
588}
589
590static PINT_sm_action getattr_read_metafile_hint(
591    struct PINT_smcb *smcb, job_status_s *js_p)
592{
593    struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
594    int ret = -PVFS_EINVAL;
595    job_id_t i;
596    char *buf = NULL;
597
598    assert(s_op->attr.objtype == PVFS_TYPE_METAFILE);
599    buf = (char *) calloc(sizeof(PVFS_metafile_hint) + 1, 1);
600    if (buf == NULL)
601    {
602        js_p->error_code = -PVFS_ENOMEM;
603        /*If we hit an error the DIST & DFILES are no longer valid*/
604        s_op->resp.u.getattr.attr.mask &= ~PVFS_ATTR_META_DIST;
605        s_op->resp.u.getattr.attr.mask &= ~PVFS_ATTR_META_DFILES;
606        s_op->resp.u.getattr.attr.mask &= ~PVFS_ATTR_META_MIRROR_DFILES;
607        return 1;
608    }
609
610    js_p->error_code = 0;
611
612    s_op->key.buffer = Trove_Special_Keys[METAFILE_HINT_KEY].key;
613    s_op->key.buffer_sz = Trove_Special_Keys[METAFILE_HINT_KEY].size;
614
615    if(s_op->free_val)
616    {
617        free(s_op->val.buffer);
618    }
619    s_op->val.buffer = buf;
620    s_op->val.buffer_sz = sizeof(s_op->resp.u.getattr.attr.u.meta.hint) + 1;
621    s_op->free_val = 1;
622
623    gossip_debug(GOSSIP_GETATTR_DEBUG,
624                 "  reading metafile hint (coll_id = %d, "
625                 "handle = %llu, key = %s (%d), val_buf = %p (%d))\n",
626                 s_op->u.getattr.fs_id,
627                 llu(s_op->u.getattr.handle), (char *)s_op->key.buffer,
628                 s_op->key.buffer_sz, s_op->val.buffer,
629                 s_op->val.buffer_sz);
630
631    ret = job_trove_keyval_read(
632        s_op->u.getattr.fs_id, s_op->u.getattr.handle,
633        &s_op->key, &s_op->val,
634        0,
635        NULL, smcb, 0, js_p,
636        &i, server_job_context, s_op->req->hints);
637
638    return ret;
639}
640
641static PINT_sm_action getattr_read_metafile_datafile_handles_if_required(
642        struct PINT_smcb *smcb, job_status_s *js_p)
643{
644    struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
645    int ret = -PVFS_EINVAL;
646    int dfile_count = 0;
647    job_id_t i;
648
649    assert(s_op->attr.objtype == PVFS_TYPE_METAFILE);
650
651    js_p->error_code = 0;
652
653    /* if we don't need to fill in the dfiles, skip them */
654    if (!(s_op->u.getattr.attrmask & PVFS_ATTR_META_DFILES))
655    {
656        gossip_debug(GOSSIP_GETATTR_DEBUG, "skipping data handle read\n");
657        return SM_ACTION_COMPLETE;
658    }
659
660    dfile_count = s_op->resp.u.getattr.attr.u.meta.dfile_count;
661
662    gossip_debug(GOSSIP_GETATTR_DEBUG,
663                 " request has dfile_count of %d | dspace has %d\n",
664                 s_op->resp.u.getattr.attr.u.meta.dfile_count,
665                 s_op->resp.u.getattr.attr.u.meta.dfile_count);
666
667    /* verify that the retrieved dfile count is sane */
668    if (!PVFS_REQ_LIMIT_DFILE_COUNT_IS_VALID(dfile_count))
669    {
670        gossip_err("The requested dfile count of %d is invalid; "
671                   "aborting operation.\n", dfile_count);
672        gossip_err(
673            "+ attrs read from dspace: (owner = %d, group = %d, "
674            "perms = %o, type = %d\n   atime = %lld, mtime = %lld, "
675            "ctime = %lld |\n   dfile_count = %d | dist_size = %d)\n",
676            s_op->resp.u.getattr.attr.owner,
677            s_op->resp.u.getattr.attr.group,
678            s_op->resp.u.getattr.attr.perms,
679            s_op->resp.u.getattr.attr.objtype,
680            lld(s_op->resp.u.getattr.attr.atime),
681            lld(s_op->resp.u.getattr.attr.mtime),
682            lld(s_op->resp.u.getattr.attr.ctime),
683            (int)s_op->resp.u.getattr.attr.u.meta.dfile_count,
684            (int)s_op->resp.u.getattr.attr.u.meta.dist_size);
685
686        gossip_err("handle: %llu (%llx), fsid: %d.\n",
687            llu(s_op->u.getattr.handle), llu(s_op->u.getattr.handle),
688            (int)s_op->u.getattr.fs_id);
689
690        /*If we hit an error the DIST & DFILES are no longer valid*/
691        s_op->resp.u.getattr.attr.mask &= ~PVFS_ATTR_META_DIST;
692        s_op->resp.u.getattr.attr.mask &= ~PVFS_ATTR_META_DFILES;
693        s_op->resp.u.getattr.attr.mask &= ~PVFS_ATTR_META_MIRROR_DFILES;
694       
695        js_p->error_code = -PVFS_EOVERFLOW;
696        return SM_ACTION_COMPLETE;
697    }
698
699    s_op->key.buffer = Trove_Common_Keys[METAFILE_HANDLES_KEY].key;
700    s_op->key.buffer_sz = Trove_Common_Keys[METAFILE_HANDLES_KEY].size;
701
702    /* add mask value to indicate the data file array is filled */
703    s_op->resp.u.getattr.attr.mask |= PVFS_ATTR_META_DFILES;
704
705    s_op->resp.u.getattr.attr.u.meta.dfile_array =
706        malloc(dfile_count * sizeof(PVFS_handle));
707    if (!s_op->resp.u.getattr.attr.u.meta.dfile_array)
708    {
709        gossip_err("Cannot allocate dfile array of count %d\n",
710                   dfile_count);
711        js_p->error_code = -PVFS_ENOMEM;
712        return SM_ACTION_COMPLETE;
713    }
714
715    if(s_op->free_val)
716    {
717        free(s_op->val.buffer);
718    }
719    s_op->val.buffer = s_op->resp.u.getattr.attr.u.meta.dfile_array;
720    s_op->val.buffer_sz = (dfile_count * sizeof(PVFS_handle));
721    /* this will get cleaned up with attr structure */
722    s_op->free_val = 0;
723
724    gossip_debug(GOSSIP_GETATTR_DEBUG,
725                 "  reading %d datafile handles (coll_id = %d, "
726                 "handle = %llu, key = %s (%d), val_buf = %p (%d))\n",
727                 dfile_count, s_op->u.getattr.fs_id,
728                 llu(s_op->u.getattr.handle), (char *)s_op->key.buffer,
729                 s_op->key.buffer_sz, s_op->val.buffer,
730                 s_op->val.buffer_sz);
731
732    ret = job_trove_keyval_read(
733        s_op->u.getattr.fs_id
734       ,s_op->u.getattr.handle
735       ,&s_op->key
736       ,&s_op->val
737       ,0
738       ,NULL
739       ,smcb
740       ,0
741       ,js_p
742       ,&i
743       ,server_job_context
744       ,s_op->req->hints);
745
746    return ret;
747}
748
749
750static PINT_sm_action getattr_read_mirrored_copies_count_if_required(
751        struct PINT_smcb *smcb, job_status_s *js_p)
752{
753    gossip_debug(GOSSIP_MIRROR_DEBUG,"Executing %s...\n",__func__);
754
755    struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
756    struct PVFS_server_resp *resp = &(s_op->resp);
757    PVFS_metafile_attr *meta =  &(resp->u.getattr.attr.u.meta);
758    int ret = -PVFS_EINVAL;
759    job_id_t job_id;
760
761   /* Are we mirroring? */
762    if (!(resp->u.getattr.attr.mask & PVFS_ATTR_META_MIRROR_DFILES))
763    {
764        gossip_debug(GOSSIP_MIRROR_DEBUG,"\tMirroring is NOT turned on "
765                                         "for this handle(%llu)..\n"
766                                        ,llu(s_op->u.getattr.handle));
767        js_p->error_code = SKIP_NEXT_STATE;
768
769        return SM_ACTION_COMPLETE;
770    }
771
772    gossip_debug(GOSSIP_MIRROR_DEBUG,"\tMirroring IS turned on for this "
773                                     "handle(%llu)...\n"
774                                    ,llu(s_op->u.getattr.handle));
775
776    js_p->error_code = 0;
777
778    /* setup job to read user.pvfs2.mirror.copies */
779
780    /* initialize */
781    if (s_op->free_val)
782       free(s_op->val.buffer);
783    memset(&(s_op->val),0,sizeof(s_op->val));
784    memset(&(s_op->key),0,sizeof(s_op->key));
785
786    /* set key = user.pvfs2.mirror.copies */
787    s_op->key.buffer    = Trove_Special_Keys[MIRROR_COPIES_KEY].key;
788    s_op->key.buffer_sz = Trove_Special_Keys[MIRROR_COPIES_KEY].size;
789
790    /* setup space for retrieved value */
791    meta->mirror_copies_count = 0;
792    meta->mirror_dfile_array  = NULL;
793    s_op->val.buffer = &(meta->mirror_copies_count);
794    s_op->val.buffer_sz = sizeof(meta->mirror_copies_count);
795    s_op->free_val = 0;
796
797    /* submit job to read the value */
798    ret = job_trove_keyval_read(
799        s_op->u.getattr.fs_id
800       ,s_op->u.getattr.handle
801       ,&s_op->key
802       ,&s_op->val
803       ,0
804       ,NULL
805       ,smcb
806       ,0
807       ,js_p
808       ,&job_id
809       ,server_job_context
810       ,s_op->req->hints);
811
812    return ret;
813}/*end getattr_read_mirrored_copies_if_required*/
814
815
816
817static PINT_sm_action getattr_read_mirrored_handles_if_required(
818        struct PINT_smcb *smcb, job_status_s *js_p)
819{
820   gossip_debug(GOSSIP_MIRROR_DEBUG,"Executing %s ...\n",__func__);
821
822    struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
823    struct PVFS_server_resp *resp = &(s_op->resp);
824    PVFS_metafile_attr *meta =  &(resp->u.getattr.attr.u.meta);
825    int ret = -PVFS_EINVAL;
826    job_id_t job_id;
827    int i;
828
829   /* Did we find mirror.copies? */
830    if (js_p->error_code < 0)
831    {
832        gossip_debug(GOSSIP_MIRROR_DEBUG,"\tNumber of mirrored copies cannot "
833                                         "be retrieved.\n");
834        gossip_lerr("Mirror handles requested, but number of mirrored copies "
835                    "cannot be retrieved.\n");
836        if (resp->u.getattr.attr.mask & (PVFS_ATTR_META_DFILES |
837                                         PVFS_ATTR_META_DIST) )
838        {
839            resp->u.getattr.attr.mask &= ~PVFS_ATTR_META_MIRROR_DFILES;
840            js_p->error_code = SKIP_NEXT_STATE;
841        } else {
842            resp->u.getattr.attr.mask &= ~PVFS_ATTR_META_MIRROR_DFILES;
843            resp->u.getattr.attr.mask &= ~PVFS_ATTR_META_DFILES;
844            resp->u.getattr.attr.mask &= ~PVFS_ATTR_META_DIST;
845        }
846        memset(&(s_op->key),0,sizeof(s_op->key));
847        memset(&(s_op->val),0,sizeof(s_op->val));
848        s_op->free_val = 0;
849
850        return SM_ACTION_COMPLETE;
851    }
852
853    gossip_debug(GOSSIP_MIRROR_DEBUG,"\tNumber of mirrored copies count "
854                                     "successfully retrieved.\n");
855
856   /* check number of mirrored copies */
857    if (meta->mirror_copies_count == 0)
858    {
859       gossip_debug(GOSSIP_MIRROR_DEBUG,"\tNumber of mirrored copies "
860                                        "is ZERO.\n");
861       gossip_lerr("Mirror handles requested, but number of mirrored copies "
862                   "is zero.\n");
863       resp->u.getattr.attr.mask &= ~PVFS_ATTR_META_MIRROR_DFILES;
864       js_p->error_code = SKIP_NEXT_STATE;
865       return SM_ACTION_COMPLETE;
866    }
867
868    gossip_debug(GOSSIP_MIRROR_DEBUG,"\tNumber of mirrored copies "
869                                     "retrieved : %d\n"
870                                    ,meta->mirror_copies_count);
871
872
873   /* check to see if total number of mirrored handles is sane */
874   if ( (meta->dfile_count * meta->mirror_copies_count) >
875         PVFS_REQ_LIMIT_MIRROR_DFILE_COUNT )
876   {
877       gossip_lerr("Number of mirrored handles(%d) exceeds the system "
878                   "limit(%d)\n"
879                  ,meta->dfile_count * meta->mirror_copies_count
880                  ,PVFS_REQ_LIMIT_MIRROR_DFILE_COUNT);
881       resp->u.getattr.attr.mask &= ~PVFS_ATTR_META_MIRROR_DFILES;
882       js_p->error_code = SKIP_NEXT_STATE;
883       return SM_ACTION_COMPLETE;
884   }
885
886   js_p->error_code = 0;
887   /* get mirrored handles and status of each handle */
888
889   /* initialize */
890   if (s_op->free_val)
891      free(s_op->val.buffer);
892   memset(&(s_op->key),0,sizeof(s_op->key));
893   memset(&(s_op->val),0,sizeof(s_op->val));
894
895   for (i=0; i<s_op->keyval_count; i++)
896       if (s_op->val_a && s_op->val_a[i].buffer && s_op->free_val)
897           free(s_op->val_a[i].buffer);
898   if (s_op->val_a)
899       free(s_op->val_a);
900   if (s_op->key_a)
901       free(s_op->key_a);
902   if (s_op->error_a)
903      free(s_op->error_a);
904   s_op->free_val = 0;
905
906   /* allocate space for keys and values */
907   s_op->keyval_count = 2;
908   s_op->free_val = 1;
909   s_op->key_a = s_op->val_a = NULL;
910   s_op->error_a = NULL;
911   
912   s_op->key_a   = malloc(sizeof(*s_op->key_a)   * s_op->keyval_count);
913   s_op->val_a   = malloc(sizeof(*s_op->val_a)   * s_op->keyval_count);
914   s_op->error_a = malloc(sizeof(*s_op->error_a) * s_op->keyval_count);
915   if (!s_op->key_a || !s_op->val_a || !s_op->error_a)
916   {
917      gossip_lerr("Cannot allocate memory for key/val/error.\n");
918      js_p->error_code = -PVFS_ENOMEM;
919      goto error_exit;
920   }
921   memset(s_op->key_a,0,sizeof(*s_op->key_a));
922   memset(s_op->val_a,0,sizeof(*s_op->val_a));
923
924   /* set key = user.pvfs2.mirror.handles */
925   s_op->key_a[0].buffer    = Trove_Special_Keys[MIRROR_HANDLES_KEY].key;
926   s_op->key_a[0].buffer_sz = Trove_Special_Keys[MIRROR_HANDLES_KEY].size;
927
928   /* setup buffer space for handles */
929   s_op->val_a[0].buffer = malloc(sizeof(PVFS_handle) *
930                                  meta->dfile_count   *
931                                  meta->mirror_copies_count);
932   if (!s_op->val_a[0].buffer)
933   {
934      gossip_lerr("Cannot allocate memory for mirrored handles.\n");
935      js_p->error_code = -PVFS_ENOMEM;
936      goto error_exit;
937   }
938   memset(s_op->val_a[0].buffer,0,sizeof(PVFS_handle) *
939                                  meta->dfile_count   *
940                                  meta->mirror_copies_count);
941   s_op->val_a[0].buffer_sz = sizeof(PVFS_handle) *
942                              meta->dfile_count   *
943                              meta->mirror_copies_count;
944
945   /* set key = user.pvfs2.mirror.status */
946   s_op->key_a[1].buffer    = Trove_Special_Keys[MIRROR_STATUS_KEY].key;
947   s_op->key_a[1].buffer_sz = Trove_Special_Keys[MIRROR_STATUS_KEY].size;
948
949   /* setup buffer space for handle statuses */
950   s_op->val_a[1].buffer = malloc(sizeof(PVFS_handle) *
951                                  meta->dfile_count   *
952                                  meta->mirror_copies_count);
953   if (!s_op->val_a[1].buffer)
954   {
955       gossip_lerr("Cannot allocate memory for mirrored handle statuses.\n");
956       js_p->error_code = -PVFS_ENOMEM;
957       goto error_exit;
958   }
959   memset(s_op->val_a[1].buffer,0,sizeof(PVFS_handle) *
960                                  meta->dfile_count   *
961                                  meta->mirror_copies_count);
962   s_op->val_a[1].buffer_sz = sizeof(PVFS_handle) *
963                              meta->dfile_count   *
964                              meta->mirror_copies_count;
965
966   
967   /* call job to retrieve the key/val pairs */
968   ret = job_trove_keyval_read_list(
969          s_op->u.getattr.fs_id
970         ,s_op->u.getattr.handle
971         ,s_op->key_a
972         ,s_op->val_a
973         ,s_op->error_a
974         ,s_op->keyval_count
975         ,0
976         ,NULL
977         ,smcb
978         ,0
979         ,js_p
980         ,&job_id
981         ,server_job_context
982         ,s_op->req->hints );
983
984   return ret;
985
986error_exit:
987   for (i=0; i<s_op->keyval_count; i++)
988   {
989       if (s_op->val_a && s_op->val_a[i].buffer)
990          free(s_op->val_a[i].buffer);
991   }
992   if (s_op->val_a)
993       free(s_op->val_a);
994   if (s_op->key_a)
995       free(s_op->key_a);
996   if (s_op->error_a)
997       free(s_op->error_a);
998   s_op->val_a = s_op->key_a = NULL;
999   s_op->error_a = NULL;
1000   s_op->keyval_count = 0;
1001   s_op->free_val = 0;
1002
1003   resp->u.getattr.attr.mask &= ~PVFS_ATTR_META_MIRROR_DFILES;
1004
1005   return SM_ACTION_COMPLETE;
1006}/*end getattr_read_mirrored_handles_if_required*/
1007
1008
1009
1010
1011static PINT_sm_action getattr_mirrored_handles_safety_check(
1012        struct PINT_smcb *smcb, job_status_s *js_p)
1013{
1014   gossip_debug(GOSSIP_MIRROR_DEBUG,"Executing %s ...\n",__func__);
1015
1016    struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
1017    struct PVFS_server_resp *resp = &(s_op->resp);
1018    PVFS_metafile_attr *meta =  &(resp->u.getattr.attr.u.meta);
1019    int row,col,index;
1020    int i;
1021
1022
1023   js_p->error_code = 0;
1024
1025   /* Check the error code for each key/val pair. */
1026   for (i=0; i<s_op->keyval_count; i++)
1027   {
1028       if (s_op->error_a[i] != 0)
1029       {
1030           gossip_debug(GOSSIP_MIRROR_DEBUG,"\tRetrieval of key(%s) failed.\n"
1031                                           ,(char *)s_op->key_a[i].buffer);
1032           js_p->error_code = s_op->error_a[i];
1033       }
1034   }
1035
1036   if (js_p->error_code)
1037   {
1038      goto error_exit;
1039   }
1040
1041   gossip_debug(GOSSIP_MIRROR_DEBUG,"\tWe successfully retrieved handles and "
1042                                    "statuses.\n");
1043
1044   for (i=0; i<s_op->keyval_count; i++)
1045   {  /* Did we get the data that we were expecting from
1046       * user.pvfs2.mirror.handles(i=0) or
1047       * user.pvfs2.mirror.statuses(i=1)?
1048       */
1049
1050     if (s_op->val_a[i].read_sz != s_op->val_a[i].buffer_sz)
1051     {
1052         gossip_lerr("Error: %s key found val size: %d when "
1053                    "expecting val size: %d\n"
1054                    ,(char *)s_op->key_a[i].buffer
1055                    ,s_op->val_a[i].read_sz
1056                    ,s_op->val_a[i].buffer_sz);
1057         js_p->error_code = s_op->val_a[i].buffer_sz;
1058     }
1059   }/*end for*/
1060
1061   if (js_p->error_code)
1062   {
1063       goto error_exit;
1064   }
1065
1066   /*initialize permanent data structures*/
1067   meta->mirror_dfile_array = s_op->val_a[0].buffer;
1068   s_op->u.getattr.mirror_dfile_status_array = s_op->val_a[1].buffer;
1069   s_op->val_a[0].buffer = s_op->val_a[1].buffer = NULL;
1070
1071   /* Check the mirroring status for each handle.  If the status is non-zero,
1072    * the handle is not valid, so put a null in the mirrory array for that
1073    * handle.  Otherwise, do nothing.
1074   */
1075   for (row=0; row<meta->mirror_copies_count; row++)
1076   {
1077      for (col=0; col<meta->dfile_count; col++)
1078      {
1079         index = (row*meta->dfile_count) + col;
1080         if ( s_op->u.getattr.mirror_dfile_status_array[index] == UINT64_HIGH )
1081            meta->mirror_dfile_array[index] = 0;
1082         gossip_debug(GOSSIP_MIRROR_DEBUG,
1083                      "\tmirror handle[%d]:%llu \t"
1084                      "status:%llu\n"
1085                      ,index
1086                      ,llu(meta->mirror_dfile_array[index])
1087                      ,llu(s_op->u.getattr.mirror_dfile_status_array[index]));
1088      }
1089   }
1090
1091   /*Cleanup*/
1092   free(s_op->key_a);
1093   free(s_op->val_a);
1094   free(s_op->error_a);
1095   s_op->key_a = s_op->val_a = NULL;
1096   s_op->error_a = NULL;
1097   s_op->keyval_count = 0;
1098   s_op->free_val = 0;   
1099
1100   js_p->error_code = 0;
1101   return SM_ACTION_COMPLETE;
1102
1103error_exit:
1104   /* if we have an error, cleanup, and pretend that we never attempted
1105    * mirrors in the first place.
1106   */
1107      gossip_debug(GOSSIP_MIRROR_DEBUG,"\tCleaning up mirror operation...\n");
1108      js_p->error_code = 0;
1109      for (i=0; i<s_op->keyval_count; i++)
1110      {
1111          if (s_op->val_a[i].buffer)
1112              free(s_op->val_a[i].buffer);
1113      }
1114      free(s_op->key_a);
1115      free(s_op->val_a);
1116      free(s_op->error_a);
1117      s_op->key_a = s_op->val_a = NULL;
1118      s_op->error_a = NULL;
1119      s_op->keyval_count = 0;
1120     
1121      /*We MUST set the number of copies to zero to prevent encoding errors
1122       *later.
1123      */
1124      meta->mirror_copies_count = 0;
1125      meta->mirror_dfile_array = NULL;
1126      s_op->u.getattr.mirror_dfile_status_array = NULL;
1127      resp->u.getattr.attr.mask &= ~PVFS_ATTR_META_MIRROR_DFILES;
1128      return SM_ACTION_COMPLETE;
1129}/*end getattr_mirrored_handles_safety_check*/
1130
1131
1132
1133
1134static PINT_sm_action getattr_read_metafile_distribution_if_required(
1135        struct PINT_smcb *smcb, job_status_s *js_p)
1136{
1137    struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
1138    int ret = -PVFS_EINVAL;
1139    job_id_t i;
1140
1141    assert(s_op->attr.objtype == PVFS_TYPE_METAFILE);
1142
1143    js_p->error_code = 0;
1144
1145    /* if we don't need to fill in the distribution, skip it */
1146    if (!(s_op->u.getattr.attrmask & PVFS_ATTR_META_DIST))
1147    {
1148        gossip_debug(GOSSIP_GETATTR_DEBUG, "skipping data handle "
1149                     "distribution read\n");
1150        return SM_ACTION_COMPLETE;
1151    }
1152
1153    s_op->key.buffer = Trove_Common_Keys[METAFILE_DIST_KEY].key;
1154    s_op->key.buffer_sz = Trove_Common_Keys[METAFILE_DIST_KEY].size;
1155
1156    /*
1157      there *should* be some distribution information.  if not, dump
1158      which handle is busted and assertion die for now while we're not
1159      handling this kind of error
1160    */
1161    if (s_op->resp.u.getattr.attr.u.meta.dist_size < 1)
1162    {
1163        gossip_err("Cannot Read Dist!  Got an invalid dist size for "
1164                   "handle %llu,%d\n",llu(s_op->u.getattr.handle),
1165                   s_op->u.getattr.fs_id);
1166        js_p->error_code = -PVFS_EINVAL;
1167        return SM_ACTION_COMPLETE;
1168    }
1169    assert(s_op->resp.u.getattr.attr.u.meta.dist_size > 0);
1170
1171    /* add mask value to indicate the distribution is filled */
1172    s_op->resp.u.getattr.attr.mask |= PVFS_ATTR_META_DIST;
1173
1174    if(s_op->free_val)
1175    {
1176        free(s_op->val.buffer);
1177    }
1178    s_op->val.buffer_sz = s_op->resp.u.getattr.attr.u.meta.dist_size;
1179    s_op->val.buffer = malloc(s_op->val.buffer_sz);
1180    if (!s_op->val.buffer)
1181    {
1182        gossip_err("Cannot allocate dist of size %d\n",
1183                   s_op->val.buffer_sz);
1184        js_p->error_code = -PVFS_ENOMEM;
1185        return SM_ACTION_COMPLETE;
1186    }
1187    s_op->free_val = 1;
1188
1189    ret = job_trove_keyval_read(
1190        s_op->u.getattr.fs_id, s_op->u.getattr.handle,
1191        &(s_op->key), &(s_op->val),
1192        0,
1193        NULL,
1194        smcb, 0, js_p, &i, server_job_context, s_op->req->hints);
1195
1196    return ret;
1197}
1198
1199static PINT_sm_action getattr_read_stuffed_size(
1200    struct PINT_smcb *smcb, job_status_s *js_p)
1201{
1202    struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
1203    job_id_t job_id;
1204
1205    if(js_p->error_code == -TROVE_ENOENT)
1206    {
1207        gossip_debug(
1208            GOSSIP_GETATTR_DEBUG, "Getattr detected non-stuffed file.\n");
1209        /* this means that the keyval fields used to indicate a file is
1210         * stuffed are not present.  Set mask accordingly and continue.
1211         */
1212        s_op->resp.u.getattr.attr.mask |= PVFS_ATTR_META_UNSTUFFED;
1213        js_p->error_code = 0;
1214        return SM_ACTION_COMPLETE;
1215    }
1216    if(js_p->error_code)
1217    {
1218        /* any other error code here is just a normal error case */
1219        /* preserve error code and catch next error transition */
1220        return SM_ACTION_COMPLETE;
1221    }
1222
1223    gossip_debug(
1224        GOSSIP_GETATTR_DEBUG, "Getattr detected stuffed file.\n");
1225    /* otherwise, we found keyval fields indicating that the file is
1226     * stuffed.  It does not matter if the client asked for the size or not;
1227     * we must retrieve a valid stuffed_size value for the attrs.
1228     */
1229    s_op->resp.u.getattr.attr.mask &= (~(PVFS_ATTR_META_UNSTUFFED));
1230
1231    return(job_trove_dspace_getattr(
1232        s_op->u.getattr.fs_id,
1233        s_op->resp.u.getattr.attr.u.meta.dfile_array[0],
1234        smcb,
1235        &s_op->ds_attr,
1236        0,
1237        js_p,
1238        &job_id,
1239        server_job_context,
1240        s_op->req->hints));
1241}
1242
1243static PINT_sm_action getattr_interpret_stuffed_size(
1244    struct PINT_smcb *smcb, job_status_s *js_p)
1245{
1246    struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
1247    PVFS_metafile_attr *meta    = &(s_op->resp.u.getattr.attr.u.meta);
1248
1249    if(js_p->error_code == 0)
1250    {
1251        meta->stuffed_size = s_op->ds_attr.u.datafile.b_size;
1252    }
1253
1254    /* deliberately leave error_code unchanged so that any errors get
1255     * handled in the next state
1256     */
1257    return SM_ACTION_COMPLETE;
1258}
1259
1260
1261/* interpret_metafile_distribution()
1262 *
1263 * capture and encode results of reading distribution
1264 */
1265static PINT_sm_action interpret_metafile_distribution(
1266        struct PINT_smcb *smcb, job_status_s *js_p)
1267{
1268    struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
1269    PVFS_object_attr *resp_attr = &s_op->resp.u.getattr.attr;
1270   
1271    if(js_p->error_code < 0)
1272    {
1273        return SM_ACTION_COMPLETE;
1274    }
1275
1276    if(s_op->u.getattr.attrmask & PVFS_ATTR_META_DIST)
1277    {
1278        /* successfully read dist key; make sure we got something valid */
1279        if(s_op->val.read_sz != s_op->val.buffer_sz)
1280        {
1281            gossip_err("Error: %s key found val size: %d when "
1282                       "expecting val size: %d\n",
1283                Trove_Common_Keys[METAFILE_DIST_KEY].key,
1284                s_op->val.read_sz,
1285                s_op->val.buffer_sz);
1286
1287            /* clear bitmask to prevent double free between setup_resp and
1288             * PINT_free_object_attr()
1289             */
1290            s_op->resp.u.getattr.attr.mask &= ~PVFS_ATTR_META_DIST;
1291
1292            js_p->error_code = -PVFS_EIO;
1293            return SM_ACTION_COMPLETE;
1294        }
1295
1296        assert(s_op->val.buffer);
1297        PINT_dist_decode(&resp_attr->u.meta.dist, s_op->val.buffer);
1298
1299        if(resp_attr->u.meta.dist == 0) {
1300            gossip_err("Found dist of 0 for handle %llu,%d\n",
1301                    llu(s_op->u.getattr.handle), s_op->u.getattr.fs_id);
1302            PVFS_perror("Metafile getattr_setup_resp",js_p->error_code);
1303            js_p->error_code = -PVFS_EIO;
1304            return SM_ACTION_COMPLETE;
1305        }
1306    }
1307
1308    js_p->error_code = 0;
1309    return SM_ACTION_COMPLETE;
1310}
1311
1312static PINT_sm_action getattr_setup_resp(
1313        struct PINT_smcb *smcb, job_status_s *js_p)
1314{
1315    struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
1316    PVFS_object_attr *resp_attr = &s_op->resp.u.getattr.attr;
1317
1318    gossip_debug(GOSSIP_GETATTR_DEBUG,
1319                 "  getattr_setup_resp: error_code = %d\n",
1320                 js_p->error_code);
1321    if(js_p->error_code > 0)
1322    {
1323        /* if we reach this state with a positive error code it means that
1324         * nothing is wrong; we just used one of the explicit STATE_*
1325         * transitions
1326         */
1327        js_p->error_code = 0;
1328    }
1329    if(js_p->error_code < 0)
1330    {
1331        free_nested_getattr_data(s_op);
1332        return SM_ACTION_COMPLETE;
1333    }
1334
1335    gossip_debug(
1336        GOSSIP_GETATTR_DEBUG,
1337        "-  retrieved attrs: [owner = %d, group = %d\n\t"
1338        "perms = %o, type = %d, atime = %llu, mtime = %llu\n\t"
1339        "ctime = %llu, dist_size = %d]\n",
1340        resp_attr->owner, resp_attr->group, resp_attr->perms,
1341        resp_attr->objtype, llu(resp_attr->atime),
1342        llu(resp_attr->mtime), llu(resp_attr->ctime),
1343        (int)resp_attr->u.meta.dist_size);
1344
1345    if (resp_attr->objtype == PVFS_TYPE_METAFILE)
1346    {
1347        if (resp_attr->mask & PVFS_ATTR_META_DFILES)
1348        {
1349            if (resp_attr->u.meta.dfile_count)
1350            {
1351                assert(resp_attr->u.meta.dfile_array);
1352            }
1353            gossip_debug(GOSSIP_GETATTR_DEBUG,
1354                         "  also returning %d datafile handles\n",
1355                         resp_attr->u.meta.dfile_count);
1356        }
1357        if (resp_attr->mask & PVFS_ATTR_META_MIRROR_DFILES)
1358        {
1359           if (resp_attr->u.meta.mirror_copies_count)
1360              assert(resp_attr->u.meta.mirror_dfile_array);
1361           gossip_debug(GOSSIP_GETATTR_DEBUG,
1362                        "  also returning %d mirrored copies\n"
1363                       ,resp_attr->u.meta.mirror_copies_count);
1364        }
1365        if (resp_attr->mask & PVFS_ATTR_META_DIST)
1366        {
1367            /* we have already gathered the dist field in an earlier state */
1368            gossip_debug(GOSSIP_GETATTR_DEBUG,
1369                         "  also returning dist size of %d\n",
1370                         resp_attr->u.meta.dist_size);
1371        }
1372    }
1373    else if ((resp_attr->objtype == PVFS_TYPE_DATAFILE) &&
1374             (resp_attr->mask & PVFS_ATTR_DATA_SIZE))
1375    {
1376        gossip_debug(GOSSIP_GETATTR_DEBUG,
1377                     "  also returning data size of %lld\n",
1378                     lld(resp_attr->u.data.size));
1379    }
1380    else if ((resp_attr->objtype == PVFS_TYPE_SYMLINK) &&
1381             (resp_attr->mask & PVFS_ATTR_SYMLNK_TARGET))
1382    {
1383        if (js_p->error_code == 0)
1384        {
1385            assert(resp_attr->u.sym.target_path);
1386            assert(resp_attr->u.sym.target_path_len);
1387            /*
1388              adjust target path len down to actual size ; always
1389              include the null termination char in the target_path_len
1390            */
1391            resp_attr->u.sym.target_path_len =
1392                (strlen(resp_attr->u.sym.target_path) + 1);
1393
1394            gossip_debug(GOSSIP_GETATTR_DEBUG,
1395                         "  also returning link target of %s (len %d)\n",
1396                         resp_attr->u.sym.target_path,
1397                         resp_attr->u.sym.target_path_len);
1398        }
1399        else
1400        {
1401            gossip_err("Failed to retrieve symlink target path for "
1402                       "handle %llu,%d\n",llu(s_op->u.getattr.handle),
1403                       s_op->u.getattr.fs_id);
1404            PVFS_perror("Symlink retrieval failure",js_p->error_code);
1405
1406            free_nested_getattr_data(s_op);
1407            js_p->error_code = -PVFS_EINVAL;
1408            return SM_ACTION_COMPLETE;
1409        }
1410    }
1411    else if ((resp_attr->objtype == PVFS_TYPE_DIRECTORY) &&
1412            (resp_attr->mask & PVFS_ATTR_DIR_HINT))
1413    {
1414        uint32_t dirent_file_count_i;
1415
1416        gossip_debug(GOSSIP_GETATTR_DEBUG, " server returning "
1417            "dirent_count = %llu "
1418            "dirent_file_count = %d "
1419            "dfile_count = %d "
1420            "dist_name_len    = %d "
1421            "dist_params_len  = %d\n",
1422            llu(resp_attr->u.dir.dirent_count),
1423            resp_attr->u.dir.dirent_file_count,
1424            resp_attr->u.dir.hint.dfile_count,
1425            resp_attr->u.dir.hint.dist_name_len,
1426            resp_attr->u.dir.hint.dist_params_len);
1427        for (dirent_file_count_i = 0;
1428             dirent_file_count_i < resp_attr->u.dir.dirent_file_count;
1429             dirent_file_count_i++) {
1430            gossip_debug(GOSSIP_GETATTR_DEBUG,
1431                "    dirent_handle[%d] = %lld\n",
1432                dirent_file_count_i,
1433                lld(resp_attr->u.dir.dirent_handle[dirent_file_count_i]));
1434        }
1435 
1436    }
1437
1438    gossip_debug(GOSSIP_GETATTR_DEBUG,"@ End %s attributes: sending "
1439                 "status %d (error = %d)\n",
1440                 PINT_util_get_object_type(resp_attr->objtype),
1441                 s_op->resp.status, js_p->error_code);
1442
1443#if 0
1444    gossip_debug(GOSSIP_GETATTR_DEBUG, "returning attrmask ");
1445    PINT_attrmask_print(GOSSIP_GETATTR_DEBUG,
1446                        s_op->resp.u.getattr.attr.mask);
1447#endif
1448
1449    free_nested_getattr_data(s_op);
1450    return SM_ACTION_COMPLETE;
1451}
1452
1453static void free_nested_getattr_data(struct PINT_server_op *s_op)
1454{
1455   int i;
1456    /* free up anything that was set up specifically by this nested machine */
1457    if (s_op->free_val)
1458    {
1459       for (i=0; i<s_op->keyval_count; i++)
1460       {
1461           if (s_op->val_a[i].buffer)
1462               free(s_op->val_a[i].buffer);
1463       }
1464    }
1465    if(s_op->val_a)
1466    {
1467        free(s_op->val_a);
1468        s_op->val_a = NULL;
1469    }
1470    if(s_op->key_a)
1471    {
1472        free(s_op->key_a);
1473        s_op->key_a = NULL;
1474    }
1475    if(s_op->u.getattr.err_array)
1476    {
1477        free(s_op->u.getattr.err_array);
1478        s_op->u.getattr.err_array = NULL;
1479    }
1480    if (s_op->u.getattr.mirror_dfile_status_array)
1481    {
1482        free(s_op->u.getattr.mirror_dfile_status_array);
1483        s_op->u.getattr.mirror_dfile_status_array = NULL;
1484    }
1485    if (s_op->resp.u.getattr.attr.objtype == PVFS_TYPE_DIRECTORY &&
1486        s_op->u.getattr.dirent_handle)
1487    {
1488        free(s_op->u.getattr.dirent_handle);
1489        s_op->u.getattr.dirent_handle = NULL;
1490    }
1491    if(s_op->free_val)
1492    {
1493        free(s_op->val.buffer);
1494        s_op->val.buffer = NULL;
1495    }
1496    if(s_op->error_a)
1497    {
1498        free(s_op->error_a);
1499        s_op->error_a = NULL;
1500    }
1501
1502    return;
1503}
1504
1505static PINT_sm_action getattr_cleanup(
1506        struct PINT_smcb *smcb, job_status_s *js_p)
1507{
1508    struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
1509
1510    PINT_free_object_attr(&s_op->resp.u.getattr.attr);
1511    return(server_state_machine_complete(smcb));
1512}
1513
1514static PINT_sm_action getattr_with_prelude_init(
1515        struct PINT_smcb *smcb, job_status_s *js_p)
1516{
1517    js_p->error_code = 0;
1518    return SM_ACTION_COMPLETE;
1519}
1520
1521static PINT_sm_action getattr_setup_op(
1522        struct PINT_smcb *smcb, job_status_s *js_p)
1523{
1524    struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
1525    s_op->u.getattr.handle = s_op->req->u.getattr.handle;
1526    s_op->u.getattr.fs_id = s_op->req->u.getattr.fs_id;
1527    s_op->u.getattr.attrmask = s_op->req->u.getattr.attrmask;
1528
1529    js_p->error_code = 0;
1530    return SM_ACTION_COMPLETE;
1531}
1532
1533static PINT_sm_action getattr_datafile_handles_safety_check(
1534        struct PINT_smcb *smcb, job_status_s *js_p)
1535{
1536    struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
1537
1538    if((js_p->error_code == 0) &&
1539        (s_op->u.getattr.attrmask & PVFS_ATTR_META_DFILES))
1540    {
1541        /* successfully read datafile key; make sure we got something valid */
1542        if(s_op->val.read_sz != s_op->val.buffer_sz)
1543        {
1544            gossip_err("Error: %s key found val size: %d when "
1545                       "expecting val size: %d\n",
1546                Trove_Common_Keys[METAFILE_HANDLES_KEY].key,
1547                s_op->val.read_sz,
1548                s_op->val.buffer_sz);
1549
1550            /* clear bitmask to prevent double free between setup_resp and
1551             * PINT_free_object_attr()
1552             */
1553            s_op->resp.u.getattr.attr.mask &= ~PVFS_ATTR_META_DFILES;
1554            s_op->resp.u.getattr.attr.mask &= ~PVFS_ATTR_META_DIST;
1555            s_op->resp.u.getattr.attr.mask &= ~PVFS_ATTR_META_MIRROR_DFILES;
1556
1557            js_p->error_code = -PVFS_EIO;
1558            return SM_ACTION_COMPLETE;
1559        }
1560    }
1561
1562    /* otherwise deliberately preserve existing error code */
1563    return SM_ACTION_COMPLETE;
1564}
1565
1566static PINT_sm_action getattr_get_num_dirdata_handles(
1567        struct PINT_smcb *smcb, job_status_s *js_p)
1568{
1569    struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
1570    int ret;
1571    job_id_t tmp_id;
1572
1573    s_op->key.buffer = Trove_Common_Keys[NUM_DIR_ENT_KEYS].key;
1574    s_op->key.buffer_sz = Trove_Common_Keys[NUM_DIR_ENT_KEYS].size;
1575    if(s_op->free_val)
1576    {
1577        free(s_op->val.buffer);
1578    }
1579    s_op->u.getattr.num_dirent_handles = 0;
1580    s_op->val.buffer = &s_op->u.getattr.num_dirent_handles;
1581    s_op->val.buffer_sz = sizeof(int32_t);
1582    s_op->free_val = 0;
1583
1584    js_p->error_code = 0;
1585    ret = job_trove_keyval_read(
1586        s_op->u.getattr.fs_id, s_op->u.getattr.handle,
1587        &s_op->key, &s_op->val,
1588        0,
1589        NULL,
1590        smcb,
1591        0,
1592        js_p,
1593        &tmp_id,
1594        server_job_context, s_op->req->hints);
1595
1596    return ret;
1597}
1598
1599static PINT_sm_action getattr_get_dirdata_handles(
1600        struct PINT_smcb *smcb, job_status_s *js_p)
1601{
1602    struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
1603    int ret;
1604    job_id_t tmp_id;
1605
1606    gossip_debug(GOSSIP_GETATTR_DEBUG, "getattr: num_dirent_handles: %d\n",
1607        s_op->u.getattr.num_dirent_handles);
1608    if(js_p->error_code == -TROVE_ENOENT)
1609    {
1610        gossip_debug(GOSSIP_GETATTR_DEBUG, "getattr: setting num_dirent_handles to 1\n");
1611        s_op->u.getattr.num_dirent_handles = 1;
1612    }
1613    s_op->key.buffer = Trove_Common_Keys[DIR_ENT_KEY].key;
1614    s_op->key.buffer_sz = Trove_Common_Keys[DIR_ENT_KEY].size;
1615    if(s_op->free_val)
1616    {
1617        free(s_op->val.buffer);
1618    }
1619    s_op->u.getattr.dirent_handle =
1620            (PVFS_handle *) calloc(s_op->u.getattr.num_dirent_handles,
1621                                   sizeof(PVFS_handle));
1622    if (! s_op->u.getattr.dirent_handle) {
1623        js_p->error_code = -PVFS_ENOMEM;
1624        return SM_ACTION_COMPLETE;
1625    }
1626    s_op->val.buffer = s_op->u.getattr.dirent_handle;
1627    s_op->val.buffer_sz = s_op->u.getattr.num_dirent_handles * sizeof(PVFS_handle);
1628    s_op->free_val = 0;
1629
1630    js_p->error_code = 0;
1631    ret = job_trove_keyval_read(
1632        s_op->u.getattr.fs_id, s_op->u.getattr.handle,
1633        &s_op->key, &s_op->val,
1634        0,
1635        NULL,
1636        smcb,
1637        0,
1638        js_p,
1639        &tmp_id,
1640        server_job_context, s_op->req->hints);
1641
1642    return ret;
1643}
1644       
1645static PINT_sm_action getattr_get_dirent_count(
1646        struct PINT_smcb *smcb, job_status_s *js_p)
1647{
1648    struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
1649    int ret;
1650    job_id_t tmp_id;
1651    int i;
1652
1653    if (s_op->u.getattr.attrmask & PVFS_ATTR_DIR_DIRENT_FILES)
1654    {
1655        /* Implementation of distributed directories will mean there
1656           may be multiple. */
1657        s_op->resp.u.getattr.attr.u.dir.dirent_file_count = s_op->u.getattr.num_dirent_handles;
1658        s_op->resp.u.getattr.attr.u.dir.dirent_handle =
1659            (PVFS_handle *) calloc(s_op->resp.u.getattr.attr.u.dir.dirent_file_count,
1660                                   sizeof(PVFS_handle));
1661        if (! s_op->resp.u.getattr.attr.u.dir.dirent_handle) {
1662            js_p->error_code = -PVFS_ENOMEM;
1663            return SM_ACTION_COMPLETE;
1664        }
1665        memcpy(s_op->resp.u.getattr.attr.u.dir.dirent_handle,
1666          s_op->u.getattr.dirent_handle,
1667          s_op->resp.u.getattr.attr.u.dir.dirent_file_count * sizeof(PVFS_handle));
1668        gossip_debug(GOSSIP_GETATTR_DEBUG, "getattr: num_dirent_handles: %d\n",
1669            s_op->resp.u.getattr.attr.u.dir.dirent_file_count);
1670        for (i = 0; i < s_op->resp.u.getattr.attr.u.dir.dirent_file_count; i++)
1671        {
1672            gossip_debug(GOSSIP_GETATTR_DEBUG, "getattr: dirent_handle[%d]: %llu\n",
1673                i, llu(s_op->resp.u.getattr.attr.u.dir.dirent_handle[i]));
1674        }
1675    }
1676    else
1677    {
1678        s_op->resp.u.getattr.attr.u.dir.dirent_file_count = 0;
1679    }
1680
1681    if (!(s_op->u.getattr.attrmask & PVFS_ATTR_DIR_DIRENT_COUNT))
1682    {
1683         /* the caller didn't really want the dirent count; skip to get
1684          * directory hints
1685          */
1686         js_p->error_code = STATE_DIR_HINT;
1687         return SM_ACTION_COMPLETE;
1688    }
1689    /* TODO: Have to gather counts from all dirent handles. */
1690    ret = job_trove_keyval_get_handle_info(
1691        s_op->u.getattr.fs_id,
1692        s_op->u.getattr.dirent_handle[0],
1693        TROVE_KEYVAL_HANDLE_COUNT |
1694        0,
1695        &s_op->u.getattr.keyval_handle_info,
1696        smcb,
1697        0,
1698        js_p,
1699        &tmp_id,
1700        server_job_context, s_op->req->hints);
1701
1702    return ret;
1703}
1704
1705static PINT_sm_action getattr_interpret_dirent_count(
1706        struct PINT_smcb *smcb, job_status_s *js_p)
1707{
1708    struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
1709    switch(js_p->error_code)
1710    {
1711        case -TROVE_ENOENT:
1712            js_p->error_code = 0;
1713            s_op->resp.u.getattr.attr.u.dir.dirent_count = 0;
1714            break;
1715        case 0:
1716            s_op->resp.u.getattr.attr.u.dir.dirent_count =
1717                s_op->u.getattr.keyval_handle_info.count;
1718            break;
1719        default:
1720            return SM_ACTION_COMPLETE;
1721    }
1722
1723    gossip_debug(GOSSIP_GETATTR_DEBUG, "getattr: dirent_count: %lld\n",
1724        lld(s_op->resp.u.getattr.attr.u.dir.dirent_count));
1725
1726    return SM_ACTION_COMPLETE;
1727}
1728
1729static PINT_sm_action getattr_get_dir_hint(
1730        struct PINT_smcb *smcb, job_status_s *js_p)
1731{
1732    struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
1733    int ret, i;
1734    job_id_t tmp_id;
1735
1736    /* NOTE: memory allocations are released in the getattr_cleanup()
1737     * function
1738     */
1739   
1740    if (!(s_op->u.getattr.attrmask & PVFS_ATTR_DIR_HINT))
1741    {
1742        /* the caller didn't really want the dir hints; skip
1743         */
1744        js_p->error_code = STATE_DONE;
1745        return SM_ACTION_COMPLETE;
1746    }
1747
1748
1749    gossip_debug(GOSSIP_SERVER_DEBUG, "  trying to getxattr of %s,%s,%s "
1750                 "of dir handle (coll_id = %d, handle = %llu\n",
1751                 Trove_Special_Keys[DIST_NAME_KEY].key,
1752                 Trove_Special_Keys[DIST_PARAMS_KEY].key,
1753                 Trove_Special_Keys[NUM_DFILES_KEY].key,
1754                 s_op->u.getattr.fs_id, llu(s_op->u.getattr.handle));
1755
1756    s_op->resp.u.getattr.attr.u.dir.hint.dist_params =
1757        (char *) calloc(1, PVFS_REQ_LIMIT_DIST_BYTES);
1758    if (!s_op->resp.u.getattr.attr.u.dir.hint.dist_params)
1759    {
1760        js_p->error_code = -PVFS_ENOMEM;
1761        return SM_ACTION_COMPLETE;
1762    }
1763    s_op->resp.u.getattr.attr.u.dir.hint.dist_params_len =
1764        PVFS_REQ_LIMIT_DIST_BYTES;
1765
1766    s_op->resp.u.getattr.attr.u.dir.hint.dist_name =
1767        (char *) calloc(1, PVFS_REQ_LIMIT_DIST_NAME);
1768    if (!s_op->resp.u.getattr.attr.u.dir.hint.dist_name)
1769    {
1770        js_p->error_code = -PVFS_ENOMEM;
1771        return SM_ACTION_COMPLETE;
1772    }
1773    s_op->resp.u.getattr.attr.u.dir.hint.dist_name_len   =
1774        PVFS_REQ_LIMIT_DIST_NAME;
1775
1776    s_op->key_a =
1777        (PVFS_ds_keyval *) calloc(NUM_SPECIAL_KEYS, sizeof(PVFS_ds_keyval));
1778    if (s_op->key_a == NULL)
1779    {
1780        js_p->error_code = -PVFS_ENOMEM;
1781        return SM_ACTION_COMPLETE;
1782    }
1783    s_op->val_a = (PVFS_ds_keyval *) calloc(NUM_SPECIAL_KEYS
1784                                           ,sizeof(PVFS_ds_keyval));
1785    if (s_op->val_a == NULL)
1786    {
1787        js_p->error_code = -PVFS_ENOMEM;
1788        return SM_ACTION_COMPLETE;
1789    }
1790    s_op->u.getattr.err_array = (PVFS_error*)calloc(NUM_SPECIAL_KEYS,
1791        sizeof(PVFS_error));
1792    if(s_op->u.getattr.err_array == NULL)
1793    {
1794        js_p->error_code = -PVFS_ENOMEM;
1795        return SM_ACTION_COMPLETE;
1796
1797    }
1798
1799    s_op->free_val = 0;
1800    s_op->keyval_count = NUM_SPECIAL_KEYS;
1801    for (i = 0; i < NUM_SPECIAL_KEYS; i++)
1802    {
1803        s_op->key_a[i].buffer = Trove_Special_Keys[i].key;
1804        s_op->key_a[i].buffer_sz = Trove_Special_Keys[i].size;
1805        if (i == NUM_DFILES_KEY)
1806        {
1807            s_op->val_a[i].buffer = (char *) calloc(1, 16);
1808            if(s_op->val_a[i].buffer == NULL)
1809            {
1810                js_p->error_code = -PVFS_ENOMEM;
1811                return SM_ACTION_COMPLETE;
1812            }
1813            s_op->val_a[i].buffer_sz = 16;
1814        }
1815        else if (i == DIST_PARAMS_KEY) {
1816            s_op->val_a[i].buffer
1817            = s_op->resp.u.getattr.attr.u.dir.hint.dist_params;
1818            s_op->val_a[i].buffer_sz
1819            = s_op->resp.u.getattr.attr.u.dir.hint.dist_params_len;
1820        }
1821        else if (i == DIST_NAME_KEY) {
1822            s_op->val_a[i].buffer
1823            = s_op->resp.u.getattr.attr.u.dir.hint.dist_name;
1824            s_op->val_a[i].buffer_sz
1825            = s_op->resp.u.getattr.attr.u.dir.hint.dist_name_len;
1826        }
1827    }
1828
1829    js_p->error_code = 0;
1830    ret = job_trove_keyval_read_list(
1831        s_op->u.getattr.fs_id,
1832        s_op->u.getattr.handle,
1833        s_op->key_a, s_op->val_a, s_op->u.getattr.err_array, NUM_SPECIAL_KEYS,
1834        0, NULL, smcb, 0, js_p, &tmp_id,
1835        server_job_context, s_op->req->hints);
1836
1837    return ret;
1838}
1839
1840static PINT_sm_action getattr_interpret_dir_hint(
1841        struct PINT_smcb *smcb, job_status_s *js_p)
1842{
1843    struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
1844    if(js_p->error_code != 0 && js_p->error_code != -TROVE_ENOENT)
1845    {
1846        /* if we failed to get any of the keys, and the error code is due to
1847         * something other than the keys simply not being present, then
1848         * propigate the error.
1849         */
1850        return SM_ACTION_COMPLETE;
1851    }
1852
1853    gossip_debug(GOSSIP_SERVER_DEBUG,
1854        "getattr: job status code = %d\n", js_p->error_code);
1855    if (s_op->val_a && s_op->key_a)
1856    {
1857        long int dfile_count = 0;
1858
1859        if (s_op->u.getattr.err_array[DIST_NAME_KEY] == 0)
1860        {
1861            gossip_debug(GOSSIP_SERVER_DEBUG,
1862                "val_a[DIST_NAME_KEY] %p read_sz = %d dist_name = %s\n",
1863                s_op->val_a[DIST_NAME_KEY].buffer,
1864                s_op->val_a[DIST_NAME_KEY].read_sz,
1865                (char *)s_op->val_a[DIST_NAME_KEY].buffer);
1866            s_op->resp.u.getattr.attr.u.dir.hint.dist_name_len =
1867                s_op->val_a[DIST_NAME_KEY].read_sz;
1868        }
1869        else
1870        {
1871            s_op->resp.u.getattr.attr.u.dir.hint.dist_name_len = 0;
1872        }           
1873        s_op->val_a[DIST_NAME_KEY].buffer = NULL;
1874
1875        if (s_op->u.getattr.err_array[DIST_PARAMS_KEY] == 0)
1876        {
1877            gossip_debug(GOSSIP_SERVER_DEBUG,
1878                "val_a[DIST_PARAMS_KEY] %p read_sz = %d dist_params = %s\n",
1879                s_op->val_a[DIST_PARAMS_KEY].buffer,
1880                s_op->val_a[DIST_PARAMS_KEY].read_sz,
1881                (char *)s_op->val_a[DIST_PARAMS_KEY].buffer);
1882            s_op->resp.u.getattr.attr.u.dir.hint.dist_params_len =
1883                s_op->val_a[DIST_PARAMS_KEY].read_sz;
1884        }
1885        else
1886        {
1887            s_op->resp.u.getattr.attr.u.dir.hint.dist_params_len = 0;
1888        }
1889        if (s_op->val_a[DIST_PARAMS_KEY].buffer)
1890        {
1891           s_op->val_a[DIST_PARAMS_KEY].buffer = NULL;
1892        }
1893
1894
1895
1896        if (s_op->u.getattr.err_array[NUM_DFILES_KEY] == 0)
1897        {
1898            char *endptr = NULL;
1899            gossip_debug(GOSSIP_SERVER_DEBUG, "val_a[NUM_DFILES_KEY] %p "
1900                                              "read_sz = %d\n",
1901                s_op->val_a[NUM_DFILES_KEY].buffer,
1902                s_op->val_a[NUM_DFILES_KEY].read_sz);
1903            dfile_count = strtol(s_op->val_a[NUM_DFILES_KEY].buffer
1904                                , &endptr, 10);
1905            if (*endptr != '\0' || dfile_count < 0)
1906            {
1907                dfile_count = 0;
1908            }
1909        }
1910        if(s_op->val_a[NUM_DFILES_KEY].buffer)
1911        {
1912            free(s_op->val_a[NUM_DFILES_KEY].buffer);
1913            s_op->val_a[NUM_DFILES_KEY].buffer = NULL;
1914            s_op->val_a[NUM_DFILES_KEY].buffer_sz = 0;
1915        }
1916        s_op->keyval_count = 0;
1917        s_op->free_val = 0;
1918
1919
1920        s_op->resp.u.getattr.attr.u.dir.hint.dfile_count = dfile_count;
1921
1922        gossip_debug(GOSSIP_SERVER_DEBUG, "getattr: dir hint dfile_count: %d\n",
1923            s_op->resp.u.getattr.attr.u.dir.hint.dfile_count);
1924
1925        js_p->error_code = 0;
1926    }/* end if val_a and key_a */
1927    return SM_ACTION_COMPLETE;
1928}
1929
1930/* getattr_detect_stuffed()
1931 *
1932 * determine if a file is stuffed or not
1933 */
1934static PINT_sm_action getattr_detect_stuffed(
1935        struct PINT_smcb *smcb, job_status_s *js_p)
1936{
1937    struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
1938    job_id_t tmp_id;
1939
1940    /* we can determine stuffedness by the presence of the dfiles req key */
1941
1942    s_op->key.buffer = Trove_Common_Keys[NUM_DFILES_REQ_KEY].key;
1943    s_op->key.buffer_sz = Trove_Common_Keys[NUM_DFILES_REQ_KEY].size;
1944    if(s_op->free_val)
1945    {
1946        free(s_op->val.buffer);
1947    }
1948    s_op->val.buffer =  &s_op->u.getattr.num_dfiles_req;
1949    s_op->val.buffer_sz =  sizeof(s_op->u.getattr.num_dfiles_req);
1950    s_op->free_val = 0;
1951
1952    return(job_trove_keyval_read(
1953        s_op->u.getattr.fs_id,
1954        s_op->u.getattr.handle,
1955        &(s_op->key),
1956        &(s_op->val),
1957        0,
1958        NULL, smcb, 0, js_p,
1959        &tmp_id, server_job_context,
1960        s_op->req->hints));
1961}
1962
1963PINT_GET_OBJECT_REF_DEFINE(getattr);
1964
1965struct PINT_server_req_params pvfs2_get_attr_params =
1966{
1967    .string_name = "getattr",
1968    .perm = PINT_SERVER_CHECK_ATTR,
1969    .access_type = PINT_server_req_readonly,
1970    .sched_policy = PINT_SERVER_REQ_SCHEDULE,
1971    .get_object_ref = PINT_get_object_ref_getattr,
1972    .state_machine = &pvfs2_get_attr_sm
1973};
1974
1975/*
1976 * Local variables:
1977 *  mode: c
1978 *  c-indent-level: 4
1979 *  c-basic-offset: 4
1980 * End:
1981 *
1982 * vim: ft=c ts=8 sts=4 sw=4 expandtab
1983 */
1984
Note: See TracBrowser for help on using the browser.