root/branches/windows-client/src/io/bmi/bmi_wintcp/socket-collection.c @ 8556

Revision 8556, 11.2 KB (checked in by sampson, 3 years ago)

Porting BMI sockets

  • Property svn:executable set to *
Line 
1/*
2 * (C) 2001 Clemson University and The University of Chicago
3 *
4 * See COPYING in top-level directory.
5 */
6
7/*
8 * this is an implementation of a socket collection library.  It can be
9 * used to maintain a dynamic list of sockets and perform polling
10 * operations.
11 */
12
13/*
14 * NOTE:  I am making read bits implicit in the implementation.  A poll
15 * will always check to see if there is data to be read on a socket.
16 */
17#include <stdio.h>
18#include <string.h>
19#include <errno.h>
20
21#include "gossip.h"
22#include "socket-collection.h"
23#include "bmi-method-support.h"
24#include "bmi-tcp-addressing.h"
25#include "gen-locks.h"
26
27/* errors that can occur on a poll socket */
28#define ERRMASK (POLLERR+POLLHUP+POLLNVAL)
29
30#define POLLFD_ARRAY_START 32
31#define POLLFD_ARRAY_INC 32
32
33/* socket_collection_init()
34 *
35 * creates a new socket collection.  It also acquires the server socket
36 * from the caller if it is available.  Passing in a negative value
37 * indicates that this is being used on a client node and there is no
38 * server socket.
39 *
40 * returns a pointer to the collection on success, NULL on failure.
41 */
42socket_collection_p BMI_socket_collection_init(int new_server_socket)
43{
44
45    socket_collection_p tmp_scp = NULL;
46
47    tmp_scp = (struct socket_collection*) malloc(sizeof(struct
48        socket_collection));
49    if(!tmp_scp)
50    {
51        return(NULL);
52    }
53
54    memset(tmp_scp, 0, sizeof(struct socket_collection));
55
56    gen_mutex_init(&tmp_scp->queue_mutex);
57
58    tmp_scp->pollfd_array = (struct
59        pollfd*)malloc(POLLFD_ARRAY_START*sizeof(struct pollfd));
60    if(!tmp_scp->pollfd_array)
61    {
62        free(tmp_scp);
63        return(NULL);
64    }
65    tmp_scp->addr_array =
66        (bmi_method_addr_p*)malloc(POLLFD_ARRAY_START*sizeof(bmi_method_addr_p));
67    if(!tmp_scp->addr_array)
68    {
69        free(tmp_scp->pollfd_array);
70        free(tmp_scp);
71        return NULL;
72    }
73    if (pipe(tmp_scp->pipe_fd) < 0)
74    {
75        perror("pipe failed:");
76        free(tmp_scp->addr_array);
77        free(tmp_scp->pollfd_array);
78        free(tmp_scp);
79        return NULL;
80    }
81
82    tmp_scp->array_max = POLLFD_ARRAY_START;
83    tmp_scp->array_count = 0;
84    INIT_QLIST_HEAD(&tmp_scp->remove_queue);
85    INIT_QLIST_HEAD(&tmp_scp->add_queue);
86    tmp_scp->server_socket = new_server_socket;
87
88    if(new_server_socket > -1)
89    {
90        tmp_scp->pollfd_array[tmp_scp->array_count].fd = new_server_socket;
91        tmp_scp->pollfd_array[tmp_scp->array_count].events = POLLIN;
92        tmp_scp->addr_array[tmp_scp->array_count] = NULL;
93        tmp_scp->array_count++;
94    }
95
96    /* Add the pipe_fd[0] fd to the poll in set always */
97    tmp_scp->pollfd_array[tmp_scp->array_count].fd = tmp_scp->pipe_fd[0];
98    tmp_scp->pollfd_array[tmp_scp->array_count].events = POLLIN;
99    tmp_scp->addr_array[tmp_scp->array_count] = NULL;
100    tmp_scp->array_count++;
101
102    return (tmp_scp);
103}
104
105/* socket_collection_queue()
106 *
107 * queues a tcp method_addr for addition or removal from the collection.
108 *
109 * returns 0 on success, -errno on failure.
110 */
111void BMI_socket_collection_queue(socket_collection_p scp,
112                           bmi_method_addr_p map, struct qlist_head* queue)
113{
114    struct qlist_head* iterator = NULL;
115    struct qlist_head* scratch = NULL;
116    struct tcp_addr* tcp_addr_data = NULL;
117
118    /* make sure that this address isn't already slated for addition/removal */
119    qlist_for_each_safe(iterator, scratch, &scp->remove_queue)
120    {
121        tcp_addr_data = qlist_entry(iterator, struct tcp_addr, sc_link);
122        if(tcp_addr_data->map == map)
123        {
124            qlist_del(&tcp_addr_data->sc_link);
125            break;
126        }
127    }
128    qlist_for_each_safe(iterator, scratch, &scp->add_queue)
129    {
130        tcp_addr_data = qlist_entry(iterator, struct tcp_addr, sc_link);
131        if(tcp_addr_data->map == map)
132        {
133            qlist_del(&tcp_addr_data->sc_link);
134            break;
135        }
136    }
137
138    /* add it on to the appropriate queue */
139    tcp_addr_data = map->method_data;
140    /* add to head, we are likely to access it again soon */
141    qlist_add(&tcp_addr_data->sc_link, queue);
142
143    return;
144}
145
146
147/* socket_collection_finalize()
148 *
149 * destroys a socket collection.  IMPORTANT:  It DOES NOT destroy the
150 * addresses contained within the collection, nor does it terminate
151 * connections.  This must be handled elsewhere.
152 *
153 * no return values.
154 */
155void BMI_socket_collection_finalize(socket_collection_p scp)
156{
157    free(scp->pollfd_array);
158    free(scp->addr_array);
159    free(scp);
160    return;
161}
162
163/* socket_collection_testglobal()
164 *
165 * this function is used to poll to see if any of the new sockets are
166 * available for work.  The array of method addresses and array of
167 * status fields must be passed into the function by the caller.
168 * incount specifies the size of these arrays.  outcount
169 * specifies the number of ready addresses.
170 *
171 * returns 0 on success, -errno on failure.
172 */
173int BMI_socket_collection_testglobal(socket_collection_p scp,
174                                 int incount,
175                                 int *outcount,
176                                 bmi_method_addr_p * maps,
177                                 int * status,
178                                 int poll_timeout)
179{
180    struct qlist_head* iterator = NULL;
181    struct qlist_head* scratch = NULL;
182    struct tcp_addr* tcp_addr_data = NULL;
183    struct tcp_addr* shifted_tcp_addr_data = NULL;
184    struct pollfd* tmp_pollfd_array = NULL;
185    bmi_method_addr_p* tmp_addr_array = NULL;
186    int ret = -1;
187    int old_errno;
188    int tmp_count;
189    int i;
190    int skip_flag;
191    int pipe_notify = 0;
192    struct timeval start, end;
193    int allowed_poll_time = poll_timeout;
194
195    gettimeofday(&start, NULL);
196do_again:
197    /* init the outgoing arguments for safety */
198    *outcount = 0;
199    memset(maps, 0, (sizeof(bmi_method_addr_p) * incount));
200    memset(status, 0, (sizeof(int) * incount));
201
202    gen_mutex_lock(&scp->queue_mutex);
203
204    /* look for addresses slated for removal */
205    qlist_for_each_safe(iterator, scratch, &scp->remove_queue)
206    {
207        tcp_addr_data = qlist_entry(iterator, struct tcp_addr, sc_link);
208        qlist_del(&tcp_addr_data->sc_link);
209        /* take out of poll array, shift last entry into its place */
210        if(tcp_addr_data->sc_index > -1)
211        {
212            scp->pollfd_array[tcp_addr_data->sc_index] =
213                scp->pollfd_array[scp->array_count-1];
214            scp->addr_array[tcp_addr_data->sc_index] =
215                scp->addr_array[scp->array_count-1];
216            shifted_tcp_addr_data =
217                scp->addr_array[tcp_addr_data->sc_index]->method_data;
218            shifted_tcp_addr_data->sc_index = tcp_addr_data->sc_index;
219            scp->array_count--;
220            tcp_addr_data->sc_index = -1;
221            tcp_addr_data->write_ref_count = 0;
222        }
223    }
224
225    /* look for addresses slated for addition */
226    qlist_for_each_safe(iterator, scratch, &scp->add_queue)
227    {
228        tcp_addr_data = qlist_entry(iterator, struct tcp_addr, sc_link);
229        qlist_del(&tcp_addr_data->sc_link);
230        if(tcp_addr_data->sc_index > -1)
231        {
232            /* update existing entry */
233#if 0
234            gossip_err("HELLO: updating addr: %p, index: %d, ref: %d.\n",
235                scp->addr_array[tcp_addr_data->sc_index],
236                tcp_addr_data->sc_index,
237                tcp_addr_data->write_ref_count);
238#endif
239            scp->pollfd_array[tcp_addr_data->sc_index].events = POLLIN;
240            if(tcp_addr_data->write_ref_count > 0)
241                scp->pollfd_array[tcp_addr_data->sc_index].events |= POLLOUT;
242        }
243        else
244        {
245            /* new entry */
246            if(scp->array_count == scp->array_max)
247            {
248                /* we must enlarge the poll arrays */
249                tmp_pollfd_array = (struct pollfd*)malloc(
250                    (scp->array_max+POLLFD_ARRAY_INC)*sizeof(struct pollfd));
251                /* TODO: handle this */
252                assert(tmp_pollfd_array);
253                tmp_addr_array = (bmi_method_addr_p*)malloc(
254                    (scp->array_max+POLLFD_ARRAY_INC)*sizeof(bmi_method_addr_p));
255                /* TODO: handle this */
256                assert(tmp_addr_array);
257                memcpy(tmp_pollfd_array, scp->pollfd_array,
258                    scp->array_max*sizeof(struct pollfd));
259                free(scp->pollfd_array);
260                scp->pollfd_array = tmp_pollfd_array;
261                memcpy(tmp_addr_array, scp->addr_array,
262                    scp->array_max*sizeof(bmi_method_addr_p));
263                free(scp->addr_array);
264                scp->addr_array = tmp_addr_array;
265                scp->array_max = scp->array_max+POLLFD_ARRAY_INC;
266            }
267            /* add into pollfd array */
268            tcp_addr_data->sc_index = scp->array_count;
269            scp->array_count++;
270            scp->addr_array[tcp_addr_data->sc_index] = tcp_addr_data->map;
271            scp->pollfd_array[tcp_addr_data->sc_index].fd =
272                tcp_addr_data->socket;
273            scp->pollfd_array[tcp_addr_data->sc_index].events = POLLIN;
274            if(tcp_addr_data->write_ref_count > 0)
275                scp->pollfd_array[tcp_addr_data->sc_index].events |= POLLOUT;
276        }
277    }
278    gen_mutex_unlock(&scp->queue_mutex);
279
280    /* actually do the poll() work */
281    do
282    {
283        ret = poll(scp->pollfd_array, scp->array_count, allowed_poll_time);
284    } while(ret < 0 && errno == EINTR);
285    old_errno = errno;
286
287    if(ret < 0)
288    {
289        return(bmi_tcp_errno_to_pvfs(-old_errno));
290    }
291
292    /* nothing ready, just return */
293    if(ret == 0)
294    {
295        return(0);
296    }
297
298    tmp_count = ret;
299
300    for(i=0; i<scp->array_count; i++)
301    {
302        /* short out if we hit count limit */
303        if(*outcount == incount || *outcount == tmp_count)
304        {
305            break;
306        }
307        /* make sure we dont return the pipe fd as being ready */
308        if (scp->pollfd_array[i].fd == scp->pipe_fd[0])
309        {
310            if (scp->pollfd_array[i].revents) {
311                char c;
312                /* drain the pipe */
313                read(scp->pipe_fd[0], &c, 1);
314                pipe_notify = 1;
315            }
316            continue;
317        }
318        /* anything ready on this socket? */
319        if (scp->pollfd_array[i].revents)
320        {
321            skip_flag = 0;
322
323            /* make sure that this addr hasn't been removed */
324            gen_mutex_lock(&scp->queue_mutex);
325            qlist_for_each_safe(iterator, scratch, &scp->remove_queue)
326            {
327                tcp_addr_data = qlist_entry(iterator, struct tcp_addr, sc_link);
328                if(tcp_addr_data->map == scp->addr_array[i])
329                {
330                    skip_flag = 1;
331                    break;
332                }
333            }
334            gen_mutex_unlock(&scp->queue_mutex);
335            if(skip_flag)
336                continue;
337
338            if(scp->pollfd_array[i].revents & ERRMASK)
339                status[*outcount] |= SC_ERROR_BIT;
340            if(scp->pollfd_array[i].revents & POLLIN)
341                status[*outcount] |= SC_READ_BIT;
342            if(scp->pollfd_array[i].revents & POLLOUT)
343                status[*outcount] |= SC_WRITE_BIT;
344
345            if(scp->addr_array[i] == NULL)
346            {
347                /* server socket */
348                maps[*outcount] = alloc_tcp_method_addr();
349                /* TODO: handle this */
350                assert(maps[*outcount]);
351                tcp_addr_data = (maps[*outcount])->method_data;
352                tcp_addr_data->server_port = 1;
353                tcp_addr_data->socket = scp->server_socket;
354                tcp_addr_data->port = -1;
355            }
356            else
357            {
358                /* normal case */
359                maps[*outcount] = scp->addr_array[i];
360            }
361
362            *outcount = (*outcount) + 1;
363        }
364    }
365
366    /* Under the following conditions (i.e. all of them must be true) we go back to redoing poll
367     * a) There were no outstanding sockets/fds that had data
368     * b) There was a pipe notification that our socket sets have changed
369     * c) we havent exhausted our allotted time
370     */
371    if (*outcount == 0 && pipe_notify == 1)
372    {
373        gettimeofday(&end, NULL);
374        timersub(&end, &start, &end);
375        allowed_poll_time -= (end.tv_sec * 1000 + end.tv_usec/1000);
376        if (allowed_poll_time > 0)
377            goto do_again;
378    }
379
380    return (0);
381}
382
383
384/*
385 * Local variables:
386 *  c-indent-level: 4
387 *  c-basic-offset: 4
388 * End:
389 *
390 * vim: ts=8 sts=4 sw=4 expandtab
391 */
Note: See TracBrowser for help on using the browser.