root/branches/Orange-Elaine-Distr-Dir-Branch/src/server/lookup.sm @ 8495

Revision 8495, 24.4 KB (checked in by elaine, 3 years ago)

Partial fix to lookup problems.

Line 
1/*
2 * (C) 2001 Clemson University and The University of Chicago
3 *
4 * See COPYING in top-level directory.
5 */
6
7#include <string.h>
8#include <stddef.h>
9#include <assert.h>
10#include <stdio.h>
11#include <stdlib.h>
12#ifdef HAVE_MALLOC_H
13#include <malloc.h>
14#endif
15
16#include "server-config.h"
17#include "pvfs2-server.h"
18#include "pvfs2-attr.h"
19#include "str-utils.h"
20#include "pint-util.h"
21#include "pvfs2-internal.h"
22#include "check.h"
23
/*
 * Transition codes private to the lookup state machine.  Action
 * functions store one of these in js_p->error_code to pick the next
 * state.
 */
enum
{
    STATE_ENOTDIR         = 22, /* set when a metafile or symlink is hit */
    STATE_NOMORESEGS      = 23, /* set when no path segments remain */
    LOOKUP_CHECK_DIR_ACLS = 24  /* mode check failed; consult the ACLs */
};
30
31%%
32
33machine pvfs2_lookup_sm
34{
35    state prelude
36    {
37        jump pvfs2_prelude_sm;
38        success => init;
39        default => final_response;
40    }
41
42    state init
43    {
44        run lookup_init;
45        STATE_ENOTDIR => setup_resp;
46        default => read_object_metadata;
47    }
48
49    state read_object_metadata
50    {
51        run lookup_read_object_metadata;
52        success => verify_object_metadata;
53        default => setup_resp;
54    }
55
56    state verify_object_metadata
57    {
58        run lookup_verify_object_metadata;
59        LOOKUP_CHECK_DIR_ACLS => read_directory_acls;
60        success => read_num_directory_entry_handles;
61        default => setup_resp;
62    }
63
64    state read_directory_acls
65    {
66        run lookup_check_acls_if_needed;
67        default => check_acls;
68    }
69
70    state check_acls
71    {
72        run lookup_check_acls;
73        success => read_num_directory_entry_handles;
74        default => setup_resp;
75    }
76
77    state read_num_directory_entry_handles
78    {
79        run lookup_read_num_directory_entry_handles;
80        default => read_directory_entry_handles;
81    }
82
83    state read_directory_entry_handles
84    {
85        run lookup_read_directory_entry_handles;
86        success => read_directory_entry;
87        default => setup_resp;
88    }
89
90    state read_directory_entry
91    {
92        run lookup_read_directory_entry;
93        success => read_object_metadata;
94        default => setup_resp;
95    }
96   
97    state setup_resp
98    {
99        run lookup_setup_resp;
100        default => final_response;
101    }
102
103    state final_response
104    {
105        jump pvfs2_final_response_sm;
106        default => cleanup;
107    }
108
109    state cleanup
110    {
111        run lookup_cleanup;
112        default => terminate;
113    }
114}
115
116%%
117
118/*
119 * Function: lookup_init
120 *
121 * Synopsis: initializes internal structures and posts job to request
122 * scheduler.
123 *
124 * Assumes req structure holds a valid path.
125 *
126 * Initializes segp, seg_ct, seg_nr fields in s_op->u.lookup.
127 *
128 * Allocates memory for handle and attribute arrays that will be
129 * returned in the response.
130 *
131 * Note: memory is allocated as one big chunk, pointed to by
132 * s_op->resp.u.lookup_path.handle_array.
133 *
134 */
135static PINT_sm_action lookup_init(
136        struct PINT_smcb *smcb, job_status_s *js_p)
137{
138    struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
139    char *ptr = NULL;
140
141
142    /* fill in the lookup portion of the PINT_server_op */
143    s_op->u.lookup.segp = NULL;
144    s_op->u.lookup.seg_nr = 0;
145    s_op->u.lookup.seg_ct = PINT_string_count_segments(
146        s_op->req->u.lookup_path.path);
147    s_op->u.lookup.handle_ct = 0;
148    s_op->u.lookup.attr_ct = 0;
149
150    gossip_debug(GOSSIP_SERVER_DEBUG, " STARTING LOOKUP REQUEST "
151                     "(path:%s)(fs_id:%d)(handle:%llu)(attrmask:%u)"
152                     "(# of segments:%u)\n",
153                     s_op->req->u.lookup_path.path,
154                     s_op->req->u.lookup_path.fs_id,
155                     llu(s_op->req->u.lookup_path.handle),
156                     s_op->req->u.lookup_path.attrmask,
157                     s_op->u.lookup.seg_ct);
158    if ((s_op->u.lookup.seg_ct < 0) ||
159        (s_op->u.lookup.seg_ct > PVFS_REQ_LIMIT_PATH_SEGMENT_COUNT))
160    {
161        gossip_err("  invalid path %s (bad segment count); "
162                   "sending error response\n",
163                   s_op->req->u.lookup_path.path);
164        js_p->error_code = -PVFS_ENOTDIR;
165        return SM_ACTION_DEFERRED;
166    }
167
168    /* allocate the internal ds_attr_array */
169    s_op->u.lookup.ds_attr_array = (PVFS_ds_attributes *)
170        malloc(s_op->u.lookup.seg_ct * sizeof(PVFS_ds_attributes));
171    if(!s_op->u.lookup.ds_attr_array)
172    {
173        js_p->error_code = -PVFS_ENOMEM;
174        return 1;
175    }
176
177    /* allocate memory
178     *
179     * Note: all memory is allocated in a single block,
180     * pointed to by s_op->resp.u.lookup_path.handle_array
181     */
182    ptr = malloc(s_op->u.lookup.seg_ct *
183                 (sizeof(PVFS_handle) + sizeof(PVFS_object_attr)));
184    if (!ptr)
185    {
186        js_p->error_code = -PVFS_ENOMEM;
187        return SM_ACTION_COMPLETE;
188    }
189
190    s_op->resp.u.lookup_path.handle_array = (PVFS_handle *)ptr;
191    ptr += (s_op->u.lookup.seg_ct * sizeof(PVFS_handle));
192
193    s_op->resp.u.lookup_path.attr_array = (PVFS_object_attr *)ptr;
194
195    js_p->error_code = 0;
196    return SM_ACTION_COMPLETE;
197}
198
199/*
200 * Function: lookup_read_object_metadata
201 *
202 * Synopsis: Given an object handle, looks up the attributes
203 * (metadata) for that handle.
204 *
205 * Initializes key and value structures to direct metadata:
206 * - if this is the starting (base) handle, store in
207 *   s_op->u.lookup.base_attr
208 * - otherwise store it in the appropriate slot in the resp handle array
209 *
210 * Posts the keyval read to trove.
211 */
212static PINT_sm_action lookup_read_object_metadata(
213        struct PINT_smcb *smcb, job_status_s *js_p)
214{
215    struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
216    int ret = -PVFS_EINVAL;
217    job_id_t j_id;
218    PVFS_handle handle = PVFS_HANDLE_NULL;
219    PVFS_ds_attributes *ds_attr = NULL;
220
221    assert(s_op->u.lookup.seg_nr <= s_op->u.lookup.seg_ct);
222
223    /* use the base handle if we haven't looked up a segment yet */
224    if (s_op->u.lookup.seg_nr == 0)
225    {
226        handle = s_op->req->u.lookup_path.handle;
227        ds_attr = &(s_op->u.lookup.ds_attr_array[0]);
228    }
229    else
230    {
231        handle = s_op->resp.u.lookup_path.handle_array[
232            s_op->u.lookup.seg_nr-1];
233        ds_attr = &(s_op->u.lookup.ds_attr_array[
234                        s_op->u.lookup.seg_nr - 1]);
235    }
236
237    /* update our successful handle read count */
238    s_op->u.lookup.handle_ct++;
239
240    /* Copy the fsid and handle to the s_op structure for the acl check */
241    s_op->target_handle = handle;
242    s_op->target_fs_id = s_op->req->u.lookup_path.fs_id;
243
244    /* get the dspace attributes/metadata */
245    ret = job_trove_dspace_getattr(
246        s_op->req->u.lookup_path.fs_id, handle, smcb, ds_attr,
247        0, js_p, &j_id, server_job_context, s_op->req->hints );
248
249    return ret;
250}
251
252/*
253 * Function: lookup_verify_object_metadata
254 *
255 * Synopsis: Examine the metadata returned from the prelude sm.  If
256 * the metadata is for a directory, prepare to read the handle of the
257 * next segment, if there is one.  If the metadata is for a file,
258 * prepare to send a response.
259 *
260 * If the object is a directory, this function sets the
261 * s_op->u.lookup.segp value to point to the next segment to look up;
262 * this is used in lookup_read_directory_entry.
263 *
264 * This function does not post an operation, but rather returns 1
265 * immediately.
266 */
267static PINT_sm_action lookup_verify_object_metadata(
268        struct PINT_smcb *smcb, job_status_s *js_p)
269{
270    struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
271    int ret = -PVFS_EINVAL;
272    PVFS_object_attr *a_p = NULL;
273    PVFS_ds_attributes *ds_attr = NULL;
274
275    if (s_op->u.lookup.seg_nr == 0)
276    {
277        a_p = &s_op->attr;
278        ds_attr = &(s_op->u.lookup.ds_attr_array[0]);
279    }
280    else
281    {
282        a_p = &s_op->resp.u.lookup_path.attr_array[
283            s_op->u.lookup.seg_nr - 1];
284        ds_attr = &(s_op->u.lookup.ds_attr_array[
285                        s_op->u.lookup.seg_nr - 1]);
286    }
287
288    PVFS_ds_attr_to_object_attr(ds_attr, a_p);
289    a_p->mask = PVFS_ATTR_COMMON_ALL;
290    s_op->target_object_attr = a_p;
291
292    /* update our successful attr read count */
293    s_op->u.lookup.attr_ct++;
294
295    assert(((a_p->objtype == PVFS_TYPE_DIRECTORY) ||
296            (a_p->objtype == PVFS_TYPE_METAFILE)  ||
297            (a_p->objtype == PVFS_TYPE_SYMLINK)));
298
299    gossip_debug(
300        GOSSIP_SERVER_DEBUG, "  attrs = (owner = %d, group = %d, "
301        "perms = %o, type = %d)\n", a_p->owner, a_p->group,
302        a_p->perms, a_p->objtype);
303
304    /* if we hit a metafile, we are done */
305    if (a_p->objtype == PVFS_TYPE_METAFILE)
306    {
307        gossip_debug(GOSSIP_SERVER_DEBUG, "  object is a metafile; "
308                     "halting lookup and sending response\n");
309
310        js_p->error_code = STATE_ENOTDIR;
311        return SM_ACTION_COMPLETE;
312    }
313
314    /*
315      if we hit a symlink, we're done; client will pick up the pieces
316      and continue to resolve the symlink if required
317    */
318    if (a_p->objtype == PVFS_TYPE_SYMLINK)
319    {
320        gossip_debug(GOSSIP_SERVER_DEBUG, "  object is a symlink; "
321                     "halting lookup and sending response\n");
322
323        js_p->error_code = STATE_ENOTDIR;
324        return SM_ACTION_COMPLETE;
325    }
326
327    /* if we looked up all the segments, we are done */
328    if (s_op->u.lookup.seg_nr == s_op->u.lookup.seg_ct)
329    {
330        gossip_debug(GOSSIP_SERVER_DEBUG, "  no more segments in path; "
331                     "sending response\n");
332
333        js_p->error_code = STATE_NOMORESEGS;
334        return SM_ACTION_COMPLETE;
335    }
336
337    /* if we reach this point, the object is a directory.  Verify that we
338     * have execute permission on the directory before continuing traversal
339     */
340    js_p->error_code = PINT_check_mode(a_p, s_op->req->credentials.uid,
341        s_op->req->credentials.gid, PINT_ACCESS_EXECUTABLE);
342    if(js_p->error_code != 0)
343    {
344        /* doesn't look like we have permission to traverse directory; bail
345         * out
346         */
347        js_p->error_code = LOOKUP_CHECK_DIR_ACLS;
348        return SM_ACTION_COMPLETE;
349    }
350
351    /* find the segment that we should look up in the directory */
352    ret = PINT_string_next_segment(
353        s_op->req->u.lookup_path.path, &s_op->u.lookup.segp,
354        &s_op->u.lookup.segstate);
355
356    if(ret != 0)
357    {
358        gossip_err("PINT_string_next_segment failed: path: %s\n",
359                   s_op->req->u.lookup_path.path);
360    }
361    assert(ret == 0);
362
363    gossip_debug(GOSSIP_SERVER_DEBUG, "  object is a directory; will be "
364                 "looking for handle for segment \"%s\" in a bit\n",
365                 s_op->u.lookup.segp);
366
367
368    return SM_ACTION_COMPLETE;
369}
370
371/*
372 * Post a keyval DB read of the posix acls to check and see if
373 * directory traversal is allowed or not
374 */
375static PINT_sm_action lookup_check_acls_if_needed(
376    struct PINT_smcb *smcb, job_status_s* js_p)
377{
378    int ret = -PVFS_EINVAL;
379    job_id_t i;
380    struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
381
382    /* If we get here with an invalid fsid and handle, we have to
383     * return -PVFS_EACCESS
384     */
385    if (s_op->target_fs_id == PVFS_FS_ID_NULL
386        || s_op->target_handle == PVFS_HANDLE_NULL)
387    {
388        js_p->error_code = -PVFS_EACCES;
389        return SM_ACTION_COMPLETE;
390    }
391    js_p->error_code = 0;
392
393    memset(&s_op->key, 0, sizeof(PVFS_ds_keyval));
394    memset(&s_op->val, 0, sizeof(PVFS_ds_keyval));
395    s_op->key.buffer = "system.posix_acl_access";
396    s_op->key.buffer_sz = strlen(s_op->key.buffer) + 1;
397    s_op->val.buffer = (char *) malloc(PVFS_REQ_LIMIT_VAL_LEN);
398    if (!s_op->val.buffer)
399    {
400        js_p->error_code = -PVFS_ENOMEM;
401        return SM_ACTION_COMPLETE;
402    }
403    s_op->val.buffer_sz = PVFS_REQ_LIMIT_VAL_LEN;
404
405    gossip_debug(GOSSIP_PERMISSIONS_DEBUG, "About to retrieve acl keyvals "
406                 "for handle %llu\n", llu(s_op->target_handle));
407
408    /* Read acl keys */
409    ret = job_trove_keyval_read(
410        s_op->target_fs_id,
411        s_op->target_handle,
412        &s_op->key,
413        &s_op->val,
414        0,
415        NULL,
416        s_op,
417        0,
418        js_p,
419        &i,
420        server_job_context, s_op->req->hints);
421    return ret;
422}
423
424/*
425 * Verify if the completed keyval DB operation allows the lookup
426 * to proceed or not. i.e. executable privileges on directory
427 * for the requesting user or not.
428 */
429static PINT_sm_action lookup_check_acls(
430    struct PINT_smcb *smcb, job_status_s* js_p)
431{
432    struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
433    PVFS_object_attr *obj_attr = NULL;
434    int want = PVFS2_ACL_EXECUTE;
435
436    /* The dspace attr must have been read at this point */
437    obj_attr = s_op->target_object_attr;
438    assert(obj_attr);
439
440    /* anything non-zero we treat as a real error */
441    if (js_p->error_code)
442    {
443        goto cleanup;
444    }
445    /* ok; let the actual acl check be done */
446    js_p->error_code = PINT_check_acls(s_op->val.buffer,
447                        s_op->val.read_sz,
448                        obj_attr,
449                        s_op->req->credentials.uid,
450                        s_op->req->credentials.gid,
451                        want);
452    /* if we are good to go,
453       find the segment that we should look up in the directory */
454    if (js_p->error_code == 0)
455    {
456        js_p->error_code = PINT_string_next_segment(
457            s_op->req->u.lookup_path.path, &s_op->u.lookup.segp,
458            &s_op->u.lookup.segstate);
459
460        if(js_p->error_code != 0)
461        {
462            gossip_err("PINT_string_next_segment failed to get the"
463                       "next segment to lookup from the path: %s\n",
464                       s_op->req->u.lookup_path.path);
465        }
466        else
467        {
468            gossip_debug(GOSSIP_SERVER_DEBUG, "  after ACL check "
469                         "object is a directory; will be "
470                         "looking for handle for segment \"%s\" in a bit\n",
471                         s_op->u.lookup.segp);
472        }
473    }
474cleanup:
475    if (s_op->val.buffer)
476        free(s_op->val.buffer);
477    memset(&s_op->key, 0, sizeof(PVFS_ds_keyval));
478    memset(&s_op->val, 0, sizeof(PVFS_ds_keyval));
479    return SM_ACTION_COMPLETE;
480}
481
482/*
483 * Function: lookup_read_num_directory_entry_handles
484 *
485 * Synopsis: Given a directory handle, determine how many handles are
486 * used to store directory entries for this directory.
487 *
488 * Initializes key and value structures to direct num handles into
489 * s_op->u.lookup.num_dirent_handles. The handle to use for the
490 * read is either:
491 * - the starting handle from the req (if we haven't looked up a
492 *   segment yet), or
493 * - the previous segment's handle (from response handle array).
494 *
495 * Posts the keyval read to trove.
496 */
497static PINT_sm_action lookup_read_num_directory_entry_handles(
498        struct PINT_smcb *smcb, job_status_s *js_p)
499{
500    struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
501    int ret = -PVFS_EINVAL;
502    PVFS_handle handle = PVFS_HANDLE_NULL;
503    job_id_t j_id;
504
505    /* use the base handle if we haven't looked up a segment yet */
506    if (s_op->u.lookup.seg_nr == 0)
507    {
508        handle = s_op->req->u.lookup_path.handle;
509    }
510    else
511    {
512        handle = s_op->resp.u.lookup_path.handle_array[
513            s_op->u.lookup.seg_nr-1];
514    }
515
516    gossip_debug(GOSSIP_SERVER_DEBUG,
517                 "  reading distributed directory attributes from handle %llu\n",
518                 llu(handle));
519
520    s_op->key.buffer = Trove_Common_Keys[DIST_DIR_ATTR_KEY].key;
521    s_op->key.buffer_sz = Trove_Common_Keys[DIST_DIR_ATTR_KEY].size;
522    s_op->val.buffer = &s_op->u.lookup.attr.u.dir.dist_dir_attr;
523    s_op->val.buffer_sz = sizeof(s_op->u.lookup.attr.u.dir.dist_dir_attr);
524
525    ret = job_trove_keyval_read(
526        s_op->req->u.lookup_path.fs_id, handle, &s_op->key, &s_op->val,
527        0,
528        NULL, smcb, 0, js_p, &j_id, server_job_context, s_op->req->hints);
529
530    return ret;
531}
532
533/*
534 * Function: lookup_read_directory_entry_handles
535 *
536 * Synopsis: Given a directory handle, look up the handle used to
537 * store directory entries for this directory.
538 *
539 * Initializes key and value structures to direct handle into
540 * s_op->u.lookup.dirent_handle, which is where we always store the
541 * handle used to read directory entries.  The handle to use for the
542 * read is either:
543 * - the starting handle from the req (if we haven't looked up a
544 *   segment yet), or
545 * - the previous segment's handle (from response handle array).
546 *
547 * Posts the keyval read to trove.
548 */
549static PINT_sm_action lookup_read_directory_entry_handles(
550        struct PINT_smcb *smcb, job_status_s *js_p)
551{
552    struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
553    int ret = -PVFS_EINVAL;
554    PVFS_handle handle = PVFS_HANDLE_NULL;
555    job_id_t j_id;
556    PVFS_object_attr *attr;
557    int keyval_count;
558
559    attr = &s_op->u.lookup.attr;
560
561    if(js_p->error_code == -TROVE_ENOENT)
562    {
563        gossip_debug(GOSSIP_GETATTR_DEBUG, "lookup: no DIST_DIR_ATTR key present!\n");
564        attr->u.dir.dist_dir_bitmap = NULL;
565        attr->u.dir.dirdata_handles = NULL;
566        return SM_ACTION_COMPLETE;
567    }
568
569    assert(attr->u.dir.dist_dir_attr.num_servers > 0 &&
570        attr->u.dir.dist_dir_attr.bitmap_size > 0);
571
572    /* use the base handle if we haven't looked up a segment yet */
573    if (s_op->u.lookup.seg_nr == 0)
574    {
575        handle = s_op->req->u.lookup_path.handle;
576    }
577    else
578    {
579        handle = s_op->resp.u.lookup_path.handle_array[
580            s_op->u.lookup.seg_nr-1];
581    }
582
583    gossip_debug(GOSSIP_SERVER_DEBUG,
584            "lookup: get dist-dir-attr for dir meta handle %llu "
585            "with tree_height=%d, num_servers=%d, bitmap_size=%d, "
586            "split_size=%d, server_no=%d and branch_level=%d\n",
587            llu(handle),
588            attr->u.dir.dist_dir_attr.tree_height,
589            attr->u.dir.dist_dir_attr.num_servers,
590            attr->u.dir.dist_dir_attr.bitmap_size,
591            attr->u.dir.dist_dir_attr.split_size,
592            attr->u.dir.dist_dir_attr.server_no,
593            attr->u.dir.dist_dir_attr.branch_level);
594
595    /* allocate space for bitmap and dirdata handles */
596    attr->u.dir.dist_dir_bitmap =
597        malloc(attr->u.dir.dist_dir_attr.bitmap_size *
598                sizeof(PVFS_dist_dir_bitmap_basetype));
599    attr->u.dir.dirdata_handles =
600        malloc(attr->u.dir.dist_dir_attr.num_servers *
601                sizeof(PVFS_handle));
602    if(!attr->u.dir.dist_dir_bitmap ||
603            !attr->u.dir.dirdata_handles)
604    {
605        js_p->error_code = -PVFS_ENOMEM;
606        return SM_ACTION_COMPLETE;
607    }
608
609
610    /* total 2 keyvals, DIST_DIRDATA_BITMAP, DIST_DIRDATA_HANDLES */
611    keyval_count = 2;
612
613    s_op->key_a = malloc(sizeof(PVFS_ds_keyval) * keyval_count);
614    if(!s_op->key_a)
615    {
616        js_p->error_code = -PVFS_ENOMEM;
617        return SM_ACTION_COMPLETE;
618    }
619
620    s_op->val_a = malloc(sizeof(PVFS_ds_keyval) * keyval_count);
621    if(!s_op->val_a)
622    {
623        free(s_op->key_a);
624        js_p->error_code = -PVFS_ENOMEM;
625        return SM_ACTION_COMPLETE;
626    }
627    memset(s_op->val_a, 0, sizeof(PVFS_ds_keyval) * keyval_count);
628
629    s_op->key_a[0].buffer = Trove_Common_Keys[DIST_DIRDATA_BITMAP_KEY].key;
630    s_op->key_a[0].buffer_sz = Trove_Common_Keys[DIST_DIRDATA_BITMAP_KEY].size;
631
632    s_op->val_a[0].buffer_sz =
633        attr->u.dir.dist_dir_attr.bitmap_size *
634        sizeof(PVFS_dist_dir_bitmap_basetype);
635    s_op->val_a[0].buffer = attr->u.dir.dist_dir_bitmap;
636
637    s_op->key_a[1].buffer = Trove_Common_Keys[DIST_DIRDATA_HANDLES_KEY].key;
638    s_op->key_a[1].buffer_sz = Trove_Common_Keys[DIST_DIRDATA_HANDLES_KEY].size;
639
640    s_op->val_a[1].buffer = attr->u.dir.dirdata_handles;
641    s_op->val_a[1].buffer_sz = attr->u.dir.dist_dir_attr.num_servers *
642        sizeof(PVFS_handle);
643
644    s_op->u.lookup.err_array = (PVFS_error*)calloc(keyval_count,
645        sizeof(PVFS_error));
646    if (s_op->u.lookup.err_array == NULL)
647    {
648        js_p->error_code = -PVFS_ENOMEM;
649        return SM_ACTION_COMPLETE;
650    }
651
652    js_p->error_code = 0;
653    ret = job_trove_keyval_read_list(
654        s_op->req->u.lookup_path.fs_id, handle,
655        s_op->key_a, s_op->val_a,
656        s_op->u.lookup.err_array,
657        keyval_count,
658        0,
659        NULL,
660        smcb,
661        0,
662        js_p,
663        &j_id,
664        server_job_context, s_op->req->hints);
665
666    return ret;
667}
668
669/*
670 * Function: lookup_read_directory_entry
671 *
672 * Synopsis: Given a handle for a dspace holding directory entries,
673 * look up the current segment and obtain its handle.
674 */
675static PINT_sm_action lookup_read_directory_entry(
676        struct PINT_smcb *smcb, job_status_s *js_p)
677{
678    struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
679    int ret = -PVFS_EINVAL;
680    job_id_t j_id;
681
682    /* TODO: Need to see if directory entry is on this server. If not we
683       can't continue processing the current segment. */
684    gossip_debug(
685        GOSSIP_SERVER_DEBUG, "  need to read from dirent handle = "
686        "%llu, segment = %s (len=%d)\n", llu(s_op->u.lookup.attr.u.dir.dirdata_handles[0]),
687        s_op->u.lookup.segp, (int) strlen(s_op->u.lookup.segp));
688
689    s_op->key.buffer = s_op->u.lookup.segp;
690    s_op->key.buffer_sz = strlen(s_op->u.lookup.segp) + 1;
691    s_op->val.buffer =
692        &s_op->resp.u.lookup_path.handle_array[s_op->u.lookup.seg_nr];
693    s_op->val.buffer_sz = sizeof(PVFS_handle);
694
695    /*
696      NOTE: if this operation fails, seg_nr will indicate one too many
697      valid segments; this is addressed in lookup_send_response.
698    */
699    s_op->u.lookup.seg_nr++;
700
701    /* TODO: Decide which dirdata_handle is correct. Do server-to-server
702       communication if necessary to read the entry. */
703    ret = job_trove_keyval_read(
704        s_op->req->u.lookup_path.fs_id, s_op->u.lookup.attr.u.dir.dirdata_handles[0],
705        &s_op->key, &s_op->val,
706        0,
707        NULL, smcb, 0, js_p, &j_id,
708        server_job_context, s_op->req->hints);
709
710    return ret;
711}
712
713
714static PINT_sm_action lookup_setup_resp(
715        struct PINT_smcb *smcb, job_status_s *js_p)
716{
717    struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
718
719    /*
720      NOTE: we may have handle_count N with attr_count N-1 in the case
721      that another meta-server needs to continue with the path (attr)
722      lookup.  otherwise, we're returning N handles with N attrs.
723
724      # actually completed are one less than the respective counts
725    */
726    s_op->resp.u.lookup_path.handle_count = s_op->u.lookup.handle_ct - 1;
727    s_op->resp.u.lookup_path.attr_count = s_op->u.lookup.attr_ct - 1;
728
729    if (s_op->resp.u.lookup_path.handle_count ||
730        s_op->resp.u.lookup_path.attr_count)
731    {
732        js_p->error_code = 0;
733    }
734    else if(js_p->error_code < 0)
735    {
736        /* preserve error code in this case and fall through */
737        gossip_debug(GOSSIP_SERVER_DEBUG, "  lookup error in previous step\n");
738    }
739    else
740    {
741        js_p->error_code = -PVFS_ENOENT;
742    }
743
744    gossip_debug(GOSSIP_SERVER_DEBUG, "  sending '%s' response with %d "
745                 "handle(s) and %d attr(s)\n",
746                 (js_p->error_code ? "error" : "success"),
747                 s_op->resp.u.lookup_path.handle_count,
748                 s_op->resp.u.lookup_path.attr_count);
749
750    if(js_p->error_code == 0)
751    {
752        PINT_ACCESS_DEBUG(
753            s_op, GOSSIP_ACCESS_DEBUG, "path: %s, handle: %llu\n",
754            s_op->req->u.lookup_path.path,
755            llu(s_op->resp.u.lookup_path.handle_array[
756                s_op->resp.u.lookup_path.handle_count-1]));
757    }
758    else
759    {
760        PINT_ACCESS_DEBUG(
761            s_op, GOSSIP_ACCESS_DEBUG, "path: %s, lookup failed\n",
762            s_op->req->u.lookup_path.path);
763    }
764
765    return SM_ACTION_COMPLETE;
766}
767
768/*
769 * Function: lookup_cleanup
770 *
771 * Synopsis: Free memory allocated during request processing.
772 *
773 * There are a bunch of regions that must be freed after processing
774 * completes:
775 * - decoded request (s_op->decoded)
776 * - encoded request (s_op->unexp_bmi_buff.buffer)
777 * - encoded response (s_op->encoded)
778 * - original (decoded) response (s_op->resp)
779 * - dynamically allocated space (in this case
780 *   s_op->resp.u.lookup_path.handle_array)
781 * - the server operation structure itself
782 */
783static PINT_sm_action lookup_cleanup(
784        struct PINT_smcb *smcb, job_status_s *js_p)
785{
786    struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
787    if (s_op->resp.u.lookup_path.handle_array)
788    {
789        free(s_op->resp.u.lookup_path.handle_array);
790        s_op->resp.u.lookup_path.handle_array = NULL;
791    }
792
793    if (s_op->u.lookup.ds_attr_array)
794    {
795        free(s_op->u.lookup.ds_attr_array);
796        s_op->u.lookup.ds_attr_array = NULL;
797    }
798
799    if (s_op->u.lookup.err_array)
800    {
801        free(s_op->u.lookup.err_array);
802        s_op->u.lookup.err_array = NULL;
803    }
804
805    /* TODO: Need to free bitmap and handles arrays. */
806
807    return(server_state_machine_complete(smcb));
808}
809
810PINT_GET_OBJECT_REF_DEFINE(lookup_path);
811
812struct PINT_server_req_params pvfs2_lookup_params =
813{
814    .string_name = "lookup_path",
815    .perm = PINT_SERVER_CHECK_NONE,
816    .sched_policy = PINT_SERVER_REQ_SCHEDULE,
817    .get_object_ref = PINT_get_object_ref_lookup_path,
818    .state_machine = &pvfs2_lookup_sm
819};
820
821/*
822 * Local variables:
823 *  mode: c
824 *  c-indent-level: 4
825 *  c-basic-offset: 4
826 * End:
827 *
828 * vim: ft=c ts=8 sts=4 sw=4 expandtab
829 */
Note: See TracBrowser for help on using the browser.