root/branches/windows-client/src/io/bmi/bmi_wintcp/socket-collection.c @ 8560

Revision 8560, 12.0 KB (checked in by sampson, 3 years ago)

Porting BMI TCP

  • Property svn:executable set to *
Line 
1/*
2 * (C) 2001 Clemson University and The University of Chicago
3 *
4 * See COPYING in top-level directory.
5 */
6
7/*
8 * this is an implementation of a socket collection library.  It can be
9 * used to maintain a dynamic list of sockets and perform polling
10 * operations.
11 */
12
13/*
14 * NOTE:  I am making read bits implicit in the implementation.  A poll
15 * will always check to see if there is data to be read on a socket.
16 */
17
18#include <WinSock2.h>
19
20#include <stdio.h>
21#include <string.h>
22#include <errno.h>
23#include <io.h>
24
25#include "gossip.h"
26#include "socket-collection.h"
27#include "bmi-method-support.h"
28#include "bmi-tcp-addressing.h"
29#include "gen-locks.h"
30
31/* errors that can occur on a poll socket */
32#define ERRMASK (POLLERR+POLLHUP+POLLNVAL)
33
34#define POLLFD_ARRAY_START 32
35#define POLLFD_ARRAY_INC 32
36
37/* socket_collection_init()
38 *
39 * creates a new socket collection.  It also acquires the server socket
40 * from the caller if it is available.  Passing in a negative value
41 * indicates that this is being used on a client node and there is no
42 * server socket.
43 *
44 * returns a pointer to the collection on success, NULL on failure.
45 */
46socket_collection_p BMI_socket_collection_init(int new_server_socket)
47{
48
49    socket_collection_p tmp_scp = NULL;
50
51    tmp_scp = (struct socket_collection*) malloc(sizeof(struct
52        socket_collection));
53    if(!tmp_scp)
54    {
55        return(NULL);
56    }
57
58    memset(tmp_scp, 0, sizeof(struct socket_collection));
59
60    gen_mutex_init(&tmp_scp->queue_mutex);
61
62    tmp_scp->pollfd_array = (struct
63        pollfd*)malloc(POLLFD_ARRAY_START*sizeof(WSAPOLLFD));
64   
65    tmp_scp->addr_array =
66        (bmi_method_addr_p*)malloc(POLLFD_ARRAY_START*sizeof(bmi_method_addr_p));
67    if(!tmp_scp->addr_array)
68    {
69        free(tmp_scp->pollfd_array);
70        free(tmp_scp);
71        return NULL;
72    }
73    /*if (pipe(tmp_scp->pipe_fd) < 0)*/
74    if (!CreatePipe(&(tmp_scp->pipe_fd[0]),
75                    &(tmp_scp->pipe_fd[1]),
76                    NULL, 0))
77    {
78        perror("pipe failed:");
79        BMI_socket_collection_finalize(tmp_scp);
80        return NULL;
81    }
82
83    tmp_scp->array_max = POLLFD_ARRAY_START;   
84    tmp_scp->array_count = 0;
85    INIT_QLIST_HEAD(&tmp_scp->remove_queue);
86    INIT_QLIST_HEAD(&tmp_scp->add_queue);
87    tmp_scp->server_socket = new_server_socket;
88
89    if(new_server_socket > -1)
90    {
91        tmp_scp->pollfd_array[tmp_scp->array_count].fd = new_server_socket;
92        tmp_scp->pollfd_array[tmp_scp->array_count].events = POLLIN;
93        tmp_scp->addr_array[tmp_scp->array_count] = NULL;
94        tmp_scp->array_count++;
95    }
96
97    /* Add the pipe_fd[0] fd to the poll in set always */
98    /* -- must be handled separately on Windows
99    tmp_scp->pollfd_array[tmp_scp->array_count].fd = tmp_scp->pipe_fd[0];
100    tmp_scp->pollfd_array[tmp_scp->array_count].events = POLLIN;
101    tmp_scp->addr_array[tmp_scp->array_count] = NULL;
102    tmp_scp->array_count++;   
103    */
104
105    return (tmp_scp);
106}
107
108/* socket_collection_queue()
109 *
110 * queues a tcp method_addr for addition or removal from the collection.
111 *
112 * returns 0 on success, -errno on failure.
113 */
114void BMI_socket_collection_queue(socket_collection_p scp,
115                           bmi_method_addr_p map, struct qlist_head* queue)
116{
117    struct qlist_head* iterator = NULL;
118    struct qlist_head* scratch = NULL;
119    struct tcp_addr* tcp_addr_data = NULL;
120
121    /* make sure that this address isn't already slated for addition/removal */
122    qlist_for_each_safe(iterator, scratch, &scp->remove_queue)
123    {
124        tcp_addr_data = qlist_entry(iterator, struct tcp_addr, sc_link);
125        if(tcp_addr_data->map == map)
126        {
127            qlist_del(&tcp_addr_data->sc_link);
128            break;
129        }
130    }
131    qlist_for_each_safe(iterator, scratch, &scp->add_queue)
132    {
133        tcp_addr_data = qlist_entry(iterator, struct tcp_addr, sc_link);
134        if(tcp_addr_data->map == map)
135        {
136            qlist_del(&tcp_addr_data->sc_link);
137            break;
138        }
139    }
140
141    /* add it on to the appropriate queue */
142    tcp_addr_data = map->method_data;
143    /* add to head, we are likely to access it again soon */
144    qlist_add(&tcp_addr_data->sc_link, queue);
145
146    return;
147}
148
149
150/* socket_collection_finalize()
151 *
152 * destroys a socket collection.  IMPORTANT:  It DOES NOT destroy the
153 * addresses contained within the collection, nor does it terminate
154 * connections.  This must be handled elsewhere.
155 *
156 * no return values.
157 */
158void BMI_socket_collection_finalize(socket_collection_p scp)
159{
160    free(scp->addr_array);
161    free(scp->pollfd_array);
162    free(scp);
163    return;
164}
165
166/* socket_collection_testglobal()
167 *
168 * this function is used to poll to see if any of the new sockets are
169 * available for work.  The array of method addresses and array of
170 * status fields must be passed into the function by the caller.
171 * incount specifies the size of these arrays.  outcount
172 * specifies the number of ready addresses.
173 *
174 * returns 0 on success, -errno on failure.
175 */
176int BMI_socket_collection_testglobal(socket_collection_p scp,
177                                 int incount,
178                                 int *outcount,
179                                 bmi_method_addr_p * maps,
180                                 int * status,
181                                 int poll_timeout)
182{
183    struct qlist_head* iterator = NULL;
184    struct qlist_head* scratch = NULL;
185    struct tcp_addr* tcp_addr_data = NULL;
186    struct tcp_addr* shifted_tcp_addr_data = NULL;
187    WSAPOLLFD* tmp_pollfd_array = NULL;
188    bmi_method_addr_p* tmp_addr_array = NULL;
189    int ret = -1;
190    int old_errno;
191    int tmp_count;
192    int i;
193    int skip_flag;
194    int pipe_notify = 0;
195    struct timeval start, end;
196    int allowed_poll_time = poll_timeout;
197
198    gettimeofday(&start, NULL);
199do_again:
200    /* init the outgoing arguments for safety */
201    *outcount = 0;
202    memset(maps, 0, (sizeof(bmi_method_addr_p) * incount));
203    memset(status, 0, (sizeof(int) * incount));
204
205    gen_mutex_lock(&scp->queue_mutex);
206
207    /* look for addresses slated for removal */
208    qlist_for_each_safe(iterator, scratch, &scp->remove_queue)
209    {
210        tcp_addr_data = qlist_entry(iterator, struct tcp_addr, sc_link);
211        qlist_del(&tcp_addr_data->sc_link);
212        /* take out of poll array, shift last entry into its place */
213        if(tcp_addr_data->sc_index > -1)
214        {
215            scp->pollfd_array[tcp_addr_data->sc_index] =
216                scp->pollfd_array[scp->array_count-1];
217            scp->addr_array[tcp_addr_data->sc_index] =
218                scp->addr_array[scp->array_count-1];
219            shifted_tcp_addr_data =
220                scp->addr_array[tcp_addr_data->sc_index]->method_data;
221            shifted_tcp_addr_data->sc_index = tcp_addr_data->sc_index;
222            scp->array_count--;
223            tcp_addr_data->sc_index = -1;
224            tcp_addr_data->write_ref_count = 0;
225        }
226    }
227
228    /* look for addresses slated for addition */
229    qlist_for_each_safe(iterator, scratch, &scp->add_queue)
230    {
231        tcp_addr_data = qlist_entry(iterator, struct tcp_addr, sc_link);
232        qlist_del(&tcp_addr_data->sc_link);
233        if(tcp_addr_data->sc_index > -1)
234        {
235            /* update existing entry */
236#if 0
237            gossip_err("HELLO: updating addr: %p, index: %d, ref: %d.\n",
238                scp->addr_array[tcp_addr_data->sc_index],
239                tcp_addr_data->sc_index,
240                tcp_addr_data->write_ref_count);
241#endif
242            scp->pollfd_array[tcp_addr_data->sc_index].events = POLLIN;
243            if(tcp_addr_data->write_ref_count > 0)
244                scp->pollfd_array[tcp_addr_data->sc_index].events |= POLLOUT;
245        }
246        else
247        {
248            /* new entry */
249            if(scp->array_count == scp->array_max)
250            {
251                /* we must enlarge the poll arrays */
252                tmp_pollfd_array = (WSAPOLLFD*)malloc(
253                    (scp->array_max+POLLFD_ARRAY_INC)*sizeof(WSAPOLLFD));
254                /* TODO: handle this */
255                assert(tmp_pollfd_array);
256                tmp_addr_array = (bmi_method_addr_p*)malloc(
257                    (scp->array_max+POLLFD_ARRAY_INC)*sizeof(bmi_method_addr_p));
258                /* TODO: handle this */
259                assert(tmp_addr_array);
260                memcpy(tmp_pollfd_array, scp->pollfd_array,
261                    scp->array_max*sizeof(WSAPOLLFD));
262                free(scp->pollfd_array);
263                scp->pollfd_array = tmp_pollfd_array;
264                memcpy(tmp_addr_array, scp->addr_array,
265                    scp->array_max*sizeof(bmi_method_addr_p));
266                free(scp->addr_array);
267                scp->addr_array = tmp_addr_array;
268                scp->array_max = scp->array_max+POLLFD_ARRAY_INC;
269            }
270            /* add into pollfd array */
271            tcp_addr_data->sc_index = scp->array_count;
272            scp->array_count++;
273            scp->addr_array[tcp_addr_data->sc_index] = tcp_addr_data->map;
274            scp->pollfd_array[tcp_addr_data->sc_index].fd =
275                tcp_addr_data->socket;
276            scp->pollfd_array[tcp_addr_data->sc_index].events = POLLIN;
277            if(tcp_addr_data->write_ref_count > 0)
278                scp->pollfd_array[tcp_addr_data->sc_index].events |= POLLOUT;
279        }
280    }
281    gen_mutex_unlock(&scp->queue_mutex);
282
283    /* actually do the poll() work */
284    do
285    {
286        DWORD bytes;
287
288        /* poll for 1ms */
289        ret = WSAPoll(scp->pollfd_array, scp->array_count, 1);
290        old_errno = WSAGetLastError();
291        allowed_poll_time--;
292
293        if (ret == 0)
294        {
295            /* check our pipe */
296            if (PeekNamedPipe(scp->pipe_fd[0], NULL, 0, NULL, &bytes, NULL))
297            {
298                if (bytes)
299                {
300                    pipe_notify = 1;
301                }
302            }
303            else
304            {
305                ret = -1;
306            }
307        }
308    } while(ret == 0 && !pipe_notify && allowed_poll_time > 0);   
309
310    if(ret < 0)
311    {
312        return(bmi_tcp_errno_to_pvfs(-old_errno));
313    }
314
315    /* nothing ready, just return */
316    if(ret == 0)
317    {
318        return(0);
319    }
320
321    if (pipe_notify)
322    {
323        char c;
324        DWORD count;
325        /* drain the pipe */
326        ReadFile(scp->pipe_fd[0], &c, 1, &count, NULL);
327    }
328
329    tmp_count = ret;
330
331    for(i=0; i<scp->array_count; i++)
332    {
333        /* short out if we hit count limit */
334        if(*outcount == incount || *outcount == tmp_count)
335        {
336            break;
337        }
338        /* make sure we dont return the pipe fd as being ready */
339        /* if (scp->pollfd_array[i].fd == scp->pipe_fd[0])
340        {
341            if (scp->pollfd_array[i].revents) {
342                char c;
343                /* drain the pipe */
344        /*        _read(scp->pipe_fd[0], &c, 1);
345                pipe_notify = 1;
346            }
347            continue;
348        } */
349        /* anything ready on this socket? */
350        if (scp->pollfd_array[i].revents)
351        {
352            skip_flag = 0;
353
354            /* make sure that this addr hasn't been removed */
355            gen_mutex_lock(&scp->queue_mutex);
356            qlist_for_each_safe(iterator, scratch, &scp->remove_queue)
357            {
358                tcp_addr_data = qlist_entry(iterator, struct tcp_addr, sc_link);
359                if(tcp_addr_data->map == scp->addr_array[i])
360                {
361                    skip_flag = 1;
362                    break;
363                }
364            }
365            gen_mutex_unlock(&scp->queue_mutex);
366            if(skip_flag)
367                continue;
368
369            if(scp->pollfd_array[i].revents & ERRMASK)
370                status[*outcount] |= SC_ERROR_BIT;
371            if(scp->pollfd_array[i].revents & POLLIN)
372                status[*outcount] |= SC_READ_BIT;
373            if(scp->pollfd_array[i].revents & POLLOUT)
374                status[*outcount] |= SC_WRITE_BIT;
375
376            if(scp->addr_array[i] == NULL)
377            {
378                /* server socket */
379                maps[*outcount] = alloc_tcp_method_addr();
380                /* TODO: handle this */
381                assert(maps[*outcount]);
382                tcp_addr_data = (maps[*outcount])->method_data;
383                tcp_addr_data->server_port = 1;
384                tcp_addr_data->socket = scp->server_socket;
385                tcp_addr_data->port = -1;
386            }
387            else
388            {
389                /* normal case */
390                maps[*outcount] = scp->addr_array[i];
391            }
392
393            *outcount = (*outcount) + 1;
394        }
395    }
396
397    /* Under the following conditions (i.e. all of them must be true) we go back to redoing poll
398     * a) There were no outstanding sockets/fds that had data
399     * b) There was a pipe notification that our socket sets have changed
400     * c) we havent exhausted our allotted time
401     */
402    if (*outcount == 0 && pipe_notify == 1)
403    {
404        gettimeofday(&end, NULL);
405        timersub(&end, &start, &end);
406        allowed_poll_time -= (end.tv_sec * 1000 + end.tv_usec/1000);
407        if (allowed_poll_time > 0)
408            goto do_again;
409    }
410
411    return (0);
412}
413
414
415/*
416 * Local variables:
417 *  c-indent-level: 4
418 *  c-basic-offset: 4
419 * End:
420 *
421 * vim: ts=8 sts=4 sw=4 expandtab
422 */
Note: See TracBrowser for help on using the browser.