root/branches/orange-next/src/server/get-attr.sm @ 8950

Revision 8950, 59.0 KB (checked in by bligon, 22 months ago)

Corrected compiler errors. Files affected:

src/server/create-immutable-copies.sm
src/server/get-attr.sm

Line 
1/*
2 * (C) 2001 Clemson University and The University of Chicago
3 *
4 * See COPYING in top-level directory.
5 *
6 * Changes by Acxiom Corporation to add dirent_count field to attributes
7 * Copyright © Acxiom Corporation, 2005.
8 */
9
10/* pvfs2_get_attr_sm
11 *
12 * This state machine handles incoming server getattr operations.  These
13 * are the operations sent by PVFS_sys_getattr() among others.
14 *
15 * The pvfs2_prelude_sm is responsible for reading the actual metadata
16 * to begin with, because it does this as part of the permission checking
17 * process.
18 */
19
20#include <string.h>
21#include <assert.h>
22
23#include "server-config.h"
24#include "pvfs2-server.h"
25#include "pvfs2-attr.h"
26#include "pvfs2-types.h"
27#include "pvfs2-types-debug.h"
28#include "pvfs2-util.h"
29#include "pint-util.h"
30#include "pvfs2-internal.h"
31#include "pint-cached-config.h"
32#include "pvfs2-mirror.h"
33
34PINT_server_trove_keys_s Trove_Special_Keys[] =
35{
36    {"user.pvfs2.dist_name"    , SPECIAL_DIST_NAME_KEYLEN},
37    {"user.pvfs2.dist_params"  , SPECIAL_DIST_PARAMS_KEYLEN},
38    {"user.pvfs2.num_dfiles"   , SPECIAL_NUM_DFILES_KEYLEN},
39    {"user.pvfs2.meta_hint"    , SPECIAL_METAFILE_HINT_KEYLEN},
40    {"user.pvfs2.mirror.copies", SPECIAL_MIRROR_COPIES_KEYLEN},
41    {"user.pvfs2.mirror.handles", SPECIAL_MIRROR_HANDLES_KEYLEN},
42    {"user.pvfs2.mirror.status" , SPECIAL_MIRROR_STATUS_KEYLEN},
43};
44
45enum
46{
47    STATE_METAFILE  = 7,
48    STATE_SYMLINK   = 9,
49    STATE_DIR       = 10,
50    STATE_DIR_HINT  = 11,
51    STATE_DONE      = 12,
52    SKIP_NEXT_STATE = 13,
53};
54
55static void free_nested_getattr_data(struct PINT_server_op *s_op);
56
57%%
58
59nested machine pvfs2_get_attr_work_sm
60{
61    state verify_attribs
62    {
63        run getattr_verify_attribs;
64        STATE_SYMLINK => read_symlink_target;
65        STATE_METAFILE => read_metafile_hint;
66        STATE_DIR => get_dirdata_handle;
67        default => setup_resp;
68    }
69
70    state read_symlink_target
71    {
72        run getattr_read_symlink_target;
73        default => setup_resp;
74    }
75
76    state read_metafile_hint
77    {
78        run getattr_read_metafile_hint;
79        default => interpret_metafile_hint;
80    }
81
82    state interpret_metafile_hint
83    {
84        run getattr_interpret_metafile_hint;
85        STATE_METAFILE => read_metafile_datafile_handles_if_required;
86        default => setup_resp;
87    }
88
89    state read_metafile_datafile_handles_if_required
90    {
91        run getattr_read_metafile_datafile_handles_if_required;
92        success => datafile_handles_safety_check;
93        default => setup_resp;
94    }
95
96    state datafile_handles_safety_check
97    {
98        run getattr_datafile_handles_safety_check;
99        success => read_mirrored_copies_count_if_required;
100        default => setup_resp;
101    }
102
103    state read_mirrored_copies_count_if_required
104    {
105        run getattr_read_mirrored_copies_count_if_required;
106        SKIP_NEXT_STATE => read_metafile_distribution_if_required;
107        default => read_mirrored_handles_if_required;
108    }
109
110    state read_mirrored_handles_if_required
111    {
112        run getattr_read_mirrored_handles_if_required;
113        SKIP_NEXT_STATE => read_metafile_distribution_if_required;
114        default => mirrored_handles_safety_check;
115    }
116 
117    state mirrored_handles_safety_check
118    {
119        run getattr_mirrored_handles_safety_check;
120        success => read_metafile_distribution_if_required;
121        default => setup_resp;
122    }
123
124    state read_metafile_distribution_if_required
125    {
126        run getattr_read_metafile_distribution_if_required;
127        default => interpret_metafile_distribution;
128    }
129
130    state interpret_metafile_distribution
131    {
132        run interpret_metafile_distribution;
133        success => detect_stuffed;
134        default => setup_resp;
135    }
136
137    state detect_stuffed
138    {
139        run getattr_detect_stuffed;
140        default => read_stuffed_size;
141    }
142
143    state read_stuffed_size
144    {
145        run getattr_read_stuffed_size;
146        success => interpret_stuffed_size;
147        default => setup_resp;
148    }
149
150    state interpret_stuffed_size
151    {
152        run getattr_interpret_stuffed_size;
153        default => setup_resp;
154    }
155
156    state get_dirdata_handle
157    {
158        run getattr_get_dirdata_handle;
159        success => get_dirent_count;
160        default => setup_resp;
161    }
162
163    state get_dirent_count
164    {
165        run getattr_get_dirent_count;
166        STATE_DIR_HINT => get_dir_hint;
167        default => interpret_dirent_count;
168    }
169
170    state interpret_dirent_count
171    {
172        run getattr_interpret_dirent_count;
173        default => get_dir_hint;
174    }
175
176    state get_dir_hint
177    {
178        run getattr_get_dir_hint;
179        STATE_DONE => setup_resp;
180        default => interpret_dir_hint;
181    }
182
183    state interpret_dir_hint
184    {
185        run getattr_interpret_dir_hint;
186        default => setup_resp;
187    }
188
189    state setup_resp
190    {
191        run getattr_setup_resp;
192        default => return;
193    }
194}
195
196nested machine pvfs2_get_attr_with_prelude_sm
197{
198    state init
199    {
200        run getattr_with_prelude_init;
201        default => prelude;
202    }
203
204    state prelude
205    {
206        jump pvfs2_prelude_sm;
207        success => setup_op;
208        default => return;
209    }
210
211    state setup_op
212    {
213        run getattr_setup_op;
214        default => do_work;
215    }
216
217    state do_work
218    {
219        jump pvfs2_get_attr_work_sm;
220        default => return;
221    }
222}
223
224machine pvfs2_get_attr_sm
225{
226    state work
227    {
228        jump pvfs2_get_attr_with_prelude_sm;
229        default => final_response;
230    }
231
232    state final_response
233    {
234        jump pvfs2_final_response_sm;
235        default => cleanup;
236    }
237
238    state cleanup
239    {
240        run getattr_cleanup;
241        default => terminate;
242    }
243}
244
245%%
246
247
248
249/* getattr_verify_attribs()
250 *
251 * We initialize the attribute mask that will be returned in this
252 * function.  This mask can be augmented in some of the other states.
253 */
254static PINT_sm_action getattr_verify_attribs(
255        struct PINT_smcb *smcb, job_status_s *js_p)
256{
257    struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
258    PVFS_object_attr *resp_attr = NULL;
259
260    js_p->error_code = 0;
261
262    /*
263      explicitly copy basic attributes structure (read in from the
264      prelude.sm for the matching dspace) into response to be sent
265      back to the client.  this is mostly for readability here to be
266      sure we know which fields are valid in the response at this
267      point.
268    */
269    resp_attr = &s_op->resp.u.getattr.attr;
270    memset(resp_attr, 0, sizeof(PVFS_object_attr));
271
272    resp_attr->owner = s_op->attr.owner;
273    resp_attr->group = s_op->attr.group;
274    resp_attr->perms = s_op->attr.perms;
275    resp_attr->atime = s_op->attr.atime;
276
277    resp_attr->mtime = PINT_util_mkversion_time(s_op->attr.mtime);
278    if (resp_attr->mtime == 0)
279    {
280        /*
281          this is a compatibility hack to allow existing storage
282          spaces to be automagically converted to this versioned time
283          on-disk format slowly over time and doing the right thing in
284          the meantime
285        */
286        resp_attr->mtime = s_op->attr.mtime;
287
288        gossip_debug(GOSSIP_GETATTR_DEBUG, " No version found!  Using "
289                     "mtime %llu\n", llu(resp_attr->mtime));
290    }
291    else
292    {
293        gossip_debug(
294            GOSSIP_GETATTR_DEBUG, " VERSION is %llu, mtime is %llu\n",
295            llu(s_op->attr.mtime), llu(resp_attr->mtime));
296    }
297
298    resp_attr->ctime = s_op->attr.ctime;
299    resp_attr->mask = s_op->attr.mask;
300    resp_attr->objtype = s_op->attr.objtype;
301    resp_attr->u.meta.dfile_count = s_op->attr.u.meta.dfile_count;
302    resp_attr->u.meta.dist_size = s_op->attr.u.meta.dist_size;
303
304#if 0
305    gossip_debug(
306        GOSSIP_GETATTR_DEBUG,
307        "+  _DSPACE_ retrieved attrs: [owner = %d, group = %d\n\t"
308        "perms = %o, type = %d, atime = %llu, mtime = %llu\n\t"
309        "ctime = %llu, dfile_count = %d, dist_size = %d]\n",
310        resp_attr->owner, resp_attr->group, resp_attr->perms,
311        resp_attr->objtype, llu(resp_attr->atime),
312        llu(resp_attr->mtime), llu(resp_attr->ctime),
313        (int)resp_attr->u.meta.dfile_count,
314        (int)resp_attr->u.meta.dist_size);
315#endif
316
317    /*
318      weed out the attr mask of the response based on what the client
319      request asked for.  also, check if we need to retrieve more
320      information before returning the response to the client (by
321      guiding the state machine to get it).
322
323      we can safely do this now that we have the type of the object
324      (read in from the dspace, not stored in the resp_attr), and we
325      have the original client request attr mask
326      (s_op->u.getattr.attrmask).
327    */
328    switch(resp_attr->objtype)
329    {
330        case PVFS_TYPE_METAFILE:
331            PINT_ACCESS_DEBUG(s_op, GOSSIP_ACCESS_DEBUG, "type: metafile\n");
332            gossip_debug(GOSSIP_GETATTR_DEBUG,
333                         "  Req handle %llu refers to a metafile\n",
334                         llu(s_op->u.getattr.handle));
335
336            if (s_op->u.getattr.attrmask & PVFS_ATTR_META_DFILES)
337            {
338                gossip_debug(GOSSIP_GETATTR_DEBUG,
339                             " dspace has dfile_count of %d\n",
340                             resp_attr->u.meta.dfile_count);
341                resp_attr->mask |= PVFS_ATTR_META_DFILES;
342            }
343            else
344            {
345                gossip_debug(GOSSIP_GETATTR_DEBUG, " client doesn't want "
346                             "dfile info, clearing response attr mask\n");
347                resp_attr->mask &= ~PVFS_ATTR_META_DFILES;
348            }
349
350            if (s_op->u.getattr.attrmask & PVFS_ATTR_META_DIST)
351            {
352                gossip_debug(GOSSIP_GETATTR_DEBUG,
353                             " dspace has dist size of %d\n",
354                             resp_attr->u.meta.dist_size);
355
356                resp_attr->mask |= PVFS_ATTR_META_DIST;
357            }
358            else
359            {
360                gossip_debug(GOSSIP_GETATTR_DEBUG, " client doesn't want "
361                             "dist info, clearing response attr mask\n");
362
363                resp_attr->mask &= ~PVFS_ATTR_META_DIST;
364            }
365
366            if (s_op->u.getattr.attrmask & PVFS_ATTR_META_MIRROR_DFILES)
367            {
368               gossip_debug(GOSSIP_GETATTR_DEBUG,"client wants mirrored "
369                                                 "handles.\n");
370               resp_attr->mask |= PVFS_ATTR_META_MIRROR_DFILES;
371               resp_attr->u.meta.mirror_copies_count = 0;
372               resp_attr->u.meta.mirror_dfile_array  = NULL;
373            }
374            else
375            {
376               gossip_debug(GOSSIP_GETATTR_DEBUG,"client doesn't want "
377                                                 "mirrored handles.\n");
378               resp_attr->mask &= ~(PVFS_ATTR_META_MIRROR_DFILES);
379            }
380            js_p->error_code = STATE_METAFILE;
381            break;
382        case PVFS_TYPE_DATAFILE:
383            PINT_ACCESS_DEBUG(s_op, GOSSIP_ACCESS_DEBUG, "type: datafile\n");
384            /*
385              note: the prelude already retrieved the size for us, so
386              there's no special action that needs to be taken if we have
387              a datafile here (other than adjusting our mask to include
388              the data information and copying the retrieved size from the
389              ds_attribute the prelude used)
390            */
391            resp_attr->u.data.size = s_op->ds_attr.u.datafile.b_size;
392            resp_attr->mask |= PVFS_ATTR_DATA_ALL;
393
394            gossip_debug(GOSSIP_GETATTR_DEBUG, "  handle %llu refers to "
395                         "a datafile (size = %lld).\n",
396                         llu(s_op->u.getattr.handle),
397                         lld(resp_attr->u.data.size));
398            break;
399        case PVFS_TYPE_DIRECTORY:
400            PINT_ACCESS_DEBUG(s_op, GOSSIP_ACCESS_DEBUG, "type: directory\n");
401            if (s_op->u.getattr.attrmask & PVFS_ATTR_DIR_DIRENT_COUNT)
402            {
403                gossip_debug(GOSSIP_GETATTR_DEBUG,
404                             " getattr: dirent_count needed.\n");
405                assert(resp_attr->mask & PVFS_ATTR_COMMON_ALL);
406                resp_attr->mask |= PVFS_ATTR_DIR_DIRENT_COUNT;
407                js_p->error_code = STATE_DIR;
408            }
409            else
410            {
411                gossip_debug(GOSSIP_GETATTR_DEBUG,
412                             " getattr: dirent_count not needed.\n");
413                js_p->error_code = 0;
414                assert(resp_attr->mask & PVFS_ATTR_COMMON_ALL);
415            }
416            if (s_op->u.getattr.attrmask & PVFS_ATTR_DIR_HINT)
417            {
418                gossip_debug(GOSSIP_GETATTR_DEBUG,
419                            " getattr: dfile_count needed.\n");
420                assert(resp_attr->mask & PVFS_ATTR_COMMON_ALL);
421                resp_attr->mask |= PVFS_ATTR_DIR_HINT;
422                js_p->error_code = STATE_DIR;
423            }
424            else
425            {
426                gossip_debug(GOSSIP_GETATTR_DEBUG,
427                            " getattr: dfile_count not needed\n");
428                assert(resp_attr->mask & PVFS_ATTR_COMMON_ALL);
429            }
430            break;
431        case PVFS_TYPE_DIRDATA:
432            PINT_ACCESS_DEBUG(s_op, GOSSIP_ACCESS_DEBUG, "type: dirdata\n");
433            gossip_debug(
434                GOSSIP_GETATTR_DEBUG, "  handle %llu refers to "
435                "a dirdata object. doing nothing special\n",
436                llu(s_op->u.getattr.handle));
437            assert(resp_attr->mask & PVFS_ATTR_COMMON_ALL);
438            break;
439        case PVFS_TYPE_SYMLINK:
440            PINT_ACCESS_DEBUG(s_op, GOSSIP_ACCESS_DEBUG, "type: symlink\n");
441            gossip_debug(
442                GOSSIP_GETATTR_DEBUG, "  handle %llu refers to a symlink.\n",
443                llu(s_op->u.getattr.handle));
444
445            /*
446              we'll definitely have to fetch the symlink target in this
447              case, as the prelude will never retrieve it for us
448            */
449            js_p->error_code = STATE_SYMLINK;
450            break;
451        case PVFS_TYPE_INTERNAL:
452            PINT_ACCESS_DEBUG(s_op, GOSSIP_ACCESS_DEBUG, "type: symlink\n");
453            /* nothing interesting to add; this is meaningless to a client */
454            break;
455        default:
456            /* if we don't understand the object type, then it probably indicates
457             * a bug or some data corruption.  All trove objects should have a
458             * type set.
459             */
460            gossip_err(
461                "Error: got unknown type when verifying attributes for "
462                "handle %llu.\n",
463                llu(s_op->u.getattr.handle));
464            js_p->error_code = -PVFS_ENXIO;
465            break;
466    }
467
468    return SM_ACTION_COMPLETE;
469}
470
471static PINT_sm_action getattr_read_symlink_target(
472        struct PINT_smcb *smcb, job_status_s *js_p)
473{
474    struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
475    int ret;
476    job_id_t i;
477
478    /* if we don't need to fill in the symlink target, skip it */
479    if (!(s_op->u.getattr.attrmask & PVFS_ATTR_SYMLNK_TARGET))
480    {
481        gossip_debug(GOSSIP_GETATTR_DEBUG, "skipping symlink target read\n");
482        js_p->error_code = 0;
483        return SM_ACTION_COMPLETE;
484    }
485   
486    s_op->key.buffer    = Trove_Common_Keys[SYMLINK_TARGET_KEY].key;
487    s_op->key.buffer_sz = Trove_Common_Keys[SYMLINK_TARGET_KEY].size;
488
489    /*
490      optimistically add mask value to indicate the symlink target is
491      filled (error_code is checked in next state)
492    */
493    s_op->resp.u.getattr.attr.mask |= PVFS_ATTR_SYMLNK_TARGET;
494
495    s_op->resp.u.getattr.attr.u.sym.target_path_len = PVFS_NAME_MAX;
496    s_op->resp.u.getattr.attr.u.sym.target_path =
497        malloc(s_op->resp.u.getattr.attr.u.sym.target_path_len);
498    if (!s_op->resp.u.getattr.attr.u.sym.target_path)
499    {
500        js_p->error_code = -PVFS_ENOMEM;
501        return SM_ACTION_COMPLETE;
502    }
503
504    if(s_op->free_val)
505    {
506        free(s_op->val.buffer);
507    }
508    s_op->val.buffer = s_op->resp.u.getattr.attr.u.sym.target_path;
509    s_op->val.buffer_sz = s_op->resp.u.getattr.attr.u.sym.target_path_len;
510    /* this will get cleaned up with attr structure */
511    s_op->free_val = 0;
512
513    ret = job_trove_keyval_read(
514        s_op->u.getattr.fs_id, s_op->u.getattr.handle,
515        &s_op->key, &s_op->val,
516        0,
517        NULL, smcb, 0, js_p,
518        &i, server_job_context, s_op->req->hints);
519
520
521    return ret;
522}
523
524static PINT_sm_action getattr_interpret_metafile_hint(
525    PINT_smcb *smcb, job_status_s *js_p)
526{
527    struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
528    PVFS_object_attr *resp_attr = NULL;
529    PVFS_metafile_attr *meta = &(s_op->resp.u.getattr.attr.u.meta);
530    resp_attr = &s_op->resp.u.getattr.attr;
531
532    assert(resp_attr->objtype == PVFS_TYPE_METAFILE);
533
534    if (js_p->error_code == 0 || js_p->error_code == -TROVE_ENOENT)
535    {
536        if (js_p->error_code == 0)
537        {
538            memcpy(&(meta->hint), s_op->val.buffer, sizeof(meta->hint));
539        }
540        if ((resp_attr->mask & PVFS_ATTR_META_DFILES) ||
541            (resp_attr->mask & PVFS_ATTR_META_DIST)   ||
542            (resp_attr->mask & PVFS_ATTR_META_MIRROR_DFILES))
543        {
544            gossip_debug(GOSSIP_GETATTR_DEBUG, " * client wants extra "
545                         "meta info, about to retrieve it now\n");
546            js_p->error_code = STATE_METAFILE;
547            if ( (resp_attr->mask  & PVFS_ATTR_META_MIRROR_DFILES) &&
548                !(meta->hint.flags & PVFS_MIRROR_FL) )
549                  resp_attr->mask &= ~(PVFS_ATTR_META_MIRROR_DFILES);
550        }
551        else
552        {
553            gossip_debug(GOSSIP_GETATTR_DEBUG, " * client doesn't want "
554                         "extra meta info, preparing response now\n");
555            js_p->error_code = 0;
556        }
557    } else {
558        /*If we hit an error the DIST & DFILES are no longer valid*/
559        resp_attr->mask &= ~PVFS_ATTR_META_DIST;
560        resp_attr->mask &= ~PVFS_ATTR_META_DFILES;
561        resp_attr->mask &= ~PVFS_ATTR_META_MIRROR_DFILES;
562    }
563    return SM_ACTION_COMPLETE;
564}
565
566static PINT_sm_action getattr_read_metafile_hint(
567    struct PINT_smcb *smcb, job_status_s *js_p)
568{
569    struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
570    int ret = -PVFS_EINVAL;
571    job_id_t i;
572    char *buf = NULL;
573
574    assert(s_op->attr.objtype == PVFS_TYPE_METAFILE);
575    buf = (char *) calloc(sizeof(PVFS_metafile_hint) + 1, 1);
576    if (buf == NULL)
577    {
578        js_p->error_code = -PVFS_ENOMEM;
579        /*If we hit an error the DIST & DFILES are no longer valid*/
580        s_op->resp.u.getattr.attr.mask &= ~PVFS_ATTR_META_DIST;
581        s_op->resp.u.getattr.attr.mask &= ~PVFS_ATTR_META_DFILES;
582        s_op->resp.u.getattr.attr.mask &= ~PVFS_ATTR_META_MIRROR_DFILES;
583        return 1;
584    }
585
586    js_p->error_code = 0;
587
588    s_op->key.buffer = Trove_Special_Keys[METAFILE_HINT_KEY].key;
589    s_op->key.buffer_sz = Trove_Special_Keys[METAFILE_HINT_KEY].size;
590
591    if(s_op->free_val)
592    {
593        free(s_op->val.buffer);
594    }
595    s_op->val.buffer = buf;
596    s_op->val.buffer_sz = sizeof(s_op->resp.u.getattr.attr.u.meta.hint) + 1;
597    s_op->free_val = 1;
598
599    gossip_debug(GOSSIP_GETATTR_DEBUG,
600                 "  reading metafile hint (coll_id = %d, "
601                 "handle = %llu, key = %s (%d), val_buf = %p (%d))\n",
602                 s_op->u.getattr.fs_id,
603                 llu(s_op->u.getattr.handle), (char *)s_op->key.buffer,
604                 s_op->key.buffer_sz, s_op->val.buffer,
605                 s_op->val.buffer_sz);
606
607    ret = job_trove_keyval_read(
608        s_op->u.getattr.fs_id, s_op->u.getattr.handle,
609        &s_op->key, &s_op->val,
610        0,
611        NULL, smcb, 0, js_p,
612        &i, server_job_context, s_op->req->hints);
613
614    return ret;
615}
616
617static PINT_sm_action getattr_read_metafile_datafile_handles_if_required(
618        struct PINT_smcb *smcb, job_status_s *js_p)
619{
620    struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
621    int ret = -PVFS_EINVAL;
622    int dfile_count = 0;
623    job_id_t i;
624
625    assert(s_op->attr.objtype == PVFS_TYPE_METAFILE);
626
627    js_p->error_code = 0;
628
629    /* if we don't need to fill in the dfiles, skip them */
630    if (!(s_op->u.getattr.attrmask & PVFS_ATTR_META_DFILES))
631    {
632        gossip_debug(GOSSIP_GETATTR_DEBUG, "skipping data handle read\n");
633        return SM_ACTION_COMPLETE;
634    }
635
636    dfile_count = s_op->resp.u.getattr.attr.u.meta.dfile_count;
637
638    gossip_debug(GOSSIP_GETATTR_DEBUG,
639                 " request has dfile_count of %d | dspace has %d\n",
640                 s_op->resp.u.getattr.attr.u.meta.dfile_count,
641                 s_op->resp.u.getattr.attr.u.meta.dfile_count);
642
643    /* verify that the retrieved dfile count is sane */
644    if (!PVFS_REQ_LIMIT_DFILE_COUNT_IS_VALID(dfile_count))
645    {
646        gossip_err("The requested dfile count of %d is invalid; "
647                   "aborting operation.\n", dfile_count);
648        gossip_err(
649            "+ attrs read from dspace: (owner = %d, group = %d, "
650            "perms = %o, type = %d\n   atime = %lld, mtime = %lld, "
651            "ctime = %lld |\n   dfile_count = %d | dist_size = %d)\n",
652            s_op->resp.u.getattr.attr.owner,
653            s_op->resp.u.getattr.attr.group,
654            s_op->resp.u.getattr.attr.perms,
655            s_op->resp.u.getattr.attr.objtype,
656            lld(s_op->resp.u.getattr.attr.atime),
657            lld(s_op->resp.u.getattr.attr.mtime),
658            lld(s_op->resp.u.getattr.attr.ctime),
659            (int)s_op->resp.u.getattr.attr.u.meta.dfile_count,
660            (int)s_op->resp.u.getattr.attr.u.meta.dist_size);
661
662        gossip_err("handle: %llu (%llx), fsid: %d.\n",
663            llu(s_op->u.getattr.handle), llu(s_op->u.getattr.handle),
664            (int)s_op->u.getattr.fs_id);
665
666        /*If we hit an error the DIST & DFILES are no longer valid*/
667        s_op->resp.u.getattr.attr.mask &= ~PVFS_ATTR_META_DIST;
668        s_op->resp.u.getattr.attr.mask &= ~PVFS_ATTR_META_DFILES;
669        s_op->resp.u.getattr.attr.mask &= ~PVFS_ATTR_META_MIRROR_DFILES;
670       
671        js_p->error_code = -PVFS_EOVERFLOW;
672        return SM_ACTION_COMPLETE;
673    }
674
675    s_op->key.buffer = Trove_Common_Keys[METAFILE_HANDLES_KEY].key;
676    s_op->key.buffer_sz = Trove_Common_Keys[METAFILE_HANDLES_KEY].size;
677
678    /* add mask value to indicate the data file array is filled */
679    s_op->resp.u.getattr.attr.mask |= PVFS_ATTR_META_DFILES;
680
681    s_op->resp.u.getattr.attr.u.meta.dfile_array =
682        malloc(dfile_count * sizeof(PVFS_handle));
683    if (!s_op->resp.u.getattr.attr.u.meta.dfile_array)
684    {
685        gossip_err("Cannot allocate dfile array of count %d\n",
686                   dfile_count);
687        js_p->error_code = -PVFS_ENOMEM;
688        return SM_ACTION_COMPLETE;
689    }
690
691    if(s_op->free_val)
692    {
693        free(s_op->val.buffer);
694    }
695    s_op->val.buffer = s_op->resp.u.getattr.attr.u.meta.dfile_array;
696    s_op->val.buffer_sz = (dfile_count * sizeof(PVFS_handle));
697    /* this will get cleaned up with attr structure */
698    s_op->free_val = 0;
699
700    gossip_debug(GOSSIP_GETATTR_DEBUG,
701                 "  reading %d datafile handles (coll_id = %d, "
702                 "handle = %llu, key = %s (%d), val_buf = %p (%d))\n",
703                 dfile_count, s_op->u.getattr.fs_id,
704                 llu(s_op->u.getattr.handle), (char *)s_op->key.buffer,
705                 s_op->key.buffer_sz, s_op->val.buffer,
706                 s_op->val.buffer_sz);
707
708    ret = job_trove_keyval_read(
709        s_op->u.getattr.fs_id
710       ,s_op->u.getattr.handle
711       ,&s_op->key
712       ,&s_op->val
713       ,0
714       ,NULL
715       ,smcb
716       ,0
717       ,js_p
718       ,&i
719       ,server_job_context
720       ,s_op->req->hints);
721
722    return ret;
723}
724
725
726static PINT_sm_action getattr_read_mirrored_copies_count_if_required(
727        struct PINT_smcb *smcb, job_status_s *js_p)
728{
729    gossip_debug(GOSSIP_MIRROR_DEBUG,"Executing %s...\n",__func__);
730
731    struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
732    struct PVFS_server_resp *resp = &(s_op->resp);
733    PVFS_metafile_attr *meta =  &(resp->u.getattr.attr.u.meta);
734    int ret = -PVFS_EINVAL;
735    job_id_t job_id;
736
737   /* Are we mirroring? */
738    if (!(resp->u.getattr.attr.mask & PVFS_ATTR_META_MIRROR_DFILES))
739    {
740        gossip_debug(GOSSIP_MIRROR_DEBUG,"\tMirroring is NOT turned on "
741                                         "for this handle(%llu)..\n"
742                                        ,llu(s_op->u.getattr.handle));
743        js_p->error_code = SKIP_NEXT_STATE;
744
745        return SM_ACTION_COMPLETE;
746    }
747
748    gossip_debug(GOSSIP_MIRROR_DEBUG,"\tMirroring IS turned on for this "
749                                     "handle(%llu)...\n"
750                                    ,llu(s_op->u.getattr.handle));
751
752    js_p->error_code = 0;
753
754    /* setup job to read user.pvfs2.mirror.copies */
755
756    /* initialize */
757    if (s_op->free_val)
758       free(s_op->val.buffer);
759    memset(&(s_op->val),0,sizeof(s_op->val));
760    memset(&(s_op->key),0,sizeof(s_op->key));
761
762    /* set key = user.pvfs2.mirror.copies */
763    s_op->key.buffer    = Trove_Special_Keys[MIRROR_COPIES_KEY].key;
764    s_op->key.buffer_sz = Trove_Special_Keys[MIRROR_COPIES_KEY].size;
765
766    /* setup space for retrieved value */
767    meta->mirror_copies_count = 0;
768    meta->mirror_dfile_array  = NULL;
769    s_op->val.buffer = &(meta->mirror_copies_count);
770    s_op->val.buffer_sz = sizeof(meta->mirror_copies_count);
771    s_op->free_val = 0;
772
773    /* submit job to read the value */
774    ret = job_trove_keyval_read(
775        s_op->u.getattr.fs_id
776       ,s_op->u.getattr.handle
777       ,&s_op->key
778       ,&s_op->val
779       ,0
780       ,NULL
781       ,smcb
782       ,0
783       ,js_p
784       ,&job_id
785       ,server_job_context
786       ,s_op->req->hints);
787
788    return ret;
789}/*end getattr_read_mirrored_copies_if_required*/
790
791
792
793static PINT_sm_action getattr_read_mirrored_handles_if_required(
794        struct PINT_smcb *smcb, job_status_s *js_p)
795{
796   gossip_debug(GOSSIP_MIRROR_DEBUG,"Executing %s ...\n",__func__);
797
798    struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
799    struct PVFS_server_resp *resp = &(s_op->resp);
800    PVFS_metafile_attr *meta =  &(resp->u.getattr.attr.u.meta);
801    int ret = -PVFS_EINVAL;
802    job_id_t job_id;
803    int i;
804
805   /* Did we find mirror.copies? */
806    if (js_p->error_code < 0)
807    {
808        gossip_debug(GOSSIP_MIRROR_DEBUG,"\tNumber of mirrored copies cannot "
809                                         "be retrieved.\n");
810        if (resp->u.getattr.attr.mask & (PVFS_ATTR_META_DFILES |
811                                         PVFS_ATTR_META_DIST) )
812        {
813            resp->u.getattr.attr.mask &= ~PVFS_ATTR_META_MIRROR_DFILES;
814            js_p->error_code = SKIP_NEXT_STATE;
815        } else {
816            resp->u.getattr.attr.mask &= ~PVFS_ATTR_META_MIRROR_DFILES;
817            resp->u.getattr.attr.mask &= ~PVFS_ATTR_META_DFILES;
818            resp->u.getattr.attr.mask &= ~PVFS_ATTR_META_DIST;
819        }
820        memset(&(s_op->key),0,sizeof(s_op->key));
821        memset(&(s_op->val),0,sizeof(s_op->val));
822        s_op->free_val = 0;
823
824        return SM_ACTION_COMPLETE;
825    }
826
827    gossip_debug(GOSSIP_MIRROR_DEBUG,"\tNumber of mirrored copies count "
828                                     "successfully retrieved.\n");
829
830   /* check number of mirrored copies */
831    if (meta->mirror_copies_count == 0)
832    {
833       gossip_debug(GOSSIP_MIRROR_DEBUG,"\tNumber of mirrored copies "
834                                        "is ZERO.\n");
835       gossip_lerr("Mirror handles requested, but number of mirrored copies "
836                   "is zero.\n");
837       resp->u.getattr.attr.mask &= ~PVFS_ATTR_META_MIRROR_DFILES;
838       js_p->error_code = SKIP_NEXT_STATE;
839       return SM_ACTION_COMPLETE;
840    }
841
842    gossip_debug(GOSSIP_MIRROR_DEBUG,"\tNumber of mirrored copies "
843                                     "retrieved : %d\n"
844                                    ,meta->mirror_copies_count);
845
846
847   /* check to see if total number of mirrored handles is sane */
848   if ( (meta->dfile_count * meta->mirror_copies_count) >
849         PVFS_REQ_LIMIT_MIRROR_DFILE_COUNT )
850   {
851       gossip_lerr("Number of mirrored handles(%d) exceeds the system "
852                   "limit(%d)\n"
853                  ,meta->dfile_count * meta->mirror_copies_count
854                  ,PVFS_REQ_LIMIT_MIRROR_DFILE_COUNT);
855       resp->u.getattr.attr.mask &= ~PVFS_ATTR_META_MIRROR_DFILES;
856       js_p->error_code = SKIP_NEXT_STATE;
857       return SM_ACTION_COMPLETE;
858   }
859
860   js_p->error_code = 0;
861   /* get mirrored handles and status of each handle */
862
863   /* initialize */
864   if (s_op->free_val)
865      free(s_op->val.buffer);
866   memset(&(s_op->key),0,sizeof(s_op->key));
867   memset(&(s_op->val),0,sizeof(s_op->val));
868
869   for (i=0; i<s_op->keyval_count; i++)
870       if (s_op->val_a && s_op->val_a[i].buffer && s_op->free_val)
871           free(s_op->val_a[i].buffer);
872   if (s_op->val_a)
873       free(s_op->val_a);
874   if (s_op->key_a)
875       free(s_op->key_a);
876   if (s_op->error_a)
877      free(s_op->error_a);
878   s_op->free_val = 0;
879
880   /* allocate space for keys and values */
881   s_op->keyval_count = 2;
882   s_op->free_val = 1;
883   s_op->key_a = s_op->val_a = NULL;
884   s_op->error_a = NULL;
885   
886   s_op->key_a   = malloc(sizeof(*s_op->key_a)   * s_op->keyval_count);
887   s_op->val_a   = malloc(sizeof(*s_op->val_a)   * s_op->keyval_count);
888   s_op->error_a = malloc(sizeof(*s_op->error_a) * s_op->keyval_count);
889   if (!s_op->key_a || !s_op->val_a || !s_op->error_a)
890   {
891      gossip_lerr("Cannot allocate memory for key/val/error.\n");
892      js_p->error_code = -PVFS_ENOMEM;
893      goto error_exit;
894   }
895   memset(s_op->key_a,0,sizeof(*s_op->key_a));
896   memset(s_op->val_a,0,sizeof(*s_op->val_a));
897
898   /* set key = user.pvfs2.mirror.handles */
899   s_op->key_a[0].buffer    = Trove_Special_Keys[MIRROR_HANDLES_KEY].key;
900   s_op->key_a[0].buffer_sz = Trove_Special_Keys[MIRROR_HANDLES_KEY].size;
901
902   /* setup buffer space for handles */
903   s_op->val_a[0].buffer = malloc(sizeof(PVFS_handle) *
904                                  meta->dfile_count   *
905                                  meta->mirror_copies_count);
906   if (!s_op->val_a[0].buffer)
907   {
908      gossip_lerr("Cannot allocate memory for mirrored handles.\n");
909      js_p->error_code = -PVFS_ENOMEM;
910      goto error_exit;
911   }
912   memset(s_op->val_a[0].buffer,0,sizeof(PVFS_handle) *
913                                  meta->dfile_count   *
914                                  meta->mirror_copies_count);
915   s_op->val_a[0].buffer_sz = sizeof(PVFS_handle) *
916                              meta->dfile_count   *
917                              meta->mirror_copies_count;
918
919   /* set key = user.pvfs2.mirror.status */
920   s_op->key_a[1].buffer    = Trove_Special_Keys[MIRROR_STATUS_KEY].key;
921   s_op->key_a[1].buffer_sz = Trove_Special_Keys[MIRROR_STATUS_KEY].size;
922
923   /* setup buffer space for handle statuses */
924   s_op->val_a[1].buffer = malloc(sizeof(PVFS_error) *
925                                  meta->dfile_count  *
926                                  meta->mirror_copies_count);
927   if (!s_op->val_a[1].buffer)
928   {
929       gossip_lerr("Cannot allocate memory for mirrored handle statuses.\n");
930       js_p->error_code = -PVFS_ENOMEM;
931       goto error_exit;
932   }
933   memset(s_op->val_a[1].buffer,0,sizeof(PVFS_error) *
934                                  meta->dfile_count  *
935                                  meta->mirror_copies_count);
936   s_op->val_a[1].buffer_sz = sizeof(PVFS_error) *
937                              meta->dfile_count  *
938                              meta->mirror_copies_count;
939
940   
941   /* call job to retrieve the key/val pairs */
942   ret = job_trove_keyval_read_list(
943          s_op->u.getattr.fs_id
944         ,s_op->u.getattr.handle
945         ,s_op->key_a
946         ,s_op->val_a
947         ,s_op->error_a
948         ,s_op->keyval_count
949         ,0
950         ,NULL
951         ,smcb
952         ,0
953         ,js_p
954         ,&job_id
955         ,server_job_context
956         ,s_op->req->hints );
957
958   return ret;
959
960error_exit:
961   for (i=0; i<s_op->keyval_count; i++)
962   {
963       if (s_op->val_a && s_op->val_a[i].buffer)
964          free(s_op->val_a[i].buffer);
965   }
966   if (s_op->val_a)
967       free(s_op->val_a);
968   if (s_op->key_a)
969       free(s_op->key_a);
970   if (s_op->error_a)
971       free(s_op->error_a);
972   s_op->val_a = s_op->key_a = NULL;
973   s_op->error_a = NULL;
974   s_op->keyval_count = 0;
975   s_op->free_val = 0;
976
977   resp->u.getattr.attr.mask &= ~PVFS_ATTR_META_MIRROR_DFILES;
978
979   return SM_ACTION_COMPLETE;
980}/*end getattr_read_mirrored_handles_if_required*/
981
982
983
984
985static PINT_sm_action getattr_mirrored_handles_safety_check(
986        struct PINT_smcb *smcb, job_status_s *js_p)
987{
988   gossip_debug(GOSSIP_MIRROR_DEBUG,"Executing %s ...\n",__func__);
989
990    struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
991    struct PVFS_server_resp *resp = &(s_op->resp);
992    PVFS_metafile_attr *meta =  &(resp->u.getattr.attr.u.meta);
993    int row,col,index;
994    int i;
995
996
997   js_p->error_code = 0;
998
999   /* Check the error code for each key/val pair. */
1000   for (i=0; i<s_op->keyval_count; i++)
1001   {
1002       if (s_op->error_a[i] != 0)
1003       {
1004           gossip_debug(GOSSIP_MIRROR_DEBUG,"\tRetrieval of key(%s) failed.\n"
1005                                           ,(char *)s_op->key_a[i].buffer);
1006           js_p->error_code = s_op->error_a[i];
1007       }
1008   }
1009
1010   if (js_p->error_code)
1011   {
1012      goto error_exit;
1013   }
1014
1015   gossip_debug(GOSSIP_MIRROR_DEBUG,"\tWe successfully retrieved handles and "
1016                                    "statuses.\n");
1017
1018   for (i=0; i<s_op->keyval_count; i++)
1019   {  /* Did we get the data that we were expecting from
1020       * user.pvfs2.mirror.handles(i=0) or
1021       * user.pvfs2.mirror.statuses(i=1)?
1022       */
1023
1024     if (s_op->val_a[i].read_sz != s_op->val_a[i].buffer_sz)
1025     {
1026         gossip_lerr("Error: %s key found val size: %d when "
1027                    "expecting val size: %d\n"
1028                    ,(char *)s_op->key_a[i].buffer
1029                    ,s_op->val_a[i].read_sz
1030                    ,s_op->val_a[i].buffer_sz);
1031         js_p->error_code = s_op->val_a[i].buffer_sz;
1032     }
1033   }/*end for*/
1034
1035   if (js_p->error_code)
1036   {
1037       goto error_exit;
1038   }
1039
1040   /*initialize permanent data structures*/
1041   meta->mirror_dfile_array = s_op->val_a[0].buffer;
1042   s_op->u.getattr.mirror_dfile_status_array = s_op->val_a[1].buffer;
1043   s_op->val_a[0].buffer = s_op->val_a[1].buffer = NULL;
1044
1045   /* Check the mirroring status for each handle.  If the status is non-zero,
1046    * the handle is not valid, so put a null in the mirror array for that
1047    * handle.  Otherwise, do nothing.
1048   */
1049   for (row=0; row<meta->mirror_copies_count; row++)
1050   {
1051      for (col=0; col<meta->dfile_count; col++)
1052      {
1053         index = (row*meta->dfile_count) + col;
1054         if ( s_op->u.getattr.mirror_dfile_status_array[index] == STATUS_INIT )
1055              PVFS_handle_clear(meta->mirror_dfile_array[index]);
1056         gossip_debug(GOSSIP_MIRROR_DEBUG,
1057                      "\tmirror handle[%d]:%llu \t"
1058                      "status:%d\n"
1059                      ,index
1060                      ,llu(meta->mirror_dfile_array[index])
1061                      ,s_op->u.getattr.mirror_dfile_status_array[index]);
1062      }
1063   }
1064
1065   /*Cleanup*/
1066   free(s_op->key_a);
1067   free(s_op->val_a);
1068   free(s_op->error_a);
1069   s_op->key_a = s_op->val_a = NULL;
1070   s_op->error_a = NULL;
1071   s_op->keyval_count = 0;
1072   s_op->free_val = 0;   
1073
1074   js_p->error_code = 0;
1075   return SM_ACTION_COMPLETE;
1076
1077error_exit:
1078   /* if we have an error, cleanup, and pretend that we never attempted
1079    * mirrors in the first place.
1080   */
1081      gossip_debug(GOSSIP_MIRROR_DEBUG,"\tCleaning up mirror operation...\n");
1082      js_p->error_code = 0;
1083      for (i=0; i<s_op->keyval_count; i++)
1084      {
1085          if (s_op->val_a[i].buffer)
1086              free(s_op->val_a[i].buffer);
1087      }
1088      free(s_op->key_a);
1089      free(s_op->val_a);
1090      free(s_op->error_a);
1091      s_op->key_a = s_op->val_a = NULL;
1092      s_op->error_a = NULL;
1093      s_op->keyval_count = 0;
1094     
1095      /*We MUST set the number of copies to zero to prevent encoding errors
1096       *later.
1097      */
1098      meta->mirror_copies_count = 0;
1099      meta->mirror_dfile_array = NULL;
1100      s_op->u.getattr.mirror_dfile_status_array = NULL;
1101      resp->u.getattr.attr.mask &= ~PVFS_ATTR_META_MIRROR_DFILES;
1102      return SM_ACTION_COMPLETE;
1103}/*end getattr_mirrored_handles_safety_check*/
1104
1105
1106
1107
1108static PINT_sm_action getattr_read_metafile_distribution_if_required(
1109        struct PINT_smcb *smcb, job_status_s *js_p)
1110{
1111    struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
1112    int ret = -PVFS_EINVAL;
1113    job_id_t i;
1114
1115    assert(s_op->attr.objtype == PVFS_TYPE_METAFILE);
1116
1117    js_p->error_code = 0;
1118
1119    /* if we don't need to fill in the distribution, skip it */
1120    if (!(s_op->u.getattr.attrmask & PVFS_ATTR_META_DIST))
1121    {
1122        gossip_debug(GOSSIP_GETATTR_DEBUG, "skipping data handle "
1123                     "distribution read\n");
1124        return SM_ACTION_COMPLETE;
1125    }
1126
1127    s_op->key.buffer = Trove_Common_Keys[METAFILE_DIST_KEY].key;
1128    s_op->key.buffer_sz = Trove_Common_Keys[METAFILE_DIST_KEY].size;
1129
1130    /*
1131      there *should* be some distribution information.  if not, dump
1132      which handle is busted and assertion die for now while we're not
1133      handling this kind of error
1134    */
1135    if (s_op->resp.u.getattr.attr.u.meta.dist_size < 1)
1136    {
1137        gossip_err("Cannot Read Dist!  Got an invalid dist size for "
1138                   "handle %llu,%d\n",llu(s_op->u.getattr.handle),
1139                   s_op->u.getattr.fs_id);
1140        js_p->error_code = -PVFS_EINVAL;
1141        return SM_ACTION_COMPLETE;
1142    }
1143    assert(s_op->resp.u.getattr.attr.u.meta.dist_size > 0);
1144
1145    /* add mask value to indicate the distribution is filled */
1146    s_op->resp.u.getattr.attr.mask |= PVFS_ATTR_META_DIST;
1147
1148    if(s_op->free_val)
1149    {
1150        free(s_op->val.buffer);
1151    }
1152    s_op->val.buffer_sz = s_op->resp.u.getattr.attr.u.meta.dist_size;
1153    s_op->val.buffer = malloc(s_op->val.buffer_sz);
1154    if (!s_op->val.buffer)
1155    {
1156        gossip_err("Cannot allocate dist of size %d\n",
1157                   s_op->val.buffer_sz);
1158        js_p->error_code = -PVFS_ENOMEM;
1159        return SM_ACTION_COMPLETE;
1160    }
1161    s_op->free_val = 1;
1162
1163    ret = job_trove_keyval_read(
1164        s_op->u.getattr.fs_id, s_op->u.getattr.handle,
1165        &(s_op->key), &(s_op->val),
1166        0,
1167        NULL,
1168        smcb, 0, js_p, &i, server_job_context, s_op->req->hints);
1169
1170    return ret;
1171}
1172
1173static PINT_sm_action getattr_read_stuffed_size(
1174    struct PINT_smcb *smcb, job_status_s *js_p)
1175{
1176    struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
1177    job_id_t job_id;
1178
1179    if(js_p->error_code == -TROVE_ENOENT)
1180    {
1181        gossip_debug(
1182            GOSSIP_GETATTR_DEBUG, "Getattr detected non-stuffed file.\n");
1183        /* this means that the keyval fields used to indicate a file is
1184         * stuffed are not present.  Set mask accordingly and continue.
1185         */
1186        s_op->resp.u.getattr.attr.mask |= PVFS_ATTR_META_UNSTUFFED;
1187        js_p->error_code = 0;
1188        return SM_ACTION_COMPLETE;
1189    }
1190    if(js_p->error_code)
1191    {
1192        /* any other error code here is just a normal error case */
1193        /* preserve error code and catch next error transition */
1194        return SM_ACTION_COMPLETE;
1195    }
1196
1197    gossip_debug(
1198        GOSSIP_GETATTR_DEBUG, "Getattr detected stuffed file.\n");
1199    /* otherwise, we found keyval fields indicating that the file is
1200     * stuffed.  It does not matter if the client asked for the size or not;
1201     * we must retrieve a valid stuffed_size value for the attrs.
1202     */
1203    s_op->resp.u.getattr.attr.mask &= (~(PVFS_ATTR_META_UNSTUFFED));
1204
1205    return(job_trove_dspace_getattr(
1206        s_op->u.getattr.fs_id,
1207        s_op->resp.u.getattr.attr.u.meta.dfile_array[0],
1208        smcb,
1209        &s_op->ds_attr,
1210        0,
1211        js_p,
1212        &job_id,
1213        server_job_context,
1214        s_op->req->hints));
1215}
1216
1217static PINT_sm_action getattr_interpret_stuffed_size(
1218    struct PINT_smcb *smcb, job_status_s *js_p)
1219{
1220    struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
1221    PVFS_metafile_attr *meta    = &(s_op->resp.u.getattr.attr.u.meta);
1222
1223    if(js_p->error_code == 0)
1224    {
1225        meta->stuffed_size = s_op->ds_attr.u.datafile.b_size;
1226    }
1227
1228    /* deliberately leave error_code unchanged so that any errors get
1229     * handled in the next state
1230     */
1231    return SM_ACTION_COMPLETE;
1232}
1233
1234
1235/* interpret_metafile_distribution()
1236 *
1237 * capture and encode results of reading distribution
1238 */
1239static PINT_sm_action interpret_metafile_distribution(
1240        struct PINT_smcb *smcb, job_status_s *js_p)
1241{
1242    struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
1243    PVFS_object_attr *resp_attr = &s_op->resp.u.getattr.attr;
1244   
1245    if(js_p->error_code < 0)
1246    {
1247        return SM_ACTION_COMPLETE;
1248    }
1249
1250    if(s_op->u.getattr.attrmask & PVFS_ATTR_META_DIST)
1251    {
1252        /* successfully read dist key; make sure we got something valid */
1253        if(s_op->val.read_sz != s_op->val.buffer_sz)
1254        {
1255            gossip_err("Error: %s key found val size: %d when "
1256                       "expecting val size: %d\n",
1257                Trove_Common_Keys[METAFILE_DIST_KEY].key,
1258                s_op->val.read_sz,
1259                s_op->val.buffer_sz);
1260
1261            /* clear bitmask to prevent double free between setup_resp and
1262             * PINT_free_object_attr()
1263             */
1264            s_op->resp.u.getattr.attr.mask &= ~PVFS_ATTR_META_DIST;
1265
1266            js_p->error_code = -PVFS_EIO;
1267            return SM_ACTION_COMPLETE;
1268        }
1269
1270        assert(s_op->val.buffer);
1271        PINT_dist_decode(&resp_attr->u.meta.dist, s_op->val.buffer);
1272
1273        if(resp_attr->u.meta.dist == 0) {
1274            gossip_err("Found dist of 0 for handle %llu,%d\n",
1275                    llu(s_op->u.getattr.handle), s_op->u.getattr.fs_id);
1276            PVFS_perror("Metafile getattr_setup_resp",js_p->error_code);
1277            js_p->error_code = -PVFS_EIO;
1278            return SM_ACTION_COMPLETE;
1279        }
1280    }
1281
1282    js_p->error_code = 0;
1283    return SM_ACTION_COMPLETE;
1284}
1285
1286static PINT_sm_action getattr_setup_resp(
1287        struct PINT_smcb *smcb, job_status_s *js_p)
1288{
1289    struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
1290    PVFS_object_attr *resp_attr = &s_op->resp.u.getattr.attr;
1291
1292    if(js_p->error_code > 0)
1293    {
1294        /* if we reach this state with a positive error code it means that
1295         * nothing is wrong; we just used one of the explicit STATE_*
1296         * transitions
1297         */
1298        js_p->error_code = 0;
1299    }
1300    if(js_p->error_code < 0)
1301    {
1302        free_nested_getattr_data(s_op);
1303        return SM_ACTION_COMPLETE;
1304    }
1305
1306    gossip_debug(
1307        GOSSIP_GETATTR_DEBUG,
1308        "-  retrieved attrs: [owner = %d, group = %d\n\t"
1309        "perms = %o, type = %d, atime = %llu, mtime = %llu\n\t"
1310        "ctime = %llu, dist_size = %d]\n",
1311        resp_attr->owner, resp_attr->group, resp_attr->perms,
1312        resp_attr->objtype, llu(resp_attr->atime),
1313        llu(resp_attr->mtime), llu(resp_attr->ctime),
1314        (int)resp_attr->u.meta.dist_size);
1315
1316    if (resp_attr->objtype == PVFS_TYPE_METAFILE)
1317    {
1318        if (resp_attr->mask & PVFS_ATTR_META_DFILES)
1319        {
1320            if (resp_attr->u.meta.dfile_count)
1321            {
1322                assert(resp_attr->u.meta.dfile_array);
1323            }
1324            gossip_debug(GOSSIP_GETATTR_DEBUG,
1325                         "  also returning %d datafile handles\n",
1326                         resp_attr->u.meta.dfile_count);
1327        }
1328        if (resp_attr->mask & PVFS_ATTR_META_MIRROR_DFILES)
1329        {
1330           if (resp_attr->u.meta.mirror_copies_count)
1331              assert(resp_attr->u.meta.mirror_dfile_array);
1332           gossip_debug(GOSSIP_GETATTR_DEBUG,
1333                        "  also returning %d mirrored copies\n"
1334                       ,resp_attr->u.meta.mirror_copies_count);
1335        }
1336        if (resp_attr->mask & PVFS_ATTR_META_DIST)
1337        {
1338            /* we have already gathered the dist field in an earlier state */
1339            gossip_debug(GOSSIP_GETATTR_DEBUG,
1340                         "  also returning dist size of %d\n",
1341                         resp_attr->u.meta.dist_size);
1342        }
1343    }
1344    else if ((resp_attr->objtype == PVFS_TYPE_DATAFILE) &&
1345             (resp_attr->mask & PVFS_ATTR_DATA_SIZE))
1346    {
1347        gossip_debug(GOSSIP_GETATTR_DEBUG,
1348                     "  also returning data size of %lld\n",
1349                     lld(resp_attr->u.data.size));
1350    }
1351    else if ((resp_attr->objtype == PVFS_TYPE_SYMLINK) &&
1352             (resp_attr->mask & PVFS_ATTR_SYMLNK_TARGET))
1353    {
1354        if (js_p->error_code == 0)
1355        {
1356            assert(resp_attr->u.sym.target_path);
1357            assert(resp_attr->u.sym.target_path_len);
1358            /*
1359              adjust target path len down to actual size ; always
1360              include the null termination char in the target_path_len
1361            */
1362            resp_attr->u.sym.target_path_len =
1363                (strlen(resp_attr->u.sym.target_path) + 1);
1364
1365            gossip_debug(GOSSIP_GETATTR_DEBUG,
1366                         "  also returning link target of %s (len %d)\n",
1367                         resp_attr->u.sym.target_path,
1368                         resp_attr->u.sym.target_path_len);
1369        }
1370        else
1371        {
1372            gossip_err("Failed to retrieve symlink target path for "
1373                       "handle %llu,%d\n",llu(s_op->u.getattr.handle),
1374                       s_op->u.getattr.fs_id);
1375            PVFS_perror("Symlink retrieval failure",js_p->error_code);
1376
1377            free_nested_getattr_data(s_op);
1378            js_p->error_code = -PVFS_EINVAL;
1379            return SM_ACTION_COMPLETE;
1380        }
1381    }
1382    else if ((resp_attr->objtype == PVFS_TYPE_DIRECTORY) &&
1383            (resp_attr->mask & PVFS_ATTR_DIR_HINT))
1384    {
1385        gossip_debug(GOSSIP_GETATTR_DEBUG, " server returning "
1386            "dirent_count = %llu "
1387            "dfile_count = %d "
1388            "dist_name_len    = %d "
1389            "dist_params_len  = %d\n",
1390            llu(resp_attr->u.dir.dirent_count),
1391            resp_attr->u.dir.hint.dfile_count,
1392            resp_attr->u.dir.hint.dist_name_len,
1393            resp_attr->u.dir.hint.dist_params_len);
1394    }
1395
1396    gossip_debug(GOSSIP_GETATTR_DEBUG,"@ End %s attributes: sending "
1397                 "status %d (error = %d)\n",
1398                 PINT_util_get_object_type(resp_attr->objtype),
1399                 s_op->resp.status, js_p->error_code);
1400
1401#if 0
1402    gossip_debug(GOSSIP_GETATTR_DEBUG, "returning attrmask ");
1403    PINT_attrmask_print(GOSSIP_GETATTR_DEBUG,
1404                        s_op->resp.u.getattr.attr.mask);
1405#endif
1406
1407    free_nested_getattr_data(s_op);
1408    return SM_ACTION_COMPLETE;
1409}
1410
1411static void free_nested_getattr_data(struct PINT_server_op *s_op)
1412{
1413   int i;
1414    /* free up anything that was set up specifically by this nested machine */
1415    if (s_op->free_val)
1416    {
1417       for (i=0; i<s_op->keyval_count; i++)
1418       {
1419           if (s_op->val_a[i].buffer)
1420               free(s_op->val_a[i].buffer);
1421       }
1422    }
1423    if(s_op->val_a)
1424    {
1425        free(s_op->val_a);
1426        s_op->val_a = NULL;
1427    }
1428    if(s_op->key_a)
1429    {
1430        free(s_op->key_a);
1431        s_op->key_a = NULL;
1432    }
1433    if(s_op->u.getattr.err_array)
1434    {
1435        free(s_op->u.getattr.err_array);
1436        s_op->u.getattr.err_array = NULL;
1437    }
1438    if (s_op->u.getattr.mirror_dfile_status_array)
1439    {
1440        free(s_op->u.getattr.mirror_dfile_status_array);
1441        s_op->u.getattr.mirror_dfile_status_array = NULL;
1442    }
1443    if(s_op->free_val)
1444    {
1445        free(s_op->val.buffer);
1446        s_op->val.buffer = NULL;
1447    }
1448    if(s_op->error_a)
1449    {
1450        free(s_op->error_a);
1451        s_op->error_a = NULL;
1452    }
1453
1454    return;
1455}
1456
1457static PINT_sm_action getattr_cleanup(
1458        struct PINT_smcb *smcb, job_status_s *js_p)
1459{
1460    struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
1461
1462    PINT_free_object_attr(&s_op->resp.u.getattr.attr);
1463    return(server_state_machine_complete(smcb));
1464}
1465
1466static PINT_sm_action getattr_with_prelude_init(
1467        struct PINT_smcb *smcb, job_status_s *js_p)
1468{
1469    js_p->error_code = 0;
1470    return SM_ACTION_COMPLETE;
1471}
1472
1473static PINT_sm_action getattr_setup_op(
1474        struct PINT_smcb *smcb, job_status_s *js_p)
1475{
1476    struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
1477    PVFS_handle_copy(s_op->u.getattr.handle, s_op->req->u.getattr.handle);
1478    s_op->u.getattr.fs_id = s_op->req->u.getattr.fs_id;
1479    s_op->u.getattr.attrmask = s_op->req->u.getattr.attrmask;
1480    s_op->u.getattr.err_array = NULL;
1481    s_op->u.getattr.mirror_dfile_status_array = NULL;
1482
1483    js_p->error_code = 0;
1484    return SM_ACTION_COMPLETE;
1485}
1486
1487static PINT_sm_action getattr_datafile_handles_safety_check(
1488        struct PINT_smcb *smcb, job_status_s *js_p)
1489{
1490    struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
1491
1492    if((js_p->error_code == 0) &&
1493        (s_op->u.getattr.attrmask & PVFS_ATTR_META_DFILES))
1494    {
1495        /* successfully read datafile key; make sure we got something valid */
1496        if(s_op->val.read_sz != s_op->val.buffer_sz)
1497        {
1498            gossip_err("Error: %s key found val size: %d when "
1499                       "expecting val size: %d\n",
1500                Trove_Common_Keys[METAFILE_HANDLES_KEY].key,
1501                s_op->val.read_sz,
1502                s_op->val.buffer_sz);
1503
1504            /* clear bitmask to prevent double free between setup_resp and
1505             * PINT_free_object_attr()
1506             */
1507            s_op->resp.u.getattr.attr.mask &= ~PVFS_ATTR_META_DFILES;
1508            s_op->resp.u.getattr.attr.mask &= ~PVFS_ATTR_META_DIST;
1509            s_op->resp.u.getattr.attr.mask &= ~PVFS_ATTR_META_MIRROR_DFILES;
1510
1511            js_p->error_code = -PVFS_EIO;
1512            return SM_ACTION_COMPLETE;
1513        }
1514    }
1515
1516    /* otherwise deliberately preserve existing error code */
1517    return SM_ACTION_COMPLETE;
1518}
1519
1520static PINT_sm_action getattr_get_dirdata_handle(
1521        struct PINT_smcb *smcb, job_status_s *js_p)
1522{
1523    struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
1524    int ret;
1525    job_id_t tmp_id;
1526
1527    s_op->key.buffer = Trove_Common_Keys[DIR_ENT_KEY].key;
1528    s_op->key.buffer_sz = Trove_Common_Keys[DIR_ENT_KEY].size;
1529    if(s_op->free_val)
1530    {
1531        free(s_op->val.buffer);
1532    }
1533    s_op->val.buffer = &s_op->u.getattr.dirent_handle;
1534    s_op->val.buffer_sz = sizeof(PVFS_handle);
1535    s_op->free_val = 0;
1536
1537    ret = job_trove_keyval_read(
1538        s_op->u.getattr.fs_id, s_op->u.getattr.handle,
1539        &s_op->key, &s_op->val,
1540        0,
1541        NULL,
1542        smcb,
1543        0,
1544        js_p,
1545        &tmp_id,
1546        server_job_context, s_op->req->hints);
1547
1548    return ret;
1549}
1550       
1551static PINT_sm_action getattr_get_dirent_count(
1552        struct PINT_smcb *smcb, job_status_s *js_p)
1553{
1554    struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
1555    int ret;
1556    job_id_t tmp_id;
1557
1558    if (!(s_op->u.getattr.attrmask & PVFS_ATTR_DIR_DIRENT_COUNT))
1559    {
1560         /* the caller didn't really want the dirent count; skip to get
1561          * directory hints
1562          */
1563         js_p->error_code = STATE_DIR_HINT;
1564         return SM_ACTION_COMPLETE;
1565    }
1566    ret = job_trove_keyval_get_handle_info(
1567        s_op->u.getattr.fs_id,
1568        s_op->u.getattr.dirent_handle,
1569        TROVE_KEYVAL_HANDLE_COUNT |
1570        0,
1571        &s_op->u.getattr.keyval_handle_info,
1572        smcb,
1573        0,
1574        js_p,
1575        &tmp_id,
1576        server_job_context, s_op->req->hints);
1577
1578    return ret;
1579}
1580
1581static PINT_sm_action getattr_interpret_dirent_count(
1582        struct PINT_smcb *smcb, job_status_s *js_p)
1583{
1584    struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
1585    switch(js_p->error_code)
1586    {
1587        case -TROVE_ENOENT:
1588            js_p->error_code = 0;
1589            s_op->resp.u.getattr.attr.u.dir.dirent_count = 0;
1590            break;
1591        case 0:
1592            s_op->resp.u.getattr.attr.u.dir.dirent_count =
1593                s_op->u.getattr.keyval_handle_info.count;
1594            break;
1595        default:
1596            return SM_ACTION_COMPLETE;
1597    }
1598
1599    gossip_debug(GOSSIP_GETATTR_DEBUG, "getattr: dirent_count: %lld\n",
1600        lld(s_op->resp.u.getattr.attr.u.dir.dirent_count));
1601
1602    return SM_ACTION_COMPLETE;
1603}
1604
1605static PINT_sm_action getattr_get_dir_hint(
1606        struct PINT_smcb *smcb, job_status_s *js_p)
1607{
1608    struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
1609    int ret, i;
1610    job_id_t tmp_id;
1611
1612    /* NOTE: memory allocations are released in the getattr_cleanup()
1613     * function
1614     */
1615   
1616    if (!(s_op->u.getattr.attrmask & PVFS_ATTR_DIR_HINT))
1617    {
1618        /* the caller didn't really want the dir hints; skip
1619         */
1620        js_p->error_code = STATE_DONE;
1621        return SM_ACTION_COMPLETE;
1622    }
1623
1624
1625    gossip_debug(GOSSIP_SERVER_DEBUG, "  trying to getxattr of %s,%s,%s "
1626                 "of dir handle (coll_id = %d, handle = %llu\n",
1627                 Trove_Special_Keys[DIST_NAME_KEY].key,
1628                 Trove_Special_Keys[DIST_PARAMS_KEY].key,
1629                 Trove_Special_Keys[NUM_DFILES_KEY].key,
1630                 s_op->u.getattr.fs_id, llu(s_op->u.getattr.handle));
1631
1632    s_op->resp.u.getattr.attr.u.dir.hint.dist_params =
1633        (char *) calloc(1, PVFS_REQ_LIMIT_DIST_BYTES);
1634    if (!s_op->resp.u.getattr.attr.u.dir.hint.dist_params)
1635    {
1636        js_p->error_code = -PVFS_ENOMEM;
1637        return SM_ACTION_COMPLETE;
1638    }
1639    s_op->resp.u.getattr.attr.u.dir.hint.dist_params_len =
1640        PVFS_REQ_LIMIT_DIST_BYTES;
1641
1642    s_op->resp.u.getattr.attr.u.dir.hint.dist_name =
1643        (char *) calloc(1, PVFS_REQ_LIMIT_DIST_NAME);
1644    if (!s_op->resp.u.getattr.attr.u.dir.hint.dist_name)
1645    {
1646        js_p->error_code = -PVFS_ENOMEM;
1647        return SM_ACTION_COMPLETE;
1648    }
1649    s_op->resp.u.getattr.attr.u.dir.hint.dist_name_len   =
1650        PVFS_REQ_LIMIT_DIST_NAME;
1651
1652    s_op->key_a =
1653        (PVFS_ds_keyval *) calloc(NUM_SPECIAL_KEYS, sizeof(PVFS_ds_keyval));
1654    if (s_op->key_a == NULL)
1655    {
1656        js_p->error_code = -PVFS_ENOMEM;
1657        return SM_ACTION_COMPLETE;
1658    }
1659    s_op->val_a = (PVFS_ds_keyval *) calloc(NUM_SPECIAL_KEYS
1660                                           ,sizeof(PVFS_ds_keyval));
1661    if (s_op->val_a == NULL)
1662    {
1663        js_p->error_code = -PVFS_ENOMEM;
1664        return SM_ACTION_COMPLETE;
1665    }
1666    s_op->u.getattr.err_array = (PVFS_error*)calloc(NUM_SPECIAL_KEYS,
1667        sizeof(PVFS_error));
1668    if(s_op->u.getattr.err_array == NULL)
1669    {
1670        js_p->error_code = -PVFS_ENOMEM;
1671        return SM_ACTION_COMPLETE;
1672
1673    }
1674
1675    s_op->free_val = 0;
1676    s_op->keyval_count = NUM_SPECIAL_KEYS;
1677    for (i = 0; i < NUM_SPECIAL_KEYS; i++)
1678    {
1679        s_op->key_a[i].buffer = Trove_Special_Keys[i].key;
1680        s_op->key_a[i].buffer_sz = Trove_Special_Keys[i].size;
1681        if (i == NUM_DFILES_KEY)
1682        {
1683            s_op->val_a[i].buffer = (char *) calloc(1, 16);
1684            if(s_op->val_a[i].buffer == NULL)
1685            {
1686                js_p->error_code = -PVFS_ENOMEM;
1687                return SM_ACTION_COMPLETE;
1688            }
1689            s_op->val_a[i].buffer_sz = 16;
1690        }
1691        else if (i == DIST_PARAMS_KEY) {
1692            s_op->val_a[i].buffer
1693            = s_op->resp.u.getattr.attr.u.dir.hint.dist_params;
1694            s_op->val_a[i].buffer_sz
1695            = s_op->resp.u.getattr.attr.u.dir.hint.dist_params_len;
1696        }
1697        else if (i == DIST_NAME_KEY) {
1698            s_op->val_a[i].buffer
1699            = s_op->resp.u.getattr.attr.u.dir.hint.dist_name;
1700            s_op->val_a[i].buffer_sz
1701            = s_op->resp.u.getattr.attr.u.dir.hint.dist_name_len;
1702        }
1703    }
1704
1705    js_p->error_code = 0;
1706    ret = job_trove_keyval_read_list(
1707        s_op->u.getattr.fs_id,
1708        s_op->u.getattr.handle,
1709        s_op->key_a, s_op->val_a, s_op->u.getattr.err_array, NUM_SPECIAL_KEYS,
1710        0, NULL, smcb, 0, js_p, &tmp_id,
1711        server_job_context, s_op->req->hints);
1712
1713    return ret;
1714}
1715
1716static PINT_sm_action getattr_interpret_dir_hint(
1717        struct PINT_smcb *smcb, job_status_s *js_p)
1718{
1719    struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
1720    if(js_p->error_code != 0 && js_p->error_code != -TROVE_ENOENT)
1721    {
1722        /* if we failed to get any of the keys, and the error code is due to
1723         * something other than the keys simply not being present, then
1724         * propigate the error.
1725         */
1726        return SM_ACTION_COMPLETE;
1727    }
1728
1729    gossip_debug(GOSSIP_SERVER_DEBUG,
1730        "getattr: job status code = %d\n", js_p->error_code);
1731    if (s_op->val_a && s_op->key_a)
1732    {
1733        long int dfile_count = 0;
1734
1735        if (s_op->u.getattr.err_array[DIST_NAME_KEY] == 0)
1736        {
1737            gossip_debug(GOSSIP_SERVER_DEBUG,
1738                "val_a[DIST_NAME_KEY] %p read_sz = %d dist_name = %s\n",
1739                s_op->val_a[DIST_NAME_KEY].buffer,
1740                s_op->val_a[DIST_NAME_KEY].read_sz,
1741                (char *)s_op->val_a[DIST_NAME_KEY].buffer);
1742            s_op->resp.u.getattr.attr.u.dir.hint.dist_name_len =
1743                s_op->val_a[DIST_NAME_KEY].read_sz;
1744        }
1745        else
1746        {
1747            s_op->resp.u.getattr.attr.u.dir.hint.dist_name_len = 0;
1748        }           
1749        s_op->val_a[DIST_NAME_KEY].buffer = NULL;
1750
1751        if (s_op->u.getattr.err_array[DIST_PARAMS_KEY] == 0)
1752        {
1753            gossip_debug(GOSSIP_SERVER_DEBUG,
1754                "val_a[DIST_PARAMS_KEY] %p read_sz = %d dist_params = %s\n",
1755                s_op->val_a[DIST_PARAMS_KEY].buffer,
1756                s_op->val_a[DIST_PARAMS_KEY].read_sz,
1757                (char *)s_op->val_a[DIST_PARAMS_KEY].buffer);
1758            s_op->resp.u.getattr.attr.u.dir.hint.dist_params_len =
1759                s_op->val_a[DIST_PARAMS_KEY].read_sz;
1760        }
1761        else
1762        {
1763            s_op->resp.u.getattr.attr.u.dir.hint.dist_params_len = 0;
1764        }
1765        if (s_op->val_a[DIST_PARAMS_KEY].buffer)
1766        {
1767           s_op->val_a[DIST_PARAMS_KEY].buffer = NULL;
1768        }
1769
1770
1771
1772        if (s_op->u.getattr.err_array[NUM_DFILES_KEY] == 0)
1773        {
1774            char *endptr = NULL;
1775            gossip_debug(GOSSIP_SERVER_DEBUG, "val_a[NUM_DFILES_KEY] %p "
1776                                              "read_sz = %d\n",
1777                s_op->val_a[NUM_DFILES_KEY].buffer,
1778                s_op->val_a[NUM_DFILES_KEY].read_sz);
1779            dfile_count = strtol(s_op->val_a[NUM_DFILES_KEY].buffer
1780                                , &endptr, 10);
1781            if (*endptr != '\0' || dfile_count < 0)
1782            {
1783                dfile_count = 0;
1784            }
1785        }
1786        if(s_op->val_a[NUM_DFILES_KEY].buffer)
1787        {
1788            free(s_op->val_a[NUM_DFILES_KEY].buffer);
1789            s_op->val_a[NUM_DFILES_KEY].buffer = NULL;
1790            s_op->val_a[NUM_DFILES_KEY].buffer_sz = 0;
1791        }
1792        s_op->keyval_count = 0;
1793        s_op->free_val = 0;
1794
1795
1796        s_op->resp.u.getattr.attr.u.dir.hint.dfile_count = dfile_count;
1797
1798        gossip_debug(GOSSIP_SERVER_DEBUG, "getattr: dir hint dfile_count: %d\n",
1799            s_op->resp.u.getattr.attr.u.dir.hint.dfile_count);
1800
1801        js_p->error_code = 0;
1802    }/* end if val_a and key_a */
1803    return SM_ACTION_COMPLETE;
1804}
1805
1806/* getattr_detect_stuffed()
1807 *
1808 * determine if a file is stuffed or not
1809 */
1810static PINT_sm_action getattr_detect_stuffed(
1811        struct PINT_smcb *smcb, job_status_s *js_p)
1812{
1813    struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
1814    job_id_t tmp_id;
1815
1816    /* we can determine stuffedness by the presence of the dfiles req key */
1817
1818    s_op->key.buffer = Trove_Common_Keys[NUM_DFILES_REQ_KEY].key;
1819    s_op->key.buffer_sz = Trove_Common_Keys[NUM_DFILES_REQ_KEY].size;
1820    if(s_op->free_val)
1821    {
1822        free(s_op->val.buffer);
1823    }
1824    s_op->val.buffer =  &s_op->u.getattr.num_dfiles_req;
1825    s_op->val.buffer_sz =  sizeof(s_op->u.getattr.num_dfiles_req);
1826    s_op->free_val = 0;
1827
1828    return(job_trove_keyval_read(
1829        s_op->u.getattr.fs_id,
1830        s_op->u.getattr.handle,
1831        &(s_op->key),
1832        &(s_op->val),
1833        0,
1834        NULL, smcb, 0, js_p,
1835        &tmp_id, server_job_context,
1836        s_op->req->hints));
1837}
1838
1839PINT_GET_OBJECT_REF_DEFINE(getattr);
1840
1841struct PINT_server_req_params pvfs2_get_attr_params =
1842{
1843    .string_name = "getattr",
1844    .perm = PINT_SERVER_CHECK_ATTR,
1845    .access_type = PINT_server_req_readonly,
1846    .sched_policy = PINT_SERVER_REQ_SCHEDULE,
1847    .get_object_ref = PINT_get_object_ref_getattr,
1848    .state_machine = &pvfs2_get_attr_sm
1849};
1850
1851/*
1852 * Local variables:
1853 *  mode: c
1854 *  c-indent-level: 4
1855 *  c-basic-offset: 4
1856 * End:
1857 *
1858 * vim: ft=c ts=8 sts=4 sw=4 expandtab
1859 */
1860
Note: See TracBrowser for help on using the browser.