root/branches/Orange-Elaine-Distr-Dir-Branch/src/client/sysint/sys-readdir.sm @ 8418

Revision 8418, 10.6 KB (checked in by elaine, 3 years ago)

Pass multiple dirent handles between server and client.

Line 
1/*
2 * (C) 2003 Clemson University and The University of Chicago
3 *
4 * See COPYING in top-level directory.
5 */
6
7/** \file
8 *  \ingroup sysint
9 *
10 *  PVFS2 system interface routines for reading entries from a directory.
11 */
12#include <string.h>
13#include <assert.h>
14
15#include "client-state-machine.h"
16#include "pvfs2-debug.h"
17#include "job.h"
18#include "gossip.h"
19#include "str-utils.h"
20#include "pint-cached-config.h"
21#include "PINT-reqproto-encode.h"
22#include "ncache.h"
23#include "pint-util.h"
24#include "pvfs2-internal.h"
25
26extern job_context_id pint_client_sm_context;
27
28static int readdir_msg_comp_fn(
29    void *v_p, struct PVFS_server_resp *resp_p, int index);
30
31%%
32
/* Nested readdir machine.
 *
 * Flow: init -> getattr on the directory -> build the readdir msgpair ->
 * exchange it with the server -> cleanup -> return to the parent machine.
 * A getattr failure goes straight to cleanup; a failure while building or
 * exchanging the msgpair passes through readdir_msg_failure first.
 */
33nested machine pvfs2_client_readdir_sm
34{
    /* fill in the getattr request state, then always fetch attributes */
35    state init
36    {
37        run readdir_init;
38        default => readdir_getattr;
39    }
40
    /* delegate attribute retrieval to the shared getattr machine */
41    state readdir_getattr
42    {
43        jump pvfs2_client_getattr_sm;
44        success => readdir_msg_setup_msgpair;
45        default => cleanup;
46    }
47
    /* build the readdir request and push the msgarray frame */
48    state readdir_msg_setup_msgpair
49    {
50        run readdir_msg_setup_msgpair;
51        success => readdir_msg_xfer_msgpair;
52        default => readdir_msg_failure;
53    }
54
    /* send the request / receive the response via the msgpairarray machine */
55    state readdir_msg_xfer_msgpair
56    {
57        jump pvfs2_msgpairarray_sm;
58        success => cleanup;
59        default => readdir_msg_failure;
60    }
61
    /* failure hook; always continues to cleanup */
62    state readdir_msg_failure
63    {
64        run readdir_msg_failure;
65        default => cleanup;
66    }
67
    /* common exit point for both success and failure */
68    state cleanup
69    {
70        run readdir_cleanup;
71        default => return;
72    }
73}
74
/* Top-level readdir machine: runs the nested readdir machine above, then
 * unconditionally runs do_cleanup and terminates.
 */
75machine pvfs2_client_sysint_readdir_sm
76{
    /* all of the real work happens in the nested machine */
77    state dowork
78    {
79        jump pvfs2_client_readdir_sm;
80        default => do_cleanup;
81    }
    /* record the final error code and mark the operation complete */
82    state do_cleanup
83    {
84        run do_cleanup;
85        default => terminate;
86    }
87}
88
89%%
90
91/** Initiate reading of entries from a directory.
92 *
93 *  \param token opaque value used to track position in directory
94 *         when more than one read is required.
95 *  \param pvfs_dirent_incount maximum number of entries to read, if
96 *         available, starting from token.
97 */
98PVFS_error PVFS_isys_readdir(
99    PVFS_object_ref ref,
100    PVFS_ds_position token,
101    int32_t pvfs_dirent_incount,
102    const PVFS_credentials *credentials,
103    PVFS_sysresp_readdir *resp,
104    PVFS_sys_op_id *op_id,
105    PVFS_hint hints,
106    void *user_ptr)
107{
108    PVFS_error ret = -PVFS_EINVAL;
109    PINT_smcb *smcb = NULL;
110    PINT_client_sm *sm_p = NULL;
111
112    gossip_debug(GOSSIP_CLIENT_DEBUG, "PVFS_isys_readdir entered\n");
113
114    if ((ref.handle == PVFS_HANDLE_NULL) ||
115        (ref.fs_id == PVFS_FS_ID_NULL) ||
116        (resp == NULL))
117    {
118        gossip_err("invalid (NULL) required argument\n");
119        return ret;
120    }
121
122    if (pvfs_dirent_incount > PVFS_REQ_LIMIT_DIRENT_COUNT)
123    {
124        gossip_lerr("PVFS_isys_readdir unable to handle request "
125                    "for %d entries.\n", pvfs_dirent_incount);
126        return ret;
127    }
128
129    PINT_smcb_alloc(&smcb, PVFS_SYS_READDIR,
130             sizeof(struct PINT_client_sm),
131             client_op_state_get_machine,
132             client_state_machine_terminate,
133             pint_client_sm_context);
134    if (smcb == NULL)
135    {
136        return -PVFS_ENOMEM;
137    }
138    sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
139
140    PINT_init_msgarray_params(sm_p, ref.fs_id);
141    PINT_init_sysint_credentials(sm_p->cred_p, credentials);
142    sm_p->u.readdir.readdir_resp = resp;
143    sm_p->object_ref = ref;
144    PVFS_hint_copy(hints, &sm_p->hints);
145    PVFS_hint_add(&sm_p->hints, PVFS_HINT_HANDLE_NAME, sizeof(PVFS_handle), &ref.handle);
146
147    /* point the sm dirent array and outcount to the readdir response field */
148    sm_p->readdir.dirent_array = &resp->dirent_array;
149    sm_p->readdir.dirent_outcount = &resp->pvfs_dirent_outcount;
150    sm_p->readdir.token = &resp->token;
151    sm_p->readdir.directory_version = &resp->directory_version;
152
153    sm_p->readdir.pos_token = sm_p->u.readdir.pos_token = token;
154    sm_p->readdir.dirent_limit = sm_p->u.readdir.dirent_limit = pvfs_dirent_incount;
155
156    gossip_debug(GOSSIP_READDIR_DEBUG, "Doing readdir on handle "
157                 "%llu on fs %d\n", llu(ref.handle), ref.fs_id);
158
159    return PINT_client_state_machine_post(
160        smcb,  op_id, user_ptr);
161}
162
163/** Read entries from a directory.
164 *
165 *  \param token opaque value used to track position in directory
166 *         when more than one read is required.
167 *  \param pvfs_dirent_incount maximum number of entries to read, if
168 *         available, starting from token.
169 */
170PVFS_error PVFS_sys_readdir(
171    PVFS_object_ref ref,
172    PVFS_ds_position token,
173    int32_t pvfs_dirent_incount,
174    const PVFS_credentials *credentials,
175    PVFS_sysresp_readdir *resp,
176    PVFS_hint hints)
177{
178    PVFS_error ret = -PVFS_EINVAL, error = 0;
179    PVFS_sys_op_id op_id;
180
181    gossip_debug(GOSSIP_CLIENT_DEBUG, "PVFS_sys_readdir entered\n");
182
183    ret = PVFS_isys_readdir(ref, token, pvfs_dirent_incount,
184                            credentials, resp, &op_id, hints, NULL);
185    if (ret)
186    {
187        PVFS_perror_gossip("PVFS_isys_readdir call", ret);
188        error = ret;
189    }
190    else
191    {
192        ret = PVFS_sys_wait(op_id, "readdir", &error);
193        if (ret)
194        {
195            PVFS_perror_gossip("PVFS_sys_wait call", ret);
196            error = ret;
197        }
198    }
199
200    PINT_sys_release(op_id);
201    return error;
202}
203
204/****************************************************************/
205
206static PINT_sm_action readdir_init(
207        struct PINT_smcb *smcb, job_status_s *js_p)
208{
209    struct PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
210    gossip_debug(GOSSIP_CLIENT_DEBUG, "readdir state: init\n");
211
212    PINT_SM_GETATTR_STATE_FILL(
213        sm_p->getattr,
214        sm_p->object_ref,
215        (PVFS_ATTR_DIR_ALL|PVFS_ATTR_DIR_DIRENT_FILES),
216        PVFS_TYPE_DIRECTORY,
217        0);
218   
219    assert(js_p->error_code == 0);
220
221    return SM_ACTION_COMPLETE;
222}
223
224static PINT_sm_action readdir_msg_setup_msgpair(
225        struct PINT_smcb *smcb, job_status_s *js_p)
226{
227    struct PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
228    int ret = -PVFS_EINVAL;
229    PINT_sm_msgpair_state *msg_p = NULL;
230
231    gossip_debug(GOSSIP_CLIENT_DEBUG, "readdir state: "
232                 "readdir_msg_setup_msgpair\n");
233
234    if (js_p->error_code)
235    {
236        return SM_ACTION_COMPLETE;
237    }
238    js_p->error_code = 0;
239
240    gossip_debug(GOSSIP_READDIR_DEBUG," readdir: posting readdir req\n");
241
242    /* TODO: Need to find the correct dirent_handle */
243    gossip_debug(
244        GOSSIP_READDIR_DEBUG, "%llu|%llu|%d | token is %llu | limit is %d\n",
245        llu(sm_p->object_ref.handle),
246        llu(sm_p->getattr.attr.u.dir.dirent_handle[0]),
247        sm_p->object_ref.fs_id,
248        llu(sm_p->readdir.pos_token),
249        sm_p->readdir.dirent_limit);
250
251    PINT_msgpair_init(&sm_p->msgarray_op);
252    msg_p = &sm_p->msgarray_op.msgpair;
253
254    PINT_SERVREQ_READDIR_FILL(
255        msg_p->req,
256        *sm_p->cred_p,
257        sm_p->object_ref.fs_id,
258        sm_p->object_ref.handle,
259        sm_p->getattr.attr.u.dir.dirent_handle[0],
260        sm_p->u.readdir.pos_token,
261        sm_p->u.readdir.dirent_limit,
262        sm_p->hints);
263
264    msg_p->fs_id = sm_p->object_ref.fs_id;
265    msg_p->handle = sm_p->object_ref.handle;
266    msg_p->retry_flag = PVFS_MSGPAIR_RETRY;
267    msg_p->comp_fn = readdir_msg_comp_fn;
268
269    ret = PINT_cached_config_map_to_server(
270        &msg_p->svr_addr, sm_p->object_ref.handle,
271        sm_p->object_ref.fs_id);
272
273    if (ret)
274    {
275        gossip_err("Failed to map meta server address\n");
276        js_p->error_code = ret;
277    }
278
279    PINT_sm_push_frame(smcb, 0, &sm_p->msgarray_op);
280    return SM_ACTION_COMPLETE;
281}
282
283static int readdir_msg_comp_fn(void *v_p,
284                               struct PVFS_server_resp *resp_p,
285                               int index)
286{
287    PINT_smcb *smcb = v_p;
288    PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_MSGPAIR_PARENT_SM);
289   
290    gossip_debug(GOSSIP_CLIENT_DEBUG, "readdir_msg_comp_fn\n");
291
292    assert(resp_p->op == PVFS_SERV_READDIR);
293
294    if (resp_p->status != 0)
295    {
296        return resp_p->status;
297    }
298
299    /* convert servresp_readdir response to a sysresp_readdir obj */
300
301    *(sm_p->readdir.token) = resp_p->u.readdir.token;
302    *(sm_p->readdir.directory_version) =
303        resp_p->u.readdir.directory_version;
304    *(sm_p->readdir.dirent_outcount) =
305        resp_p->u.readdir.dirent_count;
306    if (*(sm_p->readdir.dirent_outcount) > 0)
307    {
308        int dirent_array_len =
309            (sizeof(PVFS_dirent) * *(sm_p->readdir.dirent_outcount));
310
311        /* this dirent_array MUST be freed by caller */
312        *(sm_p->readdir.dirent_array) =
313            (PVFS_dirent *) malloc(dirent_array_len);
314        assert(*(sm_p->readdir.dirent_array));
315
316        memcpy(*(sm_p->readdir.dirent_array),
317               resp_p->u.readdir.dirent_array, dirent_array_len);
318    }
319
320    gossip_debug(GOSSIP_READDIR_DEBUG, "*** Got %d directory entries "
321                 "[version %lld]\n",
322                 *(sm_p->readdir.dirent_outcount),
323                 lld(*(sm_p->readdir.directory_version)));
324
325    return 0;
326}
327
328static PINT_sm_action readdir_msg_failure(
329        struct PINT_smcb *smcb, job_status_s *js_p)
330{
331    gossip_debug(GOSSIP_CLIENT_DEBUG,
332                 "readdir state: readdir_msg_failure\n");
333    return SM_ACTION_COMPLETE;
334}
335
336static PINT_sm_action readdir_cleanup(
337        struct PINT_smcb *smcb, job_status_s *js_p)
338{
339    struct PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
340    int i = 0;
341    PVFS_object_ref tmp_ref;
342    gossip_debug(GOSSIP_CLIENT_DEBUG, "readdir state: cleanup\n");
343
344    if(js_p->error_code == 0)
345    {
346        /* insert all handles into the ncache while we have them */
347        tmp_ref.fs_id = sm_p->object_ref.fs_id;
348        for(i = 0; i < *(sm_p->readdir.dirent_outcount); i++)
349        {
350            tmp_ref.handle = (*(sm_p->readdir.dirent_array))[i].handle;
351            PINT_ncache_update(
352                (const char *) (*(sm_p->readdir.dirent_array))[i].d_name,
353                (const PVFS_object_ref *) &(tmp_ref),
354                (const PVFS_object_ref *) &(sm_p->object_ref));
355        }
356    }
357
358    PINT_SM_GETATTR_STATE_CLEAR(sm_p->getattr);
359
360    if(js_p->error_code != 0)
361    {
362        PINT_acache_invalidate(sm_p->object_ref);
363    }
364    return SM_ACTION_COMPLETE;
365}
366
367static PINT_sm_action do_cleanup(
368        struct PINT_smcb *smcb, job_status_s *js_p)
369{
370    struct PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
371    gossip_debug(GOSSIP_CLIENT_DEBUG, "readdir state: do_cleanup\n");
372
373    PINT_free_object_attr(&sm_p->getattr.attr);
374    sm_p->error_code = js_p->error_code;
375    gossip_debug(GOSSIP_READDIR_DEBUG, " final return code is %d\n",
376                 sm_p->error_code);
377
378    PINT_SET_OP_COMPLETE;
379    return SM_ACTION_TERMINATE;
380}
381
382
383
384/*
385 * Local variables:
386 *  mode: c
387 *  c-indent-level: 4
388 *  c-basic-offset: 4
389 * End:
390 *
391 * vim: ft=c ts=8 sts=4 sw=4 expandtab
392 */
Note: See TracBrowser for help on using the browser.