root/branches/as-branch/src/server/allreduce.sm @ 7843

Revision 7843, 6.3 KB (checked in by sson, 4 years ago)

Fixed the incorrect buffer copy problem from the read buffer (pipeline.buffer) to objects[][].

Line 
1/*
2 * (C) 2001 Clemson University and The University of Chicago
3 *
4 * See COPYING in top-level directory.
5 */
6
7/*
8 *  PVFS2 server state machine for driving read I/O operations.
9 */
10
11#include <string.h>
12#include <assert.h>
13
14#include "server-config.h"
15#include "pvfs2-server.h"
16#include "pvfs2-attr.h"
17#include "pvfs2-request.h"
18#include "pint-distribution.h"
19#include "pint-request.h"
20#include "pvfs2-internal.h"
21#include "pint-cached-config.h"
22#include "trove.h"
23
24#define CONTINUE_ALLREDUCE 201
25#define ALLREDUCE_TAG 3333
26
27%%
28
29nested machine pvfs2_send_recv_sm
30{
31    state send_recv
32    {
33        run send_recv_fn;
34        success => epilog;
35    }
36
37    state epilog
38    {
39        run dummy_epilog;
40        default => return;
41    }
42}
43
44nested machine pvfs2_allreduce_sm
45{
46    state allreduce
47    {
48        pjmp allreduce_fn
49        {
50            success => pvfs2_send_recv_sm;
51        }
52        success => check_allreduce;
53    }
54
55    state check_allreduce
56    {
57        run check_allreduce_done;
58        CONTINUE_ALLREDUCE => allreduce;
59        default => return;
60    }
61}
62
63%%
64
65/*
66 */
67static PINT_sm_action send_recv_fn(struct PINT_smcb *smcb, job_status_s *js_p)
68{
69    struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
70    struct PINT_server_op *parent_s_op = PINT_sm_frame(smcb->parent_smcb, 0);
71    struct server_configuration_s *user_opts = get_server_config_struct();
72    job_id_t tmp_id;
73    int ret;
74    int myRank = s_op->u.send_recv.myRank, newRank;
75    PVFS_handle new_rank_handle;
76    PVFS_BMI_addr_t svr_addr;
77    PVFS_msg_tag_t session_tag = ALLREDUCE_TAG;
78
79    js_p->error_code = 0;
80
81    newRank = myRank^(s_op->u.send_recv.mask);
82    gossip_debug(GOSSIP_IO_DEBUG, "%s: mask=0x%x, myRank=%d, newRank=%d\n",
83                 (s_op->u.send_recv.type==0?"SEND":"RECV"),
84                 s_op->u.send_recv.mask, myRank, newRank);
85    new_rank_handle = parent_s_op->u.allreduce.dfile_array[newRank];
86    gossip_debug(GOSSIP_IO_DEBUG, "new_rank_handle=%llu\n", llu(new_rank_handle));
87    ret = PINT_cached_config_map_to_server(&svr_addr,
88                                           new_rank_handle,
89                                           parent_s_op->u.allreduce.fs_id);
90
91    if(ret < 0) {
92        gossip_err("Failed to map meta server address\n");
93        js_p->error_code = ret;
94        return SM_ACTION_COMPLETE;
95    }
96
97    /* RECV */
98    if(s_op->u.send_recv.type == 1) {
99        ret = job_bmi_recv(svr_addr,
100                           (void*)parent_s_op->u.allreduce.recv_buf,
101                           parent_s_op->u.allreduce.buf_sz,
102                           session_tag,
103                           BMI_PRE_ALLOC,
104                           smcb,
105                           0,
106                           js_p,
107                           &tmp_id,
108                           server_job_context,
109                           user_opts->server_job_bmi_timeout,
110                           (bmi_hint)parent_s_op->u.allreduce.hints);
111       
112        if(ret == 1) {
113            js_p->error_code = ret;
114            return SM_ACTION_COMPLETE;
115        }
116       
117        if(ret == 0) {
118            js_p->error_code = ret;
119            return SM_ACTION_DEFERRED;
120        }
121    }
122    /* SEND */
123    else if(s_op->u.send_recv.type == 0) {
124        ret = job_bmi_send(svr_addr,
125                           (void*)parent_s_op->u.allreduce.send_buf,
126                           parent_s_op->u.allreduce.buf_sz,
127                           session_tag,
128                           BMI_PRE_ALLOC,
129                           0, /* 1: send_unexpected */
130                           smcb,
131                           0,
132                           js_p,
133                           &tmp_id,
134                           server_job_context,
135                           user_opts->server_job_bmi_timeout,
136                           (bmi_hint)parent_s_op->u.allreduce.hints);
137       
138        if(ret == 1) {
139            js_p->error_code = ret;
140            return SM_ACTION_COMPLETE;
141        }
142       
143        if(ret == 0) {
144            js_p->error_code = ret;
145            return SM_ACTION_DEFERRED;
146        }
147    }
148
149    js_p->error_code = 0;
150    return SM_ACTION_COMPLETE;
151}
152
153/*
154 */
155static PINT_sm_action dummy_epilog(struct PINT_smcb *smcb, job_status_s *js_p)
156{
157    js_p->error_code = 0;
158    return SM_ACTION_COMPLETE;
159}
160
161static PINT_sm_action allreduce_fn(struct PINT_smcb *smcb, job_status_s *js_p)
162{
163    struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
164    struct PINT_server_op *send_recv_op;
165    int i, ret;
166
167    for(i=0; i<2; i++) {
168        send_recv_op = malloc(sizeof(*send_recv_op));
169        memset(send_recv_op, 0, sizeof(*send_recv_op));
170   
171        if(i==0)
172            send_recv_op->u.send_recv.type = 0; /* SEND */
173        else if(i==1)
174            send_recv_op->u.send_recv.type = 1; /* RECV */
175
176        send_recv_op->u.send_recv.myRank = s_op->u.allreduce.myRank;
177        send_recv_op->u.send_recv.mask = 0x1;
178
179        ret = PINT_sm_push_frame(smcb, 0, send_recv_op);
180    }
181
182    js_p->error_code = 0;
183    return SM_ACTION_COMPLETE;
184}
185
186static PINT_sm_action check_allreduce_done(struct PINT_smcb *smcb, job_status_s *js_p)
187{
188    struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
189    struct PINT_server_op *send_recv_op;
190    int task_id, remaining, i, ii;
191    PVFS_error tmp_err;
192
193    s_op->u.allreduce.current_depth++;
194    s_op->u.allreduce.mask <<= 1;
195
196    for(ii=0; ii<2; ii++) {
197        send_recv_op = PINT_sm_pop_frame(smcb, &task_id, &tmp_err,
198                                         &remaining);
199
200        gossip_debug(GOSSIP_IO_DEBUG, "%s: i=%d, type=%d, remaining=%d\n",
201                     __func__, ii, send_recv_op->u.send_recv.type, remaining);
202
203        if(send_recv_op->u.send_recv.type == 1) { /* if RECV */
204            switch(s_op->u.allreduce.datatype) {
205            case ((int)0x4c000405): /* MPI_INT */
206                {
207                    int *a = s_op->u.allreduce.send_buf;
208                    int *b = s_op->u.allreduce.recv_buf;
209                   
210                    gossip_debug(GOSSIP_IO_DEBUG, "%s: SUM on INT\n", __func__);
211                    for(i=0; i<(s_op->u.allreduce.buf_sz)/sizeof(int); i++)
212                        a[i] = a[i] + b[i];
213                    memcpy(s_op->u.allreduce.send_buf, (void*)a, s_op->u.allreduce.buf_sz);
214                }
215                break;
216            case ((int)0x4c00040a): /* MPI_FLOAT */
217                {
218                    float *a = (float *)s_op->u.allreduce.send_buf;
219                    float *b = (float *)s_op->u.allreduce.recv_buf;
220                    int count = (s_op->u.allreduce.buf_sz)/sizeof(float);
221                   
222                    gossip_debug(GOSSIP_IO_DEBUG,
223                                 "%s: SUM on FLOAT, count=%d, recv_buf[0]=%f, send_buf[0]=%f\n",
224                                 __func__, count, b[0], a[0]);
225                    for(i=0; i<count; i++) {
226                        a[i] = a[i] + b[i];
227                    }
228                    memcpy(s_op->u.allreduce.send_buf, (void*)a,
229                           s_op->u.allreduce.buf_sz);
230                    gossip_debug(GOSSIP_IO_DEBUG, "%s: type=%s: result=%f\n",
231                                 __func__,
232                                 (send_recv_op->u.send_recv.type==0?"SEND":"RECV"),
233                                 a[0]);
234                }
235                break;
236            default:
237                break;
238            } /* end switch() */
239        } /* end if() */
240        free(send_recv_op);
241    }
242       
243    gossip_debug(GOSSIP_IO_DEBUG, "current_depth=%d, tree_depth=%d\n",
244                 s_op->u.allreduce.current_depth,
245                 s_op->u.allreduce.tree_depth);
246    if(s_op->u.allreduce.current_depth < s_op->u.allreduce.tree_depth)
247        js_p->error_code = CONTINUE_ALLREDUCE;
248    else
249        js_p->error_code = 0;
250
251    return SM_ACTION_COMPLETE;
252}
253
254/*
255 * Local variables:
256 *  mode: c
257 *  c-indent-level: 4
258 *  c-basic-offset: 4
259 * End:
260 *
261 * vim: ft=c ts=8 sts=4 sw=4 expandtab
262 */
Note: See TracBrowser for help on using the browser.