root/branches/Orange-Elaine-Distr-Dir-Branch/src/server/lookup.sm @ 8492

Revision 8492, 23.9 KB (checked in by elaine, 3 years ago)

Next round of distributed directory changes.

Line 
1/*
2 * (C) 2001 Clemson University and The University of Chicago
3 *
4 * See COPYING in top-level directory.
5 */
6
7#include <string.h>
8#include <stddef.h>
9#include <assert.h>
10#include <stdio.h>
11#include <stdlib.h>
12#ifdef HAVE_MALLOC_H
13#include <malloc.h>
14#endif
15
16#include "server-config.h"
17#include "pvfs2-server.h"
18#include "pvfs2-attr.h"
19#include "str-utils.h"
20#include "pint-util.h"
21#include "pvfs2-internal.h"
22#include "check.h"
23
24enum
25{
26    STATE_ENOTDIR = 22,
27    STATE_NOMORESEGS = 23,
28    LOOKUP_CHECK_DIR_ACLS = 24,
29};
30
31%%
32
33machine pvfs2_lookup_sm
34{
35    state prelude
36    {
37        jump pvfs2_prelude_sm;
38        success => init;
39        default => final_response;
40    }
41
42    state init
43    {
44        run lookup_init;
45        STATE_ENOTDIR => setup_resp;
46        default => read_object_metadata;
47    }
48
49    state read_object_metadata
50    {
51        run lookup_read_object_metadata;
52        success => verify_object_metadata;
53        default => setup_resp;
54    }
55
56    state verify_object_metadata
57    {
58        run lookup_verify_object_metadata;
59        LOOKUP_CHECK_DIR_ACLS => read_directory_acls;
60        success => read_num_directory_entry_handles;
61        default => setup_resp;
62    }
63
64    state read_directory_acls
65    {
66        run lookup_check_acls_if_needed;
67        default => check_acls;
68    }
69
70    state check_acls
71    {
72        run lookup_check_acls;
73        success => read_num_directory_entry_handles;
74        default => setup_resp;
75    }
76
77    state read_num_directory_entry_handles
78    {
79        run lookup_read_num_directory_entry_handles;
80        default => read_directory_entry_handles;
81    }
82
83    state read_directory_entry_handles
84    {
85        run lookup_read_directory_entry_handles;
86        success => read_directory_entry;
87        default => setup_resp;
88    }
89
90    state read_directory_entry
91    {
92        run lookup_read_directory_entry;
93        success => read_object_metadata;
94        default => setup_resp;
95    }
96   
97    state setup_resp
98    {
99        run lookup_setup_resp;
100        default => final_response;
101    }
102
103    state final_response
104    {
105        jump pvfs2_final_response_sm;
106        default => cleanup;
107    }
108
109    state cleanup
110    {
111        run lookup_cleanup;
112        default => terminate;
113    }
114}
115
116%%
117
118/*
119 * Function: lookup_init
120 *
121 * Synopsis: initializes internal structures and posts job to request
122 * scheduler.
123 *
124 * Assumes req structure holds a valid path.
125 *
126 * Initializes segp, seg_ct, seg_nr fields in s_op->u.lookup.
127 *
128 * Allocates memory for handle and attribute arrays that will be
129 * returned in the response.
130 *
131 * Note: memory is allocated as one big chunk, pointed to by
132 * s_op->resp.u.lookup_path.handle_array.
133 *
134 */
135static PINT_sm_action lookup_init(
136        struct PINT_smcb *smcb, job_status_s *js_p)
137{
138    struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
139    char *ptr = NULL;
140
141
142    /* fill in the lookup portion of the PINT_server_op */
143    s_op->u.lookup.segp = NULL;
144    s_op->u.lookup.seg_nr = 0;
145    s_op->u.lookup.seg_ct = PINT_string_count_segments(
146        s_op->req->u.lookup_path.path);
147    s_op->u.lookup.handle_ct = 0;
148    s_op->u.lookup.attr_ct = 0;
149
150    gossip_debug(GOSSIP_SERVER_DEBUG, " STARTING LOOKUP REQUEST "
151                     "(path:%s)(fs_id:%d)(handle:%llu)(attrmask:%u)"
152                     "(# of segments:%u)\n",
153                     s_op->req->u.lookup_path.path,
154                     s_op->req->u.lookup_path.fs_id,
155                     llu(s_op->req->u.lookup_path.handle),
156                     s_op->req->u.lookup_path.attrmask,
157                     s_op->u.lookup.seg_ct);
158    if ((s_op->u.lookup.seg_ct < 0) ||
159        (s_op->u.lookup.seg_ct > PVFS_REQ_LIMIT_PATH_SEGMENT_COUNT))
160    {
161        gossip_err("  invalid path %s (bad segment count); "
162                   "sending error response\n",
163                   s_op->req->u.lookup_path.path);
164        js_p->error_code = -PVFS_ENOTDIR;
165        return SM_ACTION_DEFERRED;
166    }
167
168    /* allocate the internal ds_attr_array */
169    s_op->u.lookup.ds_attr_array = (PVFS_ds_attributes *)
170        malloc(s_op->u.lookup.seg_ct * sizeof(PVFS_ds_attributes));
171    if(!s_op->u.lookup.ds_attr_array)
172    {
173        js_p->error_code = -PVFS_ENOMEM;
174        return 1;
175    }
176
177    /* allocate memory
178     *
179     * Note: all memory is allocated in a single block,
180     * pointed to by s_op->resp.u.lookup_path.handle_array
181     */
182    ptr = malloc(s_op->u.lookup.seg_ct *
183                 (sizeof(PVFS_handle) + sizeof(PVFS_object_attr)));
184    if (!ptr)
185    {
186        js_p->error_code = -PVFS_ENOMEM;
187        return SM_ACTION_COMPLETE;
188    }
189
190    s_op->resp.u.lookup_path.handle_array = (PVFS_handle *)ptr;
191    ptr += (s_op->u.lookup.seg_ct * sizeof(PVFS_handle));
192
193    s_op->resp.u.lookup_path.attr_array = (PVFS_object_attr *)ptr;
194
195    js_p->error_code = 0;
196    return SM_ACTION_COMPLETE;
197}
198
199/*
200 * Function: lookup_read_object_metadata
201 *
202 * Synopsis: Given an object handle, looks up the attributes
203 * (metadata) for that handle.
204 *
205 * Initializes key and value structures to direct metadata:
206 * - if this is the starting (base) handle, store in
207 *   s_op->u.lookup.base_attr
208 * - otherwise store it in the appropriate slot in the resp handle array
209 *
210 * Posts the keyval read to trove.
211 */
212static PINT_sm_action lookup_read_object_metadata(
213        struct PINT_smcb *smcb, job_status_s *js_p)
214{
215    struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
216    int ret = -PVFS_EINVAL;
217    job_id_t j_id;
218    PVFS_handle handle = PVFS_HANDLE_NULL;
219    PVFS_ds_attributes *ds_attr = NULL;
220
221    assert(s_op->u.lookup.seg_nr <= s_op->u.lookup.seg_ct);
222
223    /* use the base handle if we haven't looked up a segment yet */
224    if (s_op->u.lookup.seg_nr == 0)
225    {
226        handle = s_op->req->u.lookup_path.handle;
227        ds_attr = &(s_op->u.lookup.ds_attr_array[0]);
228    }
229    else
230    {
231        handle = s_op->resp.u.lookup_path.handle_array[
232            s_op->u.lookup.seg_nr-1];
233        ds_attr = &(s_op->u.lookup.ds_attr_array[
234                        s_op->u.lookup.seg_nr - 1]);
235    }
236
237    /* update our successful handle read count */
238    s_op->u.lookup.handle_ct++;
239
240    /* Copy the fsid and handle to the s_op structure for the acl check */
241    s_op->target_handle = handle;
242    s_op->target_fs_id = s_op->req->u.lookup_path.fs_id;
243
244    /* get the dspace attributes/metadata */
245    ret = job_trove_dspace_getattr(
246        s_op->req->u.lookup_path.fs_id, handle, smcb, ds_attr,
247        0, js_p, &j_id, server_job_context, s_op->req->hints );
248
249    return ret;
250}
251
252/*
253 * Function: lookup_verify_object_metadata
254 *
255 * Synopsis: Examine the metadata returned from the prelude sm.  If
256 * the metadata is for a directory, prepare to read the handle of the
257 * next segment, if there is one.  If the metadata is for a file,
258 * prepare to send a response.
259 *
260 * If the object is a directory, this function sets the
261 * s_op->u.lookup.segp value to point to the next segment to look up;
262 * this is used in lookup_read_directory_entry.
263 *
264 * This function does not post an operation, but rather returns 1
265 * immediately.
266 */
267static PINT_sm_action lookup_verify_object_metadata(
268        struct PINT_smcb *smcb, job_status_s *js_p)
269{
270    struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
271    int ret = -PVFS_EINVAL;
272    PVFS_object_attr *a_p = NULL;
273    PVFS_ds_attributes *ds_attr = NULL;
274
275    if (s_op->u.lookup.seg_nr == 0)
276    {
277        a_p = &s_op->attr;
278        ds_attr = &(s_op->u.lookup.ds_attr_array[0]);
279    }
280    else
281    {
282        a_p = &s_op->resp.u.lookup_path.attr_array[
283            s_op->u.lookup.seg_nr - 1];
284        ds_attr = &(s_op->u.lookup.ds_attr_array[
285                        s_op->u.lookup.seg_nr - 1]);
286    }
287
288    PVFS_ds_attr_to_object_attr(ds_attr, a_p);
289    a_p->mask = PVFS_ATTR_COMMON_ALL;
290    s_op->target_object_attr = a_p;
291
292    /* update our successful attr read count */
293    s_op->u.lookup.attr_ct++;
294
295    assert(((a_p->objtype == PVFS_TYPE_DIRECTORY) ||
296            (a_p->objtype == PVFS_TYPE_METAFILE)  ||
297            (a_p->objtype == PVFS_TYPE_SYMLINK)));
298
299    gossip_debug(
300        GOSSIP_SERVER_DEBUG, "  attrs = (owner = %d, group = %d, "
301        "perms = %o, type = %d)\n", a_p->owner, a_p->group,
302        a_p->perms, a_p->objtype);
303
304    /* if we hit a metafile, we are done */
305    if (a_p->objtype == PVFS_TYPE_METAFILE)
306    {
307        gossip_debug(GOSSIP_SERVER_DEBUG, "  object is a metafile; "
308                     "halting lookup and sending response\n");
309
310        js_p->error_code = STATE_ENOTDIR;
311        return SM_ACTION_COMPLETE;
312    }
313
314    /*
315      if we hit a symlink, we're done; client will pick up the pieces
316      and continue to resolve the symlink if required
317    */
318    if (a_p->objtype == PVFS_TYPE_SYMLINK)
319    {
320        gossip_debug(GOSSIP_SERVER_DEBUG, "  object is a symlink; "
321                     "halting lookup and sending response\n");
322
323        js_p->error_code = STATE_ENOTDIR;
324        return SM_ACTION_COMPLETE;
325    }
326
327    /* if we looked up all the segments, we are done */
328    if (s_op->u.lookup.seg_nr == s_op->u.lookup.seg_ct)
329    {
330        gossip_debug(GOSSIP_SERVER_DEBUG, "  no more segments in path; "
331                     "sending response\n");
332
333        js_p->error_code = STATE_NOMORESEGS;
334        return SM_ACTION_COMPLETE;
335    }
336
337    /* if we reach this point, the object is a directory.  Verify that we
338     * have execute permission on the directory before continuing traversal
339     */
340    js_p->error_code = PINT_check_mode(a_p, s_op->req->credentials.uid,
341        s_op->req->credentials.gid, PINT_ACCESS_EXECUTABLE);
342    if(js_p->error_code != 0)
343    {
344        /* doesn't look like we have permission to traverse directory; bail
345         * out
346         */
347        js_p->error_code = LOOKUP_CHECK_DIR_ACLS;
348        return SM_ACTION_COMPLETE;
349    }
350
351    /* find the segment that we should look up in the directory */
352    ret = PINT_string_next_segment(
353        s_op->req->u.lookup_path.path, &s_op->u.lookup.segp,
354        &s_op->u.lookup.segstate);
355
356    if(ret != 0)
357    {
358        gossip_err("PINT_string_next_segment failed: path: %s\n",
359                   s_op->req->u.lookup_path.path);
360    }
361    assert(ret == 0);
362
363    gossip_debug(GOSSIP_SERVER_DEBUG, "  object is a directory; will be "
364                 "looking for handle for segment \"%s\" in a bit\n",
365                 s_op->u.lookup.segp);
366
367
368    return SM_ACTION_COMPLETE;
369}
370
371/*
372 * Post a keyval DB read of the posix acls to check and see if
373 * directory traversal is allowed or not
374 */
375static PINT_sm_action lookup_check_acls_if_needed(
376    struct PINT_smcb *smcb, job_status_s* js_p)
377{
378    int ret = -PVFS_EINVAL;
379    job_id_t i;
380    struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
381
382    /* If we get here with an invalid fsid and handle, we have to
383     * return -PVFS_EACCESS
384     */
385    if (s_op->target_fs_id == PVFS_FS_ID_NULL
386        || s_op->target_handle == PVFS_HANDLE_NULL)
387    {
388        js_p->error_code = -PVFS_EACCES;
389        return SM_ACTION_COMPLETE;
390    }
391    js_p->error_code = 0;
392
393    memset(&s_op->key, 0, sizeof(PVFS_ds_keyval));
394    memset(&s_op->val, 0, sizeof(PVFS_ds_keyval));
395    s_op->key.buffer = "system.posix_acl_access";
396    s_op->key.buffer_sz = strlen(s_op->key.buffer) + 1;
397    s_op->val.buffer = (char *) malloc(PVFS_REQ_LIMIT_VAL_LEN);
398    if (!s_op->val.buffer)
399    {
400        js_p->error_code = -PVFS_ENOMEM;
401        return SM_ACTION_COMPLETE;
402    }
403    s_op->val.buffer_sz = PVFS_REQ_LIMIT_VAL_LEN;
404
405    gossip_debug(GOSSIP_PERMISSIONS_DEBUG, "About to retrieve acl keyvals "
406                 "for handle %llu\n", llu(s_op->target_handle));
407
408    /* Read acl keys */
409    ret = job_trove_keyval_read(
410        s_op->target_fs_id,
411        s_op->target_handle,
412        &s_op->key,
413        &s_op->val,
414        0,
415        NULL,
416        s_op,
417        0,
418        js_p,
419        &i,
420        server_job_context, s_op->req->hints);
421    return ret;
422}
423
424/*
425 * Verify if the completed keyval DB operation allows the lookup
426 * to proceed or not. i.e. executable privileges on directory
427 * for the requesting user or not.
428 */
429static PINT_sm_action lookup_check_acls(
430    struct PINT_smcb *smcb, job_status_s* js_p)
431{
432    struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
433    PVFS_object_attr *obj_attr = NULL;
434    int want = PVFS2_ACL_EXECUTE;
435
436    /* The dspace attr must have been read at this point */
437    obj_attr = s_op->target_object_attr;
438    assert(obj_attr);
439
440    /* anything non-zero we treat as a real error */
441    if (js_p->error_code)
442    {
443        goto cleanup;
444    }
445    /* ok; let the actual acl check be done */
446    js_p->error_code = PINT_check_acls(s_op->val.buffer,
447                        s_op->val.read_sz,
448                        obj_attr,
449                        s_op->req->credentials.uid,
450                        s_op->req->credentials.gid,
451                        want);
452    /* if we are good to go,
453       find the segment that we should look up in the directory */
454    if (js_p->error_code == 0)
455    {
456        js_p->error_code = PINT_string_next_segment(
457            s_op->req->u.lookup_path.path, &s_op->u.lookup.segp,
458            &s_op->u.lookup.segstate);
459
460        if(js_p->error_code != 0)
461        {
462            gossip_err("PINT_string_next_segment failed to get the"
463                       "next segment to lookup from the path: %s\n",
464                       s_op->req->u.lookup_path.path);
465        }
466        else
467        {
468            gossip_debug(GOSSIP_SERVER_DEBUG, "  after ACL check "
469                         "object is a directory; will be "
470                         "looking for handle for segment \"%s\" in a bit\n",
471                         s_op->u.lookup.segp);
472        }
473    }
474cleanup:
475    if (s_op->val.buffer)
476        free(s_op->val.buffer);
477    memset(&s_op->key, 0, sizeof(PVFS_ds_keyval));
478    memset(&s_op->val, 0, sizeof(PVFS_ds_keyval));
479    return SM_ACTION_COMPLETE;
480}
481
482/*
483 * Function: lookup_read_num_directory_entry_handles
484 *
485 * Synopsis: Given a directory handle, determine how many handles are
486 * used to store directory entries for this directory.
487 *
488 * Initializes key and value structures to direct num handles into
489 * s_op->u.lookup.num_dirent_handles. The handle to use for the
490 * read is either:
491 * - the starting handle from the req (if we haven't looked up a
492 *   segment yet), or
493 * - the previous segment's handle (from response handle array).
494 *
495 * Posts the keyval read to trove.
496 */
497static PINT_sm_action lookup_read_num_directory_entry_handles(
498        struct PINT_smcb *smcb, job_status_s *js_p)
499{
500    struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
501    int ret = -PVFS_EINVAL;
502    PVFS_handle handle = PVFS_HANDLE_NULL;
503    job_id_t j_id;
504
505    /* use the base handle if we haven't looked up a segment yet */
506    if (s_op->u.lookup.seg_nr == 0)
507    {
508        handle = s_op->req->u.lookup_path.handle;
509    }
510    else
511    {
512        handle = s_op->resp.u.lookup_path.handle_array[
513            s_op->u.lookup.seg_nr-1];
514    }
515
516    gossip_debug(GOSSIP_SERVER_DEBUG,
517                 "  reading distributed directory attributes from handle %llu\n",
518                 llu(handle));
519
520    s_op->key.buffer = Trove_Common_Keys[DIST_DIR_ATTR_KEY].key;
521    s_op->key.buffer_sz = Trove_Common_Keys[DIST_DIR_ATTR_KEY].size;
522    s_op->val.buffer = &s_op->u.lookup.attr.u.dir.dist_dir_attr;
523    s_op->val.buffer_sz = sizeof(s_op->u.lookup.attr.u.dir.dist_dir_attr);
524
525    ret = job_trove_keyval_read(
526        s_op->req->u.lookup_path.fs_id, handle, &s_op->key, &s_op->val,
527        0,
528        NULL, smcb, 0, js_p, &j_id, server_job_context, s_op->req->hints);
529
530    return ret;
531}
532
533/*
534 * Function: lookup_read_directory_entry_handles
535 *
536 * Synopsis: Given a directory handle, look up the handle used to
537 * store directory entries for this directory.
538 *
539 * Initializes key and value structures to direct handle into
540 * s_op->u.lookup.dirent_handle, which is where we always store the
541 * handle used to read directory entries.  The handle to use for the
542 * read is either:
543 * - the starting handle from the req (if we haven't looked up a
544 *   segment yet), or
545 * - the previous segment's handle (from response handle array).
546 *
547 * Posts the keyval read to trove.
548 */
549static PINT_sm_action lookup_read_directory_entry_handles(
550        struct PINT_smcb *smcb, job_status_s *js_p)
551{
552    struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
553    int ret = -PVFS_EINVAL;
554    PVFS_handle handle = PVFS_HANDLE_NULL;
555    job_id_t j_id;
556    PVFS_object_attr *attr;
557    int keyval_count;
558
559    attr = &s_op->u.lookup.attr;
560
561    if(js_p->error_code == -TROVE_ENOENT)
562    {
563        gossip_debug(GOSSIP_GETATTR_DEBUG, "lookup: no DIST_DIR_ATTR key present!\n");
564        attr->u.dir.dist_dir_bitmap = NULL;
565        attr->u.dir.dirdata_handles = NULL;
566        return SM_ACTION_COMPLETE;
567    }
568
569    assert(attr->u.dir.dist_dir_attr.num_servers > 0 &&
570        attr->u.dir.dist_dir_attr.bitmap_size > 0);
571
572    /* use the base handle if we haven't looked up a segment yet */
573    if (s_op->u.lookup.seg_nr == 0)
574    {
575        handle = s_op->req->u.lookup_path.handle;
576    }
577    else
578    {
579        handle = s_op->resp.u.lookup_path.handle_array[
580            s_op->u.lookup.seg_nr-1];
581    }
582
583    gossip_debug(GOSSIP_SERVER_DEBUG,
584            "lookup: get dist-dir-attr for dir meta handle %llu "
585            "with tree_height=%d, num_servers=%d, bitmap_size=%d, "
586            "split_size=%d, server_no=%d and branch_level=%d\n",
587            llu(handle),
588            attr->u.dir.dist_dir_attr.tree_height,
589            attr->u.dir.dist_dir_attr.num_servers,
590            attr->u.dir.dist_dir_attr.bitmap_size,
591            attr->u.dir.dist_dir_attr.split_size,
592            attr->u.dir.dist_dir_attr.server_no,
593            attr->u.dir.dist_dir_attr.branch_level);
594
595    /* allocate space for bitmap and dirdata handles */
596    attr->u.dir.dist_dir_bitmap =
597        malloc(attr->u.dir.dist_dir_attr.bitmap_size *
598                sizeof(PVFS_dist_dir_bitmap_basetype));
599    attr->u.dir.dirdata_handles =
600        malloc(attr->u.dir.dist_dir_attr.num_servers *
601                sizeof(PVFS_handle));
602    if(!attr->u.dir.dist_dir_bitmap ||
603            !attr->u.dir.dirdata_handles)
604    {
605        js_p->error_code = -PVFS_ENOMEM;
606        return SM_ACTION_COMPLETE;
607    }
608
609
610    /* total 2 keyvals, DIST_DIRDATA_BITMAP, DIST_DIRDATA_HANDLES */
611    keyval_count = 2;
612
613    s_op->key_a = malloc(sizeof(PVFS_ds_keyval) * keyval_count);
614    if(!s_op->key_a)
615    {
616        js_p->error_code = -PVFS_ENOMEM;
617        return SM_ACTION_COMPLETE;
618    }
619
620    s_op->val_a = malloc(sizeof(PVFS_ds_keyval) * keyval_count);
621    if(!s_op->val_a)
622    {
623        free(s_op->key_a);
624        js_p->error_code = -PVFS_ENOMEM;
625        return SM_ACTION_COMPLETE;
626    }
627    memset(s_op->val_a, 0, sizeof(PVFS_ds_keyval) * keyval_count);
628
629    s_op->key_a[0].buffer = Trove_Common_Keys[DIST_DIRDATA_BITMAP_KEY].key;
630    s_op->key_a[0].buffer_sz = Trove_Common_Keys[DIST_DIRDATA_BITMAP_KEY].size;
631
632    s_op->val_a[0].buffer_sz =
633        attr->u.dir.dist_dir_attr.bitmap_size *
634        sizeof(PVFS_dist_dir_bitmap_basetype);
635    s_op->val_a[0].buffer = attr->u.dir.dist_dir_bitmap;
636
637    s_op->key_a[1].buffer = Trove_Common_Keys[DIST_DIRDATA_HANDLES_KEY].key;
638    s_op->key_a[1].buffer_sz = Trove_Common_Keys[DIST_DIRDATA_HANDLES_KEY].size;
639
640    s_op->val_a[1].buffer = attr->u.dir.dirdata_handles;
641    s_op->val_a[1].buffer_sz = attr->u.dir.dist_dir_attr.num_servers *
642        sizeof(PVFS_handle);
643
644    js_p->error_code = 0;
645    ret = job_trove_keyval_read_list(
646        s_op->u.getattr.fs_id, s_op->u.getattr.handle,
647        s_op->key_a, s_op->val_a,
648        s_op->u.getattr.err_array,
649        keyval_count,
650        0,
651        NULL,
652        smcb,
653        0,
654        js_p,
655        &j_id,
656        server_job_context, s_op->req->hints);
657
658    return ret;
659}
660
661/*
662 * Function: lookup_read_directory_entry
663 *
664 * Synopsis: Given a handle for a dspace holding directory entries,
665 * look up the current segment and obtain its handle.
666 */
667static PINT_sm_action lookup_read_directory_entry(
668        struct PINT_smcb *smcb, job_status_s *js_p)
669{
670    struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
671    int ret = -PVFS_EINVAL;
672    job_id_t j_id;
673
674    /* TODO: Need to see if directory entry is on this server. If not we
675       can't continue processing the current segment. */
676    gossip_debug(
677        GOSSIP_SERVER_DEBUG, "  need to read from dirent handle = "
678        "%llu, segment = %s (len=%d)\n", llu(s_op->u.lookup.attr.u.dir.dirdata_handles[0]),
679        s_op->u.lookup.segp, (int) strlen(s_op->u.lookup.segp));
680
681    s_op->key.buffer = s_op->u.lookup.segp;
682    s_op->key.buffer_sz = strlen(s_op->u.lookup.segp) + 1;
683    s_op->val.buffer =
684        &s_op->resp.u.lookup_path.handle_array[s_op->u.lookup.seg_nr];
685    s_op->val.buffer_sz = sizeof(PVFS_handle);
686
687    /*
688      NOTE: if this operation fails, seg_nr will indicate one too many
689      valid segments; this is addressed in lookup_send_response.
690    */
691    s_op->u.lookup.seg_nr++;
692
693    ret = job_trove_keyval_read(
694        s_op->req->u.lookup_path.fs_id, s_op->u.lookup.attr.u.dir.dirdata_handles[0],
695        &s_op->key, &s_op->val,
696        0,
697        NULL, smcb, 0, js_p, &j_id,
698        server_job_context, s_op->req->hints);
699
700    return ret;
701}
702
703
704static PINT_sm_action lookup_setup_resp(
705        struct PINT_smcb *smcb, job_status_s *js_p)
706{
707    struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
708
709    /*
710      NOTE: we may have handle_count N with attr_count N-1 in the case
711      that another meta-server needs to continue with the path (attr)
712      lookup.  otherwise, we're returning N handles with N attrs.
713
714      # actually completed are one less than the respective counts
715    */
716    s_op->resp.u.lookup_path.handle_count = s_op->u.lookup.handle_ct - 1;
717    s_op->resp.u.lookup_path.attr_count = s_op->u.lookup.attr_ct - 1;
718
719    if (s_op->resp.u.lookup_path.handle_count ||
720        s_op->resp.u.lookup_path.attr_count)
721    {
722        js_p->error_code = 0;
723    }
724    else if(js_p->error_code < 0)
725    {
726        /* preserve error code in this case and fall through */
727        gossip_debug(GOSSIP_SERVER_DEBUG, "  lookup error in previous step\n");
728    }
729    else
730    {
731        js_p->error_code = -PVFS_ENOENT;
732    }
733
734    gossip_debug(GOSSIP_SERVER_DEBUG, "  sending '%s' response with %d "
735                 "handle(s) and %d attr(s)\n",
736                 (js_p->error_code ? "error" : "success"),
737                 s_op->resp.u.lookup_path.handle_count,
738                 s_op->resp.u.lookup_path.attr_count);
739
740    if(js_p->error_code == 0)
741    {
742        PINT_ACCESS_DEBUG(
743            s_op, GOSSIP_ACCESS_DEBUG, "path: %s, handle: %llu\n",
744            s_op->req->u.lookup_path.path,
745            llu(s_op->resp.u.lookup_path.handle_array[
746                s_op->resp.u.lookup_path.handle_count-1]));
747    }
748    else
749    {
750        PINT_ACCESS_DEBUG(
751            s_op, GOSSIP_ACCESS_DEBUG, "path: %s, lookup failed\n",
752            s_op->req->u.lookup_path.path);
753    }
754
755    return SM_ACTION_COMPLETE;
756}
757
758/*
759 * Function: lookup_cleanup
760 *
761 * Synopsis: Free memory allocated during request processing.
762 *
763 * There are a bunch of regions that must be freed after processing
764 * completes:
765 * - decoded request (s_op->decoded)
766 * - encoded request (s_op->unexp_bmi_buff.buffer)
767 * - encoded response (s_op->encoded)
768 * - original (decoded) response (s_op->resp)
769 * - dynamically allocated space (in this case
770 *   s_op->resp.u.lookup_path.handle_array)
771 * - the server operation structure itself
772 */
773static PINT_sm_action lookup_cleanup(
774        struct PINT_smcb *smcb, job_status_s *js_p)
775{
776    struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
777    if (s_op->resp.u.lookup_path.handle_array)
778    {
779        free(s_op->resp.u.lookup_path.handle_array);
780        s_op->resp.u.lookup_path.handle_array = NULL;
781    }
782
783    if (s_op->u.lookup.ds_attr_array)
784    {
785        free(s_op->u.lookup.ds_attr_array);
786        s_op->u.lookup.ds_attr_array = NULL;
787    }
788
789    /* TODO: Need to free bitmap and handles arrays. */
790
791    return(server_state_machine_complete(smcb));
792}
793
794PINT_GET_OBJECT_REF_DEFINE(lookup_path);
795
796struct PINT_server_req_params pvfs2_lookup_params =
797{
798    .string_name = "lookup_path",
799    .perm = PINT_SERVER_CHECK_NONE,
800    .sched_policy = PINT_SERVER_REQ_SCHEDULE,
801    .get_object_ref = PINT_get_object_ref_lookup_path,
802    .state_machine = &pvfs2_lookup_sm
803};
804
805/*
806 * Local variables:
807 *  mode: c
808 *  c-indent-level: 4
809 *  c-basic-offset: 4
810 * End:
811 *
812 * vim: ft=c ts=8 sts=4 sw=4 expandtab
813 */
Note: See TracBrowser for help on using the browser.