root/branches/windows-client/src/io/bmi/bmi_wintcp/bmi-wintcp.c @ 8560

Revision 8560, 131.7 KB (checked in by sampson, 3 years ago)

Porting BMI TCP

  • Property svn:executable set to *
Line 
1/*
2 * (C) 2001 Clemson University and The University of Chicago
3 *
4 * See COPYING in top-level directory.
5 */
6
7/* Windows TCP/IP implementation of a BMI method */
8
9#include <WinSock2.h>
10
11#include <errno.h>
12#include <string.h>
13#include <io.h>
14//#include <unistd.h>
15#include <fcntl.h>
16//#include <sys/poll.h>
17//#include <netinet/tcp.h>
18#include <assert.h>
19//#include <sys/uio.h>
20#include <time.h>
21#include <stdint.h>
22//#include <sys/time.h>
23//#include <sys/socket.h>
24//#include <netinet/in.h>
25//#include <arpa/inet.h>
26#include "pint-mem.h"
27
28#include "pvfs2-config.h"
29#ifdef HAVE_NETDB_H
30#include <netdb.h>
31#endif
32
33#include "bmi-method-support.h"
34#include "bmi-method-callback.h"
35#include "bmi-tcp-addressing.h"
36#ifdef __PVFS2_USE_EPOLL__
37#include "socket-collection-epoll.h"
38#else
39#include "socket-collection.h"
40#endif
41#include "op-list.h"
42#include "gossip.h"
43#include "sockio.h"
44#include "bmi-byteswap.h"
45#include "id-generator.h"
46#include "pint-event.h"
47#include "pvfs2-debug.h"
48#ifdef USE_TRUSTED
49#include "server-config.h"
50#include "bmi-tcp-addressing.h"
51#endif
52#include "gen-locks.h"
53#include "pint-hint.h"
54
55static gen_mutex_t interface_mutex = GEN_MUTEX_INITIALIZER;
56static gen_cond_t interface_cond = GEN_COND_INITIALIZER;
57static int sc_test_busy = 0;
58
59/***  Windows-specific additions ***/
60typedef unsigned int socklen_t;
61
62/* Windows Sockets doesn't have inet_aton */
63int inet_aton(const char *cp, struct in_addr *inp)
64{
65    unsigned long addr;
66
67    if (cp == NULL || strlen(cp) == 0 || inp == NULL)
68    {
69        return (0);
70    }
71   
72    /* handle 255.255.255.255 separately */
73    if (strcmp(cp, "255.255.255.255") == 0)
74    {
75        inp->S_un.S_addr = 0xFFFFFFFF;
76        return (1);
77    }
78
79    /* use inet_addr for other addresses */
80    addr = inet_addr(cp);
81    if (addr == INADDR_NONE)
82    {
83        return (0);
84    }
85
86    inp->S_un.S_addr = addr;
87
88    return (1);
89
90}
91/***********************************/
92
93/* function prototypes */
94int BMI_tcp_initialize(bmi_method_addr_p listen_addr,
95                       int method_id,
96                       int init_flags);
97int BMI_tcp_finalize(void);
98int BMI_tcp_set_info(int option,
99                     void *inout_parameter);
100int BMI_tcp_get_info(int option,
101                     void *inout_parameter);
102void *BMI_tcp_memalloc(bmi_size_t size,
103                       enum bmi_op_type send_recv);
104int BMI_tcp_memfree(void *buffer,
105                    bmi_size_t size,
106                    enum bmi_op_type send_recv);
107int BMI_tcp_unexpected_free(void *buffer);
108int BMI_tcp_post_send(bmi_op_id_t * id,
109                      bmi_method_addr_p dest,
110                      const void *buffer,
111                      bmi_size_t size,
112                      enum bmi_buffer_type buffer_type,
113                      bmi_msg_tag_t tag,
114                      void *user_ptr,
115                      bmi_context_id context_id,
116                      PVFS_hint hints);
117int BMI_tcp_post_sendunexpected(bmi_op_id_t * id,
118                                bmi_method_addr_p dest,
119                                const void *buffer,
120                                bmi_size_t size,
121                                enum bmi_buffer_type buffer_type,
122                                bmi_msg_tag_t tag,
123                                void *user_ptr,
124                                bmi_context_id context_id,
125                                PVFS_hint hints);
126int BMI_tcp_post_recv(bmi_op_id_t * id,
127                      bmi_method_addr_p src,
128                      void *buffer,
129                      bmi_size_t expected_size,
130                      bmi_size_t * actual_size,
131                      enum bmi_buffer_type buffer_type,
132                      bmi_msg_tag_t tag,
133                      void *user_ptr,
134                      bmi_context_id context_id,
135                      PVFS_hint hints);
136int BMI_tcp_test(bmi_op_id_t id,
137                 int *outcount,
138                 bmi_error_code_t * error_code,
139                 bmi_size_t * actual_size,
140                 void **user_ptr,
141                 int max_idle_time_ms,
142                 bmi_context_id context_id);
143int BMI_tcp_testsome(int incount,
144                     bmi_op_id_t * id_array,
145                     int *outcount,
146                     int *index_array,
147                     bmi_error_code_t * error_code_array,
148                     bmi_size_t * actual_size_array,
149                     void **user_ptr_array,
150                     int max_idle_time_ms,
151                     bmi_context_id context_id);
152int BMI_tcp_testunexpected(int incount,
153                           int *outcount,
154                           struct bmi_method_unexpected_info *info,
155                           int max_idle_time_ms);
156int BMI_tcp_testcontext(int incount,
157                     bmi_op_id_t * out_id_array,
158                     int *outcount,
159                     bmi_error_code_t * error_code_array,
160                     bmi_size_t * actual_size_array,
161                     void **user_ptr_array,
162                     int max_idle_time_ms,
163                     bmi_context_id context_id);
164bmi_method_addr_p BMI_tcp_method_addr_lookup(const char *id_string);
165const char* BMI_tcp_addr_rev_lookup_unexpected(bmi_method_addr_p map);
166int BMI_tcp_query_addr_range(bmi_method_addr_p, const char *, int);
167int BMI_tcp_post_send_list(bmi_op_id_t * id,
168                           bmi_method_addr_p dest,
169                           const void *const *buffer_list,
170                           const bmi_size_t *size_list,
171                           int list_count,
172                           bmi_size_t total_size,
173                           enum bmi_buffer_type buffer_type,
174                           bmi_msg_tag_t tag,
175                           void *user_ptr,
176                           bmi_context_id context_id,
177                           PVFS_hint hints);
178int BMI_tcp_post_recv_list(bmi_op_id_t * id,
179                           bmi_method_addr_p src,
180                           void *const *buffer_list,
181                           const bmi_size_t *size_list,
182                           int list_count,
183                           bmi_size_t total_expected_size,
184                           bmi_size_t * total_actual_size,
185                           enum bmi_buffer_type buffer_type,
186                           bmi_msg_tag_t tag,
187                           void *user_ptr,
188                           bmi_context_id context_id,
189                           PVFS_hint hints);
190int BMI_tcp_post_sendunexpected_list(bmi_op_id_t * id,
191                                     bmi_method_addr_p dest,
192                                     const void *const *buffer_list,
193                                     const bmi_size_t *size_list,
194                                     int list_count,
195                                     bmi_size_t total_size,
196                                     enum bmi_buffer_type buffer_type,
197                                     bmi_msg_tag_t tag,
198                                     void *user_ptr,
199                                     bmi_context_id context_id,
200                                     PVFS_hint hints);
201int BMI_tcp_open_context(bmi_context_id context_id);
202void BMI_tcp_close_context(bmi_context_id context_id);
203int BMI_tcp_cancel(bmi_op_id_t id, bmi_context_id context_id);
204
205char BMI_tcp_method_name[] = "bmi_tcp";
206
207/* size of encoded message header */
208#define TCP_ENC_HDR_SIZE 24
209
210/* structure internal to tcp for use as a message header */
211struct tcp_msg_header
212{
213    uint32_t magic_nr;          /* magic number */
214    uint32_t mode;                /* eager, rendezvous, etc. */
215    bmi_msg_tag_t tag;                /* user specified message tag */
216    bmi_size_t size;                /* length of trailing message */
217    char enc_hdr[TCP_ENC_HDR_SIZE];  /* encoded version of header info */
218};
219
220#define BMI_TCP_ENC_HDR(hdr)                                                \
221    do {                                                                \
222        *((uint32_t*)&((hdr).enc_hdr[0])) = htobmi32((hdr).magic_nr);        \
223        *((uint32_t*)&((hdr).enc_hdr[4])) = htobmi32((hdr).mode);        \
224        *((uint64_t*)&((hdr).enc_hdr[8])) = htobmi64((hdr).tag);        \
225        *((uint64_t*)&((hdr).enc_hdr[16])) = htobmi64((hdr).size);        \
226    } while(0)                                                   
227
228#define BMI_TCP_DEC_HDR(hdr)                                                \
229    do {                                                                \
230        (hdr).magic_nr = bmitoh32(*((uint32_t*)&((hdr).enc_hdr[0])));        \
231        (hdr).mode = bmitoh32(*((uint32_t*)&((hdr).enc_hdr[4])));        \
232        (hdr).tag = bmitoh64(*((uint64_t*)&((hdr).enc_hdr[8])));        \
233        (hdr).size = bmitoh64(*((uint64_t*)&((hdr).enc_hdr[16])));        \
234    } while(0)                                                   
235
236/* enumerate states that we care about */
237enum bmi_tcp_state
238{
239    BMI_TCP_INPROGRESS,
240    BMI_TCP_BUFFERING,
241    BMI_TCP_COMPLETE
242};
243
244/* tcp private portion of operation structure */
245struct tcp_op
246{
247    struct tcp_msg_header env;        /* envelope for this message */
248    enum bmi_tcp_state tcp_op_state;
249    /* these two fields are used as place holders for the buffer
250     * list and size list when we really don't have lists (regular
251     * BMI_send or BMI_recv operations); it allows us to use
252     * generic code to handle both cases
253     */
254    void *buffer_list_stub;
255    bmi_size_t size_list_stub;
256};
257
258/* static io vector for use with readv and writev; we can only use
259 * this because BMI serializes module calls
260 */
261#define BMI_TCP_IOV_COUNT 10
262static WSABUF stat_io_vector[BMI_TCP_IOV_COUNT+1];
263
264/* internal utility functions */
265static int tcp_server_init(void);
266static void dealloc_tcp_method_addr(bmi_method_addr_p map);
267static int tcp_sock_init(bmi_method_addr_p my_method_addr);
268static int enqueue_operation(op_list_p target_list,
269                             enum bmi_op_type send_recv,
270                             bmi_method_addr_p map,
271                             void *const *buffer_list,
272                             const bmi_size_t *size_list,
273                             int list_count,
274                             bmi_size_t amt_complete,
275                             bmi_size_t env_amt_complete,
276                             bmi_op_id_t * id,
277                             int tcp_op_state,
278                             struct tcp_msg_header header,
279                             void *user_ptr,
280                             bmi_size_t actual_size,
281                             bmi_size_t expected_size,
282                             bmi_context_id context_id,
283                             int32_t event_id);
284static int tcp_cleanse_addr(bmi_method_addr_p map, int error_code);
285static int tcp_shutdown_addr(bmi_method_addr_p map);
286static int tcp_do_work(int max_idle_time);
287static int tcp_do_work_error(bmi_method_addr_p map);
288static int tcp_do_work_recv(bmi_method_addr_p map, int* stall_flag);
289static int tcp_do_work_send(bmi_method_addr_p map, int* stall_flag);
290static int work_on_recv_op(method_op_p my_method_op,
291                           int *stall_flag);
292static int work_on_send_op(method_op_p my_method_op,
293                           int *blocked_flag, int* stall_flag);
294static int tcp_accept_init(int *socket, char** peer);
295static method_op_p alloc_tcp_method_op(void);
296static void dealloc_tcp_method_op(method_op_p old_op);
297static int handle_new_connection(bmi_method_addr_p map);
298static int tcp_post_send_generic(bmi_op_id_t * id,
299                                 bmi_method_addr_p dest,
300                                 const void *const *buffer_list,
301                                 const bmi_size_t *size_list,
302                                 int list_count,
303                                 enum bmi_buffer_type buffer_type,
304                                 struct tcp_msg_header my_header,
305                                 void *user_ptr,
306                                 bmi_context_id context_id,
307                                 PVFS_hint hints);
308static int tcp_post_recv_generic(bmi_op_id_t * id,
309                                 bmi_method_addr_p src,
310                                 void *const *buffer_list,
311                                 const bmi_size_t *size_list,
312                                 int list_count,
313                                 bmi_size_t expected_size,
314                                 bmi_size_t * actual_size,
315                                 enum bmi_buffer_type buffer_type,
316                                 bmi_msg_tag_t tag,
317                                 void *user_ptr,
318                                 bmi_context_id context_id,
319                                 PVFS_hint hints);
320static int payload_progress(int s, void *const *buffer_list, const bmi_size_t*
321    size_list, int list_count, bmi_size_t total_size, int* list_index,
322    bmi_size_t* current_index_complete, enum bmi_op_type send_recv,
323    char* enc_hdr, bmi_size_t* env_amt_complete);
324
325#if defined(USE_TRUSTED) && defined(__PVFS2_CLIENT__)
326static int tcp_enable_trusted(struct tcp_addr *tcp_addr_data);
327#endif
328#if defined(USE_TRUSTED) && defined(__PVFS2_SERVER__)
329static int tcp_allow_trusted(struct sockaddr_in *peer_sockaddr);
330#endif
331
332static void bmi_set_sock_buffers(int socket);
333
334/* exported method interface */
335const struct bmi_method_ops bmi_tcp_ops = {
336    BMI_tcp_method_name,
337    0, /* flags */
338    BMI_tcp_initialize,
339    BMI_tcp_finalize,
340    BMI_tcp_set_info,
341    BMI_tcp_get_info,
342    BMI_tcp_memalloc,
343    BMI_tcp_memfree,
344    BMI_tcp_unexpected_free,
345    BMI_tcp_post_send,
346    BMI_tcp_post_sendunexpected,
347    BMI_tcp_post_recv,
348    BMI_tcp_test,
349    BMI_tcp_testsome,
350    BMI_tcp_testcontext,
351    BMI_tcp_testunexpected,
352    BMI_tcp_method_addr_lookup,
353    BMI_tcp_post_send_list,
354    BMI_tcp_post_recv_list,
355    BMI_tcp_post_sendunexpected_list,
356    BMI_tcp_open_context,
357    BMI_tcp_close_context,
358    BMI_tcp_cancel,
359    BMI_tcp_addr_rev_lookup_unexpected,
360    BMI_tcp_query_addr_range
361};
362
363/* module parameters */
364static struct
365{
366    int method_flags;
367    int method_id;
368    bmi_method_addr_p listen_addr;
369} tcp_method_params;
370
371#if defined(USE_TRUSTED) && defined(__PVFS2_SERVER__)
372static struct tcp_allowed_connection_s *gtcp_allowed_connection = NULL;
373#endif
374
375static int check_unexpected = 1;
376
377/* op_list_array indices */
378enum
379{
380    NUM_INDICES = 5,
381    IND_SEND = 0,
382    IND_RECV = 1,
383    IND_RECV_INFLIGHT = 2,
384    IND_RECV_EAGER_DONE_BUFFERING = 3,
385    IND_COMPLETE_RECV_UNEXP = 4,        /* MAKE THIS COMES LAST */
386};
387
388/* internal operation lists */
389static op_list_p op_list_array[6] = { NULL, NULL, NULL, NULL,
390    NULL, NULL
391};
392
393/* internal completion queues */
394static op_list_p completion_array[BMI_MAX_CONTEXTS] = { NULL };
395
396/* internal socket collection */
397static socket_collection_p tcp_socket_collection_p = NULL;
398
399/* tunable parameters */
400enum
401{
402    /* amount of pending connections we'll allow */
403    TCP_BACKLOG = 256,
404    /* amount of work to be done during a test.  This roughly
405     * translates into the number of sockets that we will perform
406     * nonblocking operations on during one function call.
407     */
408    TCP_WORK_METRIC = 128
409};
410
411/* TCP message modes */
412enum
413{
414    TCP_MODE_IMMED = 1,                /* not used for TCP/IP */
415    TCP_MODE_UNEXP = 2,
416    TCP_MODE_EAGER = 4,
417    TCP_MODE_REND = 8
418};
419
420/* Allowable sizes for each mode */
421enum
422{
423    TCP_MODE_EAGER_LIMIT = 16384,        /* 16K */
424    TCP_MODE_REND_LIMIT = 16777216        /* 16M */
425};
426
427/* toggles cancel mode; for bmi_tcp this will result in socket being closed
428 * in all cancellation cases
429 */
430static int forceful_cancel_mode = 0;
431
432/*
433  Socket buffer sizes, currently these default values will be used
434  for the clients... (TODO)
435 */
436static int tcp_buffer_size_receive = 0;
437static int tcp_buffer_size_send = 0;
438
439static PINT_event_type bmi_tcp_send_event_id;
440static PINT_event_type bmi_tcp_recv_event_id;
441
442static PINT_event_group bmi_tcp_event_group;
443/* static pid_t bmi_tcp_pid */
444static HANDLE bmi_tcp_pid;
445
446/*************************************************************************
447 * Visible Interface
448 */
449
450/* BMI_tcp_initialize()
451 *
452 * Initializes the tcp method.  Must be called before any other tcp
453 * method functions.
454 *
455 * returns 0 on success, -errno on failure
456 */
457int BMI_tcp_initialize(bmi_method_addr_p listen_addr,
458                       int method_id,
459                       int init_flags)
460{
461
462    int ret = -1, err;
463    int tmp_errno = bmi_tcp_errno_to_pvfs(-ENOSYS);
464    struct tcp_addr *tcp_addr_data = NULL;
465    int i = 0;
466    WORD version;
467    WSADATA wsaData;
468
469    gossip_ldebug(GOSSIP_BMI_DEBUG_TCP, "Initializing TCP/IP module.\n");
470
471    /* check args */
472    if ((init_flags & BMI_INIT_SERVER) && !listen_addr)
473    {
474        gossip_lerr("Error: bad parameters given to TCP/IP module.\n");
475        return (bmi_tcp_errno_to_pvfs(-EINVAL));
476    }
477
478    version = MAKEWORD(2, 2);
479    err = WSAStartup(version, &wsaData);
480    if (err != 0)
481    {
482        gossip_lerr("Error: could not initialize Windows Sockets: %d.\n", err);
483        return (bmi_tcp_errno_to_pvfs(-ENOSYS));
484    }
485
486    gen_mutex_lock(&interface_mutex);
487
488    /* zero out our parameter structure and fill it in */
489    memset(&tcp_method_params, 0, sizeof(tcp_method_params));
490    tcp_method_params.method_id = method_id;
491    tcp_method_params.method_flags = init_flags;
492
493    if (init_flags & BMI_INIT_SERVER)
494    {
495        /* hang on to our local listening address if needed */
496        tcp_method_params.listen_addr = listen_addr;
497        /* and initialize server functions */
498        ret = tcp_server_init();
499        if (ret < 0)
500        {
501            tmp_errno = bmi_tcp_errno_to_pvfs(ret);
502            gossip_err("Error: tcp_server_init() failure.\n");
503            goto initialize_failure;
504        }
505    }
506
507    /* set up the operation lists */
508    for (i = 0; i < NUM_INDICES; i++)
509    {
510        op_list_array[i] = op_list_new();
511        if (!op_list_array[i])
512        {
513            tmp_errno = bmi_tcp_errno_to_pvfs(-ENOMEM);
514            goto initialize_failure;
515        }
516    }
517
518    /* set up the socket collection */
519    if (tcp_method_params.method_flags & BMI_INIT_SERVER)
520    {
521        tcp_addr_data = (struct tcp_addr *) tcp_method_params.listen_addr->method_data;
522        tcp_socket_collection_p = BMI_socket_collection_init(tcp_addr_data->socket);
523    }
524    else
525    {
526        tcp_socket_collection_p = BMI_socket_collection_init(-1);
527    }
528
529    if (!tcp_socket_collection_p)
530    {
531        tmp_errno = bmi_tcp_errno_to_pvfs(-ENOMEM);
532        goto initialize_failure;
533    }
534
535    /* bmi_tcp_pid = getpid(); */
536    bmi_tcp_pid = GetCurrentProcess();
537    PINT_event_define_group("bmi_tcp", &bmi_tcp_event_group);
538
539    /* Define the send event:
540     *   START: (client_id, request_id, rank, handle, op_id, send_size)
541     *   STOP: (size_sent)
542     */
543    PINT_event_define_event(
544        &bmi_tcp_event_group,
545#ifdef __PVFS2_SERVER__
546        "bmi_server_send",
547#else
548        "bmi_client_send",
549#endif
550        "%d%d%d%llu%d%d",
551        "%d", &bmi_tcp_send_event_id);
552
553    /* Define the recv event:
554     *   START: (client_id, request_id, rank, handle, op_id, recv_size)
555     *   STOP: (size_received)
556     */
557    PINT_event_define_event(
558        &bmi_tcp_event_group,
559#ifdef __PVFS2_SERVER__
560        "bmi_server_recv",
561#else
562        "bmi_client_recv",
563#endif
564        "%d%d%d%llu%d%d",
565        "%d", &bmi_tcp_recv_event_id);
566
567    gen_mutex_unlock(&interface_mutex);
568    gossip_ldebug(GOSSIP_BMI_DEBUG_TCP,
569                  "TCP/IP module successfully initialized.\n");
570    return (0);
571
572  initialize_failure:
573
574    /* cleanup data structures and bail out */
575    for (i = 0; i < NUM_INDICES; i++)
576    {
577        if (op_list_array[i])
578        {
579            op_list_cleanup(op_list_array[i]);
580        }
581    }
582    if (tcp_socket_collection_p)
583    {
584        BMI_socket_collection_finalize(tcp_socket_collection_p);
585    }
586    gen_mutex_unlock(&interface_mutex);
587    return (tmp_errno);
588}
589
590
591/* BMI_tcp_finalize()
592 *
593 * Shuts down the tcp method.
594 *
595 * returns 0 on success, -errno on failure
596 */
597int BMI_tcp_finalize(void)
598{
599    int i = 0;
600
601    gen_mutex_lock(&interface_mutex);
602
603    /* shut down our listen addr, if we have one */
604    if ((tcp_method_params.method_flags & BMI_INIT_SERVER)
605        && tcp_method_params.listen_addr)
606    {
607        dealloc_tcp_method_addr(tcp_method_params.listen_addr);
608    }
609
610    /* note that this forcefully shuts down operations */
611    for (i = 0; i < NUM_INDICES; i++)
612    {
613        if (op_list_array[i])
614        {
615            op_list_cleanup(op_list_array[i]);
616            op_list_array[i] = NULL;
617        }
618    }
619
620    /* get rid of socket collection */
621    if (tcp_socket_collection_p)
622    {
623        BMI_socket_collection_finalize(tcp_socket_collection_p);
624        tcp_socket_collection_p = NULL;
625    }
626
627    /* Windows Sockets finalize */
628    WSACleanup();
629
630    /* NOTE: we are trusting the calling BMI layer to deallocate
631     * all of the method addresses (this will close any open sockets)
632     */
633    gossip_ldebug(GOSSIP_BMI_DEBUG_TCP, "TCP/IP module finalized.\n");
634    gen_mutex_unlock(&interface_mutex);
635    return (0);
636}
637
638
639/*
640 * BMI_tcp_method_addr_lookup()
641 *
642 * resolves the string representation of an address into a method
643 * address structure. 
644 *
645 * returns a pointer to method_addr on success, NULL on failure
646 */
647bmi_method_addr_p BMI_tcp_method_addr_lookup(const char *id_string)
648{
649    char *tcp_string = NULL;
650    char *delim = NULL;
651    char *hostname = NULL;
652    bmi_method_addr_p new_addr = NULL;
653    struct tcp_addr *tcp_addr_data = NULL;
654    int ret = -1;
655
656    tcp_string = string_key("tcp", id_string);
657    if (!tcp_string)
658    {
659        /* the string doesn't even have our info */
660        return (NULL);
661    }
662
663    /* start breaking up the method information */
664    /* for normal tcp, it is simply hostname:port */
665    if ((delim = strchr(tcp_string, ':')) == NULL)
666    {
667        gossip_lerr("Error: malformed tcp address.\n");
668        free(tcp_string);
669        return (NULL);
670    }
671
672    /* looks ok, so let's build the method addr structure */
673    new_addr = alloc_tcp_method_addr();
674    if (!new_addr)
675    {
676        free(tcp_string);
677        return (NULL);
678    }
679    tcp_addr_data = (struct tcp_addr *) new_addr->method_data;
680
681    ret = sscanf((delim + 1), "%d", &(tcp_addr_data->port));
682    if (ret != 1)
683    {
684        gossip_lerr("Error: malformed tcp address.\n");
685        dealloc_tcp_method_addr(new_addr);
686        free(tcp_string);
687        return (NULL);
688    }
689
690    hostname = (char *) malloc((delim - tcp_string + 1));
691    if (!hostname)
692    {
693        dealloc_tcp_method_addr(new_addr);
694        free(tcp_string);
695        return (NULL);
696    }
697    strncpy(hostname, tcp_string, (delim - tcp_string));
698    hostname[delim - tcp_string] = '\0';
699
700    tcp_addr_data->hostname = hostname;
701
702    free(tcp_string);
703    return (new_addr);
704}
705
706
707/* BMI_tcp_memalloc()
708 *
709 * Allocates memory that can be used in native mode by tcp.
710 *
711 * returns 0 on success, -errno on failure
712 */
713void *BMI_tcp_memalloc(bmi_size_t size,
714                       enum bmi_op_type send_recv)
715{
716    /* we really don't care what flags the caller uses, TCP/IP has no
717     * preferences about how the memory should be configured.
718     */
719
720/*    return (calloc(1,(size_t) size)); */
721    return PINT_mem_aligned_alloc(size, 4096);
722}
723
724
725/* BMI_tcp_memfree()
726 *
727 * Frees memory that was allocated with BMI_tcp_memalloc()
728 *
729 * returns 0 on success, -errno on failure
730 */
731int BMI_tcp_memfree(void *buffer,
732                    bmi_size_t size,
733                    enum bmi_op_type send_recv)
734{
735    PINT_mem_aligned_free(buffer);
736    return (0);
737}
738
739/* BMI_tcp_unexpected_free()
740 *
741 * Frees memory that was returned from BMI_tcp_test_unexpected()
742 *
743 * returns 0 on success, -errno on failure
744 */
745int BMI_tcp_unexpected_free(void *buffer)
746{
747    if (buffer)
748    {
749        free(buffer);
750    }
751    return (0);
752}
753
754#ifdef USE_TRUSTED
755
756static struct tcp_allowed_connection_s *
757alloc_trusted_connection_info(int network_count)
758{
759    struct tcp_allowed_connection_s *tcp_allowed_connection_info = NULL;
760
761    tcp_allowed_connection_info = (struct tcp_allowed_connection_s *)
762            calloc(1, sizeof(struct tcp_allowed_connection_s));
763    if (tcp_allowed_connection_info)
764    {
765        tcp_allowed_connection_info->network =
766            (struct in_addr *) calloc(network_count, sizeof(struct in_addr));
767        if (tcp_allowed_connection_info->network == NULL)
768        {
769            free(tcp_allowed_connection_info);
770            tcp_allowed_connection_info = NULL;
771        }
772        else
773        {
774            tcp_allowed_connection_info->netmask =
775                (struct in_addr *) calloc(network_count, sizeof(struct in_addr));
776            if (tcp_allowed_connection_info->netmask == NULL)
777            {
778                free(tcp_allowed_connection_info->network);
779                free(tcp_allowed_connection_info);
780                tcp_allowed_connection_info = NULL;
781            }
782            else {
783                tcp_allowed_connection_info->network_count = network_count;
784            }
785        }
786    }
787    return tcp_allowed_connection_info;
788}
789
790static void
791dealloc_trusted_connection_info(void* ptcp_allowed_connection_info)
792{
793    struct tcp_allowed_connection_s *tcp_allowed_connection_info =
794        (struct tcp_allowed_connection_s *) ptcp_allowed_connection_info;
795    if (tcp_allowed_connection_info)
796    {
797        free(tcp_allowed_connection_info->network);
798        tcp_allowed_connection_info->network = NULL;
799        free(tcp_allowed_connection_info->netmask);
800        tcp_allowed_connection_info->netmask = NULL;
801        free(tcp_allowed_connection_info);
802    }
803    return;
804}
805
806#endif
807
808/*
809 * This function will convert a mask_bits value to an in_addr
810 * representation. i.e for example if
811 * mask_bits was 24 then it would be 255.255.255.0
812 * if mask_bits was 22 then it would be 255.255.252.0
813 * etc
814 */
815static void convert_mask(int mask_bits, struct in_addr *mask)
816{
817   uint32_t addr = -1;
818   addr = addr & ~~(-1 << (mask_bits ? (32 - mask_bits) : 32));
819   mask->s_addr = htonl(addr);
820   return;
821}
822
823/* BMI_tcp_set_info()
824 *
825 * Pass in optional parameters.
826 *
827 * returns 0 on success, -errno on failure
828 */
829int BMI_tcp_set_info(int option,
830                     void *inout_parameter)
831{
832    int ret = -1;
833    bmi_method_addr_p tmp_addr = NULL;
834
835    gen_mutex_lock(&interface_mutex);
836
837    switch (option)
838    {
839    case BMI_TCP_BUFFER_SEND_SIZE:
840       tcp_buffer_size_send = *((int *)inout_parameter);
841       ret = 0;
842#ifdef __PVFS2_SERVER__
843       /* Set the default socket buffer sizes for the server socket */
844       bmi_set_sock_buffers(
845           ((struct tcp_addr *)
846            tcp_method_params.listen_addr->method_data)->socket);
847#endif
848       break;
849    case BMI_TCP_BUFFER_RECEIVE_SIZE:
850       tcp_buffer_size_receive = *((int *)inout_parameter);
851       ret = 0;
852#ifdef __PVFS2_SERVER__
853       /* Set the default socket buffer sizes for the server socket */
854       bmi_set_sock_buffers(
855           ((struct tcp_addr *)
856            tcp_method_params.listen_addr->method_data)->socket);
857#endif
858       break;
859    case BMI_TCP_CLOSE_SOCKET:
860        /* this should no longer make it to the bmi_tcp method; see bmi.c */
861        ret = 0;
862        break;
863    case BMI_FORCEFUL_CANCEL_MODE:
864        forceful_cancel_mode = 1;
865        ret = 0;
866        break;
867    case BMI_DROP_ADDR:
868        if (inout_parameter == NULL)
869        {
870            ret = bmi_tcp_errno_to_pvfs(-EINVAL);
871        }
872        else
873        {
874            tmp_addr = (bmi_method_addr_p) inout_parameter;
875            /* take it out of the socket collection */
876            tcp_forget_addr(tmp_addr, 1, 0);
877            ret = 0;
878        }
879        break;
880#ifdef USE_TRUSTED
881    case BMI_TRUSTED_CONNECTION:
882    {
883        struct tcp_allowed_connection_s *tcp_allowed_connection = NULL;
884        if (inout_parameter == NULL)
885        {
886            ret = bmi_tcp_errno_to_pvfs(-EINVAL);
887            break;
888        }
889        else
890        {
891            int    bmi_networks_count = 0;
892            char **bmi_networks = NULL;
893            int   *bmi_netmasks = NULL;
894            struct server_configuration_s *svc_config = NULL;
895
896            svc_config = (struct server_configuration_s *) inout_parameter;
897            tcp_allowed_connection = alloc_trusted_connection_info(svc_config->allowed_networks_count);
898            if (tcp_allowed_connection == NULL)
899            {
900                ret = bmi_tcp_errno_to_pvfs(-ENOMEM);
901                break;
902            }
903#ifdef      __PVFS2_SERVER__
904            gtcp_allowed_connection = tcp_allowed_connection;
905#endif
906            /* Stash this in the server_configuration_s structure. freed later on */
907            svc_config->security = tcp_allowed_connection;
908            svc_config->security_dtor = &dealloc_trusted_connection_info;
909            ret = 0;
910            /* Fill up the list of allowed ports */
911            PINT_config_get_allowed_ports(svc_config,
912                    &tcp_allowed_connection->port_enforce,
913                    tcp_allowed_connection->ports);
914
915            /* if it was enabled, make sure that we know how to deal with it */
916            if (tcp_allowed_connection->port_enforce == 1)
917            {
918                /* illegal ports */
919                if (tcp_allowed_connection->ports[0] > 65535
920                        || tcp_allowed_connection->ports[1] > 65535
921                        || tcp_allowed_connection->ports[1] < tcp_allowed_connection->ports[0])
922                {
923                    gossip_lerr("Error: illegal trusted port values\n");
924                    ret = bmi_tcp_errno_to_pvfs(-EINVAL);
925                    /* don't enforce anything! */
926                    tcp_allowed_connection->port_enforce = 0;
927                }
928            }
929            ret = 0;
930            /* Retrieve the list of BMI network addresses and masks  */
931            PINT_config_get_allowed_networks(svc_config,
932                    &tcp_allowed_connection->network_enforce,
933                    &bmi_networks_count,
934                    &bmi_networks,
935                    &bmi_netmasks);
936
937            /* if it was enabled, make sure that we know how to deal with it */
938            if (tcp_allowed_connection->network_enforce == 1)
939            {
940                int i;
941
942                for (i = 0; i < bmi_networks_count; i++)
943                {
944                    char *tcp_string = NULL;
945                    /* Convert the network string into an in_addr_t structure */
946                    tcp_string = string_key("tcp", bmi_networks[i]);
947                    if (!tcp_string)
948                    {
949                        /* the string doesn't even have our info */
950                        gossip_lerr("Error: malformed tcp network address\n");
951                        ret = bmi_tcp_errno_to_pvfs(-EINVAL);
952                    }
953                    else {
954                        /* convert this into an in_addr_t */
955                        inet_aton(tcp_string, &tcp_allowed_connection->network[i]);
956                        free(tcp_string);
957                    }
958                    convert_mask(bmi_netmasks[i], &tcp_allowed_connection->netmask[i]);
959                }
960                /* don't enforce anything if there were any errors */
961                if (ret != 0)
962                {
963                    tcp_allowed_connection->network_enforce = 0;
964                }
965            }
966        }
967        break;
968    }
969#endif
970    case BMI_TCP_CHECK_UNEXPECTED:
971    {
972        check_unexpected = *(int *)inout_parameter;
973        ret = 0;
974        break;
975    }
976
977    default:
978        gossip_ldebug(GOSSIP_BMI_DEBUG_TCP,
979                      "TCP hint %d not implemented.\n", option);
980        ret = 0;
981        break;
982    }
983
984    gen_mutex_unlock(&interface_mutex);
985    return (ret);
986}
987
988/* BMI_tcp_get_info()
989 *
990 * Query for optional parameters.
991 *
992 * returns 0 on success, -errno on failure
993 */
994int BMI_tcp_get_info(int option,
995                     void *inout_parameter)
996{
997    struct method_drop_addr_query* query;
998    struct tcp_addr* tcp_addr_data;
999    int ret = 0;
1000
1001    gen_mutex_lock(&interface_mutex);
1002
1003    switch (option)
1004    {
1005    case BMI_CHECK_MAXSIZE:
1006        *((int *) inout_parameter) = TCP_MODE_REND_LIMIT;
1007        ret = 0;
1008        break;
1009    case BMI_DROP_ADDR_QUERY:
1010        query = (struct method_drop_addr_query*)inout_parameter;
1011        tcp_addr_data = (struct tcp_addr *) query->addr->method_data;
1012        /* only suggest that we discard the address if we have experienced
1013         * an error and there is no way to reconnect
1014         */
1015        if(tcp_addr_data->addr_error != 0 &&
1016           tcp_addr_data->dont_reconnect == 1)
1017        {
1018            query->response = 1;
1019        }
1020        else
1021        {
1022            query->response = 0;
1023        }
1024        ret = 0;
1025        break;
1026    case BMI_GET_UNEXP_SIZE:
1027        *((int *) inout_parameter) = TCP_MODE_EAGER_LIMIT;
1028        ret = 0;
1029        break;
1030
1031    default:
1032        gossip_ldebug(GOSSIP_BMI_DEBUG_TCP,
1033                      "TCP hint %d not implemented.\n", option);
1034        ret = -ENOSYS;
1035        break;
1036    }
1037
1038    gen_mutex_unlock(&interface_mutex);
1039    return (ret < 0) ? bmi_tcp_errno_to_pvfs(ret) : ret;
1040}
1041
1042
1043/* BMI_tcp_post_send()
1044 *
1045 * Submits send operations.
1046 *
1047 * returns 0 on success that requires later poll, returns 1 on instant
1048 * completion, -errno on failure
1049 */
1050int BMI_tcp_post_send(bmi_op_id_t * id,
1051                      bmi_method_addr_p dest,
1052                      const void *buffer,
1053                      bmi_size_t size,
1054                      enum bmi_buffer_type buffer_type,
1055                      bmi_msg_tag_t tag,
1056                      void *user_ptr,
1057                      bmi_context_id context_id,
1058                      PVFS_hint hints)
1059{
1060    struct tcp_msg_header my_header;
1061    int ret = -1;
1062
1063    /* clear the id field for safety */
1064    *id = 0;
1065
1066    /* fill in the TCP-specific message header */
1067    if (size > TCP_MODE_REND_LIMIT)
1068    {
1069        return (bmi_tcp_errno_to_pvfs(-EMSGSIZE));
1070    }
1071
1072    if (size <= TCP_MODE_EAGER_LIMIT)
1073    {
1074        my_header.mode = TCP_MODE_EAGER;
1075    }
1076    else
1077    {
1078        my_header.mode = TCP_MODE_REND;
1079    }
1080    my_header.tag = tag;
1081    my_header.size = size;
1082    my_header.magic_nr = BMI_MAGIC_NR;
1083
1084    gen_mutex_lock(&interface_mutex);
1085
1086    ret = tcp_post_send_generic(id, dest, &buffer,
1087                                &size, 1, buffer_type, my_header,
1088                                user_ptr, context_id, hints);
1089
1090    gen_mutex_unlock(&interface_mutex);
1091    return(ret);
1092}
1093
1094
1095/* BMI_tcp_post_sendunexpected()
1096 *
1097 * Submits unexpected send operations.
1098 *
1099 * returns 0 on success that requires later poll, returns 1 on instant
1100 * completion, -errno on failure
1101 */
1102int BMI_tcp_post_sendunexpected(bmi_op_id_t * id,
1103                                bmi_method_addr_p dest,
1104                                const void *buffer,
1105                                bmi_size_t size,
1106                                enum bmi_buffer_type buffer_type,
1107                                bmi_msg_tag_t tag,
1108                                void *user_ptr,
1109                                bmi_context_id context_id,
1110                                PVFS_hint hints)
1111{
1112    struct tcp_msg_header my_header;
1113    int ret = -1;
1114
1115    /* clear the id field for safety */
1116    *id = 0;
1117
1118    if (size > TCP_MODE_EAGER_LIMIT)
1119    {
1120        return (bmi_tcp_errno_to_pvfs(-EMSGSIZE));
1121    }
1122
1123    my_header.mode = TCP_MODE_UNEXP;
1124    my_header.tag = tag;
1125    my_header.size = size;
1126    my_header.magic_nr = BMI_MAGIC_NR;
1127
1128    gen_mutex_lock(&interface_mutex);
1129
1130    ret = tcp_post_send_generic(id, dest, &buffer,
1131                                &size, 1, buffer_type, my_header,
1132                                user_ptr, context_id, hints);
1133    gen_mutex_unlock(&interface_mutex);
1134    return(ret);
1135}
1136
1137
1138
1139/* BMI_tcp_post_recv()
1140 *
1141 * Submits recv operations.
1142 *
1143 * returns 0 on success that requires later poll, returns 1 on instant
1144 * completion, -errno on failure
1145 */
1146int BMI_tcp_post_recv(bmi_op_id_t * id,
1147                      bmi_method_addr_p src,
1148                      void *buffer,
1149                      bmi_size_t expected_size,
1150                      bmi_size_t * actual_size,
1151                      enum bmi_buffer_type buffer_type,
1152                      bmi_msg_tag_t tag,
1153                      void *user_ptr,
1154                      bmi_context_id context_id,
1155                      PVFS_hint hints)
1156{
1157    int ret = -1;
1158
1159    /* A few things could happen here:
1160     * a) rendez. recv with sender not ready yet
1161     * b) rendez. recv with sender waiting
1162     * c) eager recv, data not available yet
1163     * d) eager recv, some/all data already here
1164     * e) rendez. recv with sender in eager mode
1165     *
1166     * b or d could lead to completion without polling.
1167     * we don't look for unexpected messages here.
1168     */
1169
1170    if (expected_size > TCP_MODE_REND_LIMIT)
1171    {
1172        return (bmi_tcp_errno_to_pvfs(-EINVAL));
1173    }
1174    gen_mutex_lock(&interface_mutex);
1175
1176    ret = tcp_post_recv_generic(id, src, &buffer, &expected_size,
1177                                1, expected_size, actual_size,
1178                                buffer_type, tag,
1179                                user_ptr, context_id, hints);
1180
1181    gen_mutex_unlock(&interface_mutex);
1182    return (ret);
1183}
1184
1185
1186/* BMI_tcp_test()
1187 *
1188 * Checks to see if a particular message has completed.
1189 *
1190 * returns 0 on success, -errno on failure
1191 */
1192int BMI_tcp_test(bmi_op_id_t id,
1193                 int *outcount,
1194                 bmi_error_code_t * error_code,
1195                 bmi_size_t * actual_size,
1196                 void **user_ptr,
1197                 int max_idle_time,
1198                 bmi_context_id context_id)
1199{
1200    int ret = -1;
1201    method_op_p query_op = (method_op_p)id_gen_fast_lookup(id);
1202
1203    assert(query_op != NULL);
1204
1205    gen_mutex_lock(&interface_mutex);
1206
1207    /* do some ``real work'' here */
1208    ret = tcp_do_work(max_idle_time);
1209    if (ret < 0)
1210    {
1211        gen_mutex_unlock(&interface_mutex);
1212        return (ret);
1213    }
1214
1215    if (((struct tcp_op*)(query_op->method_data))->tcp_op_state ==
1216        BMI_TCP_COMPLETE)
1217    {
1218        assert(query_op->context_id == context_id);
1219        op_list_remove(query_op);
1220        if (user_ptr != NULL)
1221        {
1222            (*user_ptr) = query_op->user_ptr;
1223        }
1224        (*error_code) = query_op->error_code;
1225        (*actual_size) = query_op->actual_size;
1226        PINT_EVENT_END(
1227            (query_op->send_recv == BMI_SEND ?
1228             bmi_tcp_send_event_id : bmi_tcp_recv_event_id), bmi_tcp_pid, NULL,
1229             query_op->event_id, id, *actual_size);
1230
1231        dealloc_tcp_method_op(query_op);
1232        (*outcount)++;
1233    }
1234
1235    gen_mutex_unlock(&interface_mutex);
1236    return (0);
1237}
1238
1239/* BMI_tcp_testsome()
1240 *
1241 * Checks to see if any messages from the specified list have completed.
1242 *
1243 * returns 0 on success, -errno on failure
1244 */
1245int BMI_tcp_testsome(int incount,
1246                     bmi_op_id_t * id_array,
1247                     int *outcount,
1248                     int *index_array,
1249                     bmi_error_code_t * error_code_array,
1250                     bmi_size_t * actual_size_array,
1251                     void **user_ptr_array,
1252                     int max_idle_time,
1253                     bmi_context_id context_id)
1254{
1255    int ret = -1;
1256    method_op_p query_op = NULL;
1257    int i;
1258
1259    gen_mutex_lock(&interface_mutex);
1260
1261    /* do some ``real work'' here */
1262    ret = tcp_do_work(max_idle_time);
1263    if (ret < 0)
1264    {
1265        gen_mutex_unlock(&interface_mutex);
1266        return (ret);
1267    }
1268
1269    for(i=0; i<incount; i++)
1270    {
1271        if(id_array[i])
1272        {
1273            /* NOTE: this depends on the user passing in valid id's;
1274             * otherwise we segfault. 
1275             */
1276            query_op = (method_op_p)id_gen_fast_lookup(id_array[i]);
1277            if(((struct tcp_op*)(query_op->method_data))->tcp_op_state ==
1278               BMI_TCP_COMPLETE)
1279            {
1280                assert(query_op->context_id == context_id);
1281                /* this one's done; pop it out */
1282                op_list_remove(query_op);
1283                error_code_array[*outcount] = query_op->error_code;
1284                actual_size_array[*outcount] = query_op->actual_size;
1285                index_array[*outcount] = i;
1286                if (user_ptr_array != NULL)
1287                {
1288                    user_ptr_array[*outcount] = query_op->user_ptr;
1289                }
1290                PINT_EVENT_END(
1291                    (query_op->send_recv == BMI_SEND ?
1292                     bmi_tcp_send_event_id : bmi_tcp_recv_event_id),
1293                    bmi_tcp_pid, NULL,
1294                    query_op->event_id, actual_size_array[*outcount]);
1295                dealloc_tcp_method_op(query_op);
1296                (*outcount)++;
1297            }
1298        }
1299    }
1300
1301    gen_mutex_unlock(&interface_mutex);
1302    return(0);
1303}
1304
1305
1306/* BMI_tcp_testunexpected()
1307 *
1308 * Checks to see if any unexpected messages have completed.
1309 *
1310 * returns 0 on success, -errno on failure
1311 */
1312int BMI_tcp_testunexpected(int incount,
1313                           int *outcount,
1314                           struct bmi_method_unexpected_info *info,
1315                           int max_idle_time)
1316{
1317    int ret = -1;
1318    method_op_p query_op = NULL;
1319
1320    gen_mutex_lock(&interface_mutex);
1321
1322    if(op_list_empty(op_list_array[IND_COMPLETE_RECV_UNEXP]))
1323    {
1324        /* do some ``real work'' here */
1325        ret = tcp_do_work(max_idle_time);
1326        if (ret < 0)
1327        {
1328            gen_mutex_unlock(&interface_mutex);
1329            return (ret);
1330        }
1331    }
1332
1333    *outcount = 0;
1334
1335    /* go through the completed/unexpected list as long as we are finding
1336     * stuff and we have room in the info array for it
1337     */
1338    while ((*outcount < incount) &&
1339           (query_op =
1340            op_list_shownext(op_list_array[IND_COMPLETE_RECV_UNEXP])))
1341    {
1342        info[*outcount].error_code = query_op->error_code;
1343        info[*outcount].addr = query_op->addr;
1344        info[*outcount].buffer = query_op->buffer;
1345        info[*outcount].size = query_op->actual_size;
1346        info[*outcount].tag = query_op->msg_tag;
1347        op_list_remove(query_op);
1348        dealloc_tcp_method_op(query_op);
1349        (*outcount)++;
1350    }
1351    gen_mutex_unlock(&interface_mutex);
1352    return (0);
1353}
1354
1355
1356/* BMI_tcp_testcontext()
1357 *
1358 * Checks to see if any messages from the specified context have completed.
1359 *
1360 * returns 0 on success, -errno on failure
1361 */
1362int BMI_tcp_testcontext(int incount,
1363                     bmi_op_id_t* out_id_array,
1364                     int *outcount,
1365                     bmi_error_code_t * error_code_array,
1366                     bmi_size_t * actual_size_array,
1367                     void **user_ptr_array,
1368                     int max_idle_time,
1369                     bmi_context_id context_id)
1370{
1371    int ret = -1;
1372    method_op_p query_op = NULL;
1373
1374    *outcount = 0;
1375
1376    gen_mutex_lock(&interface_mutex);
1377
1378    if(op_list_empty(completion_array[context_id]))
1379    {
1380        /* if there are unexpected ops ready to go, then short out so
1381         * that the next testunexpected call can pick it up without
1382         * delay
1383         */
1384        if(check_unexpected &&
1385           !op_list_empty(op_list_array[IND_COMPLETE_RECV_UNEXP]))
1386        {
1387            gen_mutex_unlock(&interface_mutex);
1388            return(0);
1389        }
1390
1391        /* do some ``real work'' here */
1392        ret = tcp_do_work(max_idle_time);
1393        if (ret < 0)
1394        {
1395            gen_mutex_unlock(&interface_mutex);
1396            return (ret);
1397        }
1398    }
1399
1400    /* pop as many items off of the completion queue as we can */
1401    while((*outcount < incount) &&
1402          (query_op =
1403           op_list_shownext(completion_array[context_id])))
1404    {
1405        assert(query_op);
1406        assert(query_op->context_id == context_id);
1407
1408        /* this one's done; pop it out */
1409        op_list_remove(query_op);
1410        error_code_array[*outcount] = query_op->error_code;
1411        actual_size_array[*outcount] = query_op->actual_size;
1412        out_id_array[*outcount] = query_op->op_id;
1413        if (user_ptr_array != NULL)
1414        {
1415            user_ptr_array[*outcount] = query_op->user_ptr;
1416        }
1417
1418        PINT_EVENT_END((query_op->send_recv == BMI_SEND ?
1419                        bmi_tcp_send_event_id : bmi_tcp_recv_event_id),
1420                       bmi_tcp_pid, NULL, query_op->event_id,
1421                       query_op->actual_size);
1422
1423        dealloc_tcp_method_op(query_op);
1424        query_op = NULL;
1425        (*outcount)++;
1426    }
1427
1428    gen_mutex_unlock(&interface_mutex);
1429    return(0);
1430}
1431
1432
1433
1434/* BMI_tcp_post_send_list()
1435 *
1436 * same as the BMI_tcp_post_send() function, except that it sends
1437 * from an array of possibly non contiguous buffers
1438 *
1439 * returns 0 on success, 1 on immediate successful completion,
1440 * -errno on failure
1441 */
1442int BMI_tcp_post_send_list(bmi_op_id_t * id,
1443                           bmi_method_addr_p dest,
1444                           const void *const *buffer_list,
1445                           const bmi_size_t *size_list,
1446                           int list_count,
1447                           bmi_size_t total_size,
1448                           enum bmi_buffer_type buffer_type,
1449                           bmi_msg_tag_t tag,
1450                           void *user_ptr,
1451                           bmi_context_id context_id,
1452                           PVFS_hint hints)
1453{
1454    struct tcp_msg_header my_header;
1455    int ret = -1;
1456
1457    /* clear the id field for safety */
1458    *id = 0;
1459
1460    /* fill in the TCP-specific message header */
1461    if (total_size > TCP_MODE_REND_LIMIT)
1462    {
1463        gossip_lerr("Error: BMI message too large!\n");
1464        return (bmi_tcp_errno_to_pvfs(-EMSGSIZE));
1465    }
1466
1467    if (total_size <= TCP_MODE_EAGER_LIMIT)
1468    {
1469        my_header.mode = TCP_MODE_EAGER;
1470    }
1471    else
1472    {
1473        my_header.mode = TCP_MODE_REND;
1474    }
1475    my_header.tag = tag;
1476    my_header.size = total_size;
1477    my_header.magic_nr = BMI_MAGIC_NR;
1478
1479    gen_mutex_lock(&interface_mutex);
1480
1481    ret = tcp_post_send_generic(id, dest, buffer_list,
1482                                size_list, list_count, buffer_type,
1483                                my_header, user_ptr, context_id, hints);
1484    gen_mutex_unlock(&interface_mutex);
1485    return(ret);
1486}
1487
1488/* BMI_tcp_post_recv_list()
1489 *
1490 * same as the BMI_tcp_post_recv() function, except that it recvs
1491 * into an array of possibly non contiguous buffers
1492 *
1493 * returns 0 on success, 1 on immediate successful completion,
1494 * -errno on failure
1495 */
1496int BMI_tcp_post_recv_list(bmi_op_id_t * id,
1497                           bmi_method_addr_p src,
1498                           void *const *buffer_list,
1499                           const bmi_size_t *size_list,
1500                           int list_count,
1501                           bmi_size_t total_expected_size,
1502                           bmi_size_t * total_actual_size,
1503                           enum bmi_buffer_type buffer_type,
1504                           bmi_msg_tag_t tag,
1505                           void *user_ptr,
1506                           bmi_context_id context_id,
1507                           PVFS_hint hints)
1508{
1509    int ret = -1;
1510
1511    if (total_expected_size > TCP_MODE_REND_LIMIT)
1512    {
1513        return (bmi_tcp_errno_to_pvfs(-EINVAL));
1514    }
1515
1516    gen_mutex_lock(&interface_mutex);
1517
1518    ret = tcp_post_recv_generic(id, src, buffer_list, size_list,
1519                                list_count, total_expected_size,
1520                                total_actual_size, buffer_type, tag, user_ptr,
1521                                context_id, hints);
1522
1523    gen_mutex_unlock(&interface_mutex);
1524    return (ret);
1525}
1526
1527
1528/* BMI_tcp_post_sendunexpected_list()
1529 *
1530 * same as the BMI_tcp_post_sendunexpected() function, except that
1531 * it sends from an array of possibly non contiguous buffers
1532 *
1533 * returns 0 on success, 1 on immediate successful completion,
1534 * -errno on failure
1535 */
1536int BMI_tcp_post_sendunexpected_list(bmi_op_id_t * id,
1537                                     bmi_method_addr_p dest,
1538                                     const void *const *buffer_list,
1539                                     const bmi_size_t *size_list,
1540                                     int list_count,
1541                                     bmi_size_t total_size,
1542                                     enum bmi_buffer_type buffer_type,
1543                                     bmi_msg_tag_t tag,
1544                                     void *user_ptr,
1545                                     bmi_context_id context_id,
1546                                     PVFS_hint hints)
1547{
1548    struct tcp_msg_header my_header;
1549    int ret = -1;
1550
1551    /* clear the id field for safety */
1552    *id = 0;
1553
1554    if (total_size > TCP_MODE_EAGER_LIMIT)
1555    {
1556        return (bmi_tcp_errno_to_pvfs(-EMSGSIZE));
1557    }
1558
1559    my_header.mode = TCP_MODE_UNEXP;
1560    my_header.tag = tag;
1561    my_header.size = total_size;
1562    my_header.magic_nr = BMI_MAGIC_NR;
1563
1564    gen_mutex_lock(&interface_mutex);
1565
1566    ret = tcp_post_send_generic(id, dest, buffer_list,
1567                                size_list, list_count, buffer_type,
1568                                my_header, user_ptr, context_id, hints);
1569
1570    gen_mutex_unlock(&interface_mutex);
1571    return(ret);
1572}
1573
1574
1575/* BMI_tcp_open_context()
1576 *
1577 * opens a new context with the specified context id
1578 *
1579 * returns 0 on success, -errno on failure
1580 */
1581int BMI_tcp_open_context(bmi_context_id context_id)
1582{
1583
1584    gen_mutex_lock(&interface_mutex);
1585
1586    /* start a new queue for tracking completions in this context */
1587    completion_array[context_id] = op_list_new();
1588    if (!completion_array[context_id])
1589    {
1590        gen_mutex_unlock(&interface_mutex);
1591        return(bmi_tcp_errno_to_pvfs(-ENOMEM));
1592    }
1593
1594    gen_mutex_unlock(&interface_mutex);
1595    return(0);
1596}
1597
1598
1599/* BMI_tcp_close_context()
1600 *
1601 * shuts down a context, previously opened with BMI_tcp_open_context()
1602 *
1603 * no return value
1604 */
1605void BMI_tcp_close_context(bmi_context_id context_id)
1606{
1607   
1608    gen_mutex_lock(&interface_mutex);
1609
1610    /* tear down completion queue for this context */
1611    op_list_cleanup(completion_array[context_id]);
1612
1613    gen_mutex_unlock(&interface_mutex);
1614    return;
1615}
1616
1617
1618/* BMI_tcp_cancel()
1619 *
1620 * attempt to cancel a pending bmi tcp operation
1621 *
1622 * returns 0 on success, -errno on failure
1623 */
1624int BMI_tcp_cancel(bmi_op_id_t id, bmi_context_id context_id)
1625{
1626    method_op_p query_op = NULL;
1627   
1628    gen_mutex_lock(&interface_mutex);
1629
1630    query_op = (method_op_p)id_gen_fast_lookup(id);
1631    if(!query_op)
1632    {
1633        /* if we can't find the operattion, then assume that it has already
1634         * completed naturally
1635         */
1636        gen_mutex_unlock(&interface_mutex);
1637        return(0);
1638    }
1639
1640    /* easy case: is the operation already completed? */
1641    if(((struct tcp_op*)(query_op->method_data))->tcp_op_state ==
1642        BMI_TCP_COMPLETE)
1643    {
1644        /* only close socket in forceful cancel mode */
1645        if(forceful_cancel_mode)
1646            tcp_forget_addr(query_op->addr, 0, -BMI_ECANCEL);
1647        /* we are done! status will be collected during test */
1648        gen_mutex_unlock(&interface_mutex);
1649        return(0);
1650    }
1651
1652    /* has the operation started moving data yet? */
1653    if(query_op->env_amt_complete)
1654    {
1655        /* be pessimistic and kill the socket, even if not in forceful
1656         * cancel mode */
1657        /* NOTE: this may place other operations beside this one into
1658         * EINTR error state
1659         */
1660        tcp_forget_addr(query_op->addr, 0, -BMI_ECANCEL);
1661        gen_mutex_unlock(&interface_mutex);
1662        return(0);
1663    }
1664
1665    /* if we fall to this point, op has been posted, but no data has moved
1666     * for it yet as far as we know
1667     */
1668
1669    /* mark op as canceled, move to completion queue */
1670    query_op->error_code = -BMI_ECANCEL;
1671    if(query_op->send_recv == BMI_SEND)
1672    {
1673        BMI_socket_collection_remove_write_bit(tcp_socket_collection_p,
1674                                           query_op->addr);
1675    }
1676    op_list_remove(query_op);
1677    ((struct tcp_op*)(query_op->method_data))->tcp_op_state =
1678        BMI_TCP_COMPLETE;
1679    /* only close socket in forceful cancel mode */
1680    if(forceful_cancel_mode)
1681        tcp_forget_addr(query_op->addr, 0, -BMI_ECANCEL);
1682    op_list_add(completion_array[query_op->context_id], query_op);
1683    gen_mutex_unlock(&interface_mutex);
1684    return(0);
1685}
1686
1687/*
1688 * For now, we only support wildcard strings that are IP addresses
1689 * and not *hostnames*!
1690 */
1691static int check_valid_wildcard(const char *wildcard_string, unsigned long *octets)
1692{
1693    int i, len = strlen(wildcard_string), last_dot = -1, octet_count = 0;
1694    char str[16];
1695    for (i = 0; i < len; i++)
1696    {
1697        char c = wildcard_string[i];
1698        memset(str, 0, 16);
1699        if ((c < '0' || c > '9') && c != '*' && c != '.')
1700            return -EINVAL;
1701        if (c == '*') {
1702            if (octet_count >= 4)
1703                return -EINVAL;
1704            octets[octet_count++] = 256;
1705        }
1706        else if (c == '.')
1707        {
1708            char *endptr = NULL;
1709            if (octet_count >= 4)
1710                return -EINVAL;
1711            strncpy(str, &wildcard_string[last_dot + 1], (i - last_dot - 1));
1712            octets[octet_count++] = strtol(str, &endptr, 10);
1713            if (*endptr != '\0' || octets[octet_count-1] >= 256)
1714                return -EINVAL;
1715            last_dot = i;
1716        }
1717    }
1718    for (i = octet_count; i < 4; i++)
1719    {
1720         octets[i] = 256;
1721    }
1722    return 0;
1723}
1724
1725/*
1726 * return 1 if the addr specified is part of the wildcard specification of octet
1727 * return 0 otherwise.
1728 */
1729static int check_octets(struct in_addr addr, unsigned long *octets)
1730{
1731#define B1_MASK  0xff000000
1732#define B1_SHIFT 24
1733#define B2_MASK  0x00ff0000
1734#define B2_SHIFT 16
1735#define B3_MASK  0x0000ff00
1736#define B3_SHIFT 8
1737#define B4_MASK  0x000000ff
1738    uint32_t host_addr = ntohl(addr.s_addr);
1739    /* * stands for all clients */
1740    if (octets[0] == 256)
1741    {
1742        return 1;
1743    }
1744    if (((host_addr & B1_MASK) >> B1_SHIFT) != octets[0])
1745    {
1746        return 0;
1747    }
1748    if (octets[1] == 256)
1749    {
1750        return 1;
1751    }
1752    if (((host_addr & B2_MASK) >> B2_SHIFT) != octets[1])
1753    {
1754        return 0;
1755    }
1756    if (octets[2] == 256)
1757    {
1758        return 1;
1759    }
1760    if (((host_addr & B3_MASK) >> B3_SHIFT) != octets[2])
1761    {
1762        return 0;
1763    }
1764    if (octets[3] == 256)
1765    {
1766        return 1;
1767    }
1768    if ((host_addr & B4_MASK) != octets[3])
1769    {
1770        return 0;
1771    }
1772    return 1;
1773#undef B1_MASK
1774#undef B1_SHIFT
1775#undef B2_MASK
1776#undef B2_SHIFT
1777#undef B3_MASK
1778#undef B3_SHIFT
1779#undef B4_MASK
1780}
1781/* BMI_tcp_query_addr_range()
1782 * Check if a given address is within the network specified by the wildcard string!
1783 * or if it is part of the subnet mask specified
1784 */
1785int BMI_tcp_query_addr_range(bmi_method_addr_p map, const char *wildcard_string, int netmask)
1786{
1787    struct tcp_addr *tcp_addr_data = (struct tcp_addr *) map->method_data;
1788    struct sockaddr_in map_addr;
1789    socklen_t map_addr_len = sizeof(map_addr);
1790    const char *tcp_wildcard = wildcard_string + 6 /* strlen("tcp://") */;
1791    int ret = -1;
1792
1793    memset(&map_addr, 0, sizeof(map_addr));
1794    if(getpeername(tcp_addr_data->socket, (struct sockaddr *) &map_addr, (int *) &map_addr_len) < 0)
1795    {
1796        ret =  bmi_tcp_errno_to_pvfs(-EINVAL);
1797        gossip_err("Error: failed to retrieve peer name for client.\n");
1798        return(ret);
1799    }
1800    /* Wildcard specification */
1801    if (netmask == -1)
1802    {
1803        unsigned long octets[4];
1804        if (check_valid_wildcard(tcp_wildcard, octets) < 0)
1805        {
1806            gossip_lerr("Invalid wildcard specification: %s\n", tcp_wildcard);
1807            return -EINVAL;
1808        }
1809        gossip_debug(GOSSIP_BMI_DEBUG_TCP, "Map Address is : %s, Wildcard Octets: %lu.%lu.%lu.%lu\n", inet_ntoa(map_addr.sin_addr),
1810                octets[0], octets[1], octets[2], octets[3]);
1811        if (check_octets(map_addr.sin_addr, octets) == 1)
1812        {
1813            return 1;
1814        }
1815    }
1816    /* Netmask specification */
1817    else {
1818        struct sockaddr_in mask_addr, network_addr;
1819        memset(&mask_addr, 0, sizeof(mask_addr));
1820        memset(&network_addr, 0, sizeof(network_addr));
1821        /* Convert the netmask address */
1822        convert_mask(netmask, &mask_addr.sin_addr);
1823        /* Invalid network address */
1824        if (inet_aton(tcp_wildcard, &network_addr.sin_addr) == 0)
1825        {
1826            gossip_err("Invalid network specification: %s\n", tcp_wildcard);
1827            return -EINVAL;
1828        }
1829        /* Matches the subnet mask! */
1830        if ((map_addr.sin_addr.s_addr & mask_addr.sin_addr.s_addr)
1831                == (network_addr.sin_addr.s_addr & mask_addr.sin_addr.s_addr))
1832        {
1833            return 1;
1834        }
1835    }
1836    return 0;
1837}
1838
1839/* BMI_tcp_addr_rev_lookup_unexpected()
1840 *
1841 * looks up an address that was initialized unexpectedly and returns a string
1842 * hostname
1843 *
1844 * returns string on success, "UNKNOWN" on failure
1845 */
1846const char* BMI_tcp_addr_rev_lookup_unexpected(bmi_method_addr_p map)
1847{
1848    struct tcp_addr *tcp_addr_data = (struct tcp_addr *) map->method_data;
1849    int debug_on;
1850    uint64_t mask;
1851    socklen_t peerlen;
1852    struct sockaddr_in peer;
1853    int ret;
1854    struct hostent *peerent;
1855    char* tmp_peer;
1856
1857    /* return default response if we don't have support for the right socket
1858     * calls
1859     */
1860#if !defined(HAVE_GETHOSTBYADDR)
1861    return(tcp_addr_data->peer);
1862#else
1863
1864    /* Only resolve hostnames if a gossip mask is set to request it.
1865     * Otherwise we leave it at ip address
1866     */
1867    gossip_get_debug_mask(&debug_on, &mask);
1868
1869    if(!debug_on || (!(mask & GOSSIP_ACCESS_HOSTNAMES)))
1870    {
1871        return(tcp_addr_data->peer);
1872    }
1873
1874    peerlen = sizeof(struct sockaddr_in);
1875
1876    if(tcp_addr_data->peer_type == BMI_TCP_PEER_HOSTNAME)
1877    {
1878        /* full hostname already cached; return now */
1879        return(tcp_addr_data->peer);
1880    }
1881
1882    /* if we hit this point, we need to resolve hostname */
1883    ret = getpeername(tcp_addr_data->socket, (struct sockaddr*) &(peer), (int *) &peerlen);
1884    if(ret < 0)
1885    {
1886        /* default to use IP address */
1887        return(tcp_addr_data->peer);
1888    }
1889
1890    peerent = gethostbyaddr((const char *) &peer.sin_addr.s_addr,
1891        sizeof(struct in_addr), AF_INET);
1892    if(peerent == NULL)
1893    {
1894        /* default to use IP address */
1895        return(tcp_addr_data->peer);
1896    }
1897 
1898    tmp_peer = (char*)malloc(strlen(peerent->h_name) + 1);
1899    if(!tmp_peer)
1900    {
1901        /* default to use IP address */
1902        return(tcp_addr_data->peer);
1903    }
1904    strcpy(tmp_peer, peerent->h_name);
1905    if(tcp_addr_data->peer)
1906    {
1907        free(tcp_addr_data->peer);
1908    }
1909    tcp_addr_data->peer = tmp_peer;
1910    tcp_addr_data->peer_type = BMI_TCP_PEER_HOSTNAME;
1911    return(tcp_addr_data->peer);
1912
1913#endif
1914
1915}
1916
1917/* tcp_forget_addr()
1918 *
1919 * completely removes a tcp method address from use, and aborts any
1920 * operations that use the address.  If the
1921 * dealloc_flag is set, the memory used by the address will be
1922 * deallocated as well.
1923 *
1924 * no return value
1925 */
1926void tcp_forget_addr(bmi_method_addr_p map,
1927                     int dealloc_flag,
1928                     int error_code)
1929{
1930    struct tcp_addr* tcp_addr_data = (struct tcp_addr *) map->method_data;
1931    BMI_addr_t bmi_addr = tcp_addr_data->bmi_addr;
1932    int tmp_outcount;
1933    bmi_method_addr_p tmp_addr;
1934    int tmp_status;
1935
1936    if (tcp_socket_collection_p)
1937    {
1938        BMI_socket_collection_remove(tcp_socket_collection_p, map);
1939        /* perform a test to force the socket collection to act on the remove
1940         * request before continuing
1941         */
1942        if(!sc_test_busy)
1943        {
1944            BMI_socket_collection_testglobal(tcp_socket_collection_p,
1945                0, &tmp_outcount, &tmp_addr, &tmp_status, 0);
1946        }
1947    }
1948
1949    tcp_shutdown_addr(map);
1950    tcp_cleanse_addr(map, error_code);
1951    tcp_addr_data->addr_error = error_code;
1952    if (dealloc_flag)
1953    {
1954        dealloc_tcp_method_addr(map);
1955    }
1956    else
1957    {
1958        /* this will cause the bmi control layer to check to see if
1959         * this address can be completely forgotten
1960         */
1961        bmi_method_addr_forget_callback(bmi_addr);
1962    }
1963    return;
1964};
1965
1966/******************************************************************
1967 * Internal support functions
1968 */
1969
1970
1971/*
1972 * dealloc_tcp_method_addr()
1973 *
1974 * destroys method address structures generated by the TCP/IP module.
1975 *
1976 * no return value
1977 */
1978static void dealloc_tcp_method_addr(bmi_method_addr_p map)
1979{
1980
1981    struct tcp_addr *tcp_addr_data = NULL;
1982
1983    tcp_addr_data = (struct tcp_addr *) map->method_data;
1984    /* close the socket, as long as it is not the one we are listening on
1985     * as a server.
1986     */
1987    if (!tcp_addr_data->server_port)
1988    {
1989        if (tcp_addr_data->socket > -1)
1990        {
1991            closesocket(tcp_addr_data->socket);
1992        }
1993    }
1994
1995    if (tcp_addr_data->hostname)
1996        free(tcp_addr_data->hostname);
1997    if (tcp_addr_data->peer)
1998        free(tcp_addr_data->peer);
1999
2000    bmi_dealloc_method_addr(map);
2001
2002    return;
2003}
2004
2005
2006/*
2007 * alloc_tcp_method_addr()
2008 *
2009 * creates a new method address with defaults filled in for TCP/IP.
2010 *
2011 * returns pointer to struct on success, NULL on failure
2012 */
2013bmi_method_addr_p alloc_tcp_method_addr(void)
2014{
2015
2016    struct bmi_method_addr *my_method_addr = NULL;
2017    struct tcp_addr *tcp_addr_data = NULL;
2018
2019    my_method_addr =
2020        bmi_alloc_method_addr(tcp_method_params.method_id, sizeof(struct tcp_addr));
2021    if (!my_method_addr)
2022    {
2023        return (NULL);
2024    }
2025
2026    /* note that we trust the alloc_method_addr() function to have zeroed
2027     * out the structures for us already
2028     */
2029
2030    tcp_addr_data = (struct tcp_addr *) my_method_addr->method_data;
2031    tcp_addr_data->socket = -1;
2032    tcp_addr_data->port = -1;
2033    tcp_addr_data->map = my_method_addr;
2034    tcp_addr_data->sc_index = -1;
2035
2036    return (my_method_addr);
2037}
2038
2039
2040/*
2041 * tcp_server_init()
2042 *
2043 * this function is used to prepare a node to recieve incoming
2044 * connections if it is initialized in a server configuration.   
2045 *
2046 * returns 0 on succes, -errno on failure
2047 */
2048static int tcp_server_init(void)
2049{
2050
2051    int oldfl = 0;                /* old socket flags */
2052    struct tcp_addr *tcp_addr_data = NULL;
2053    int tmp_errno = bmi_tcp_errno_to_pvfs(-EINVAL);
2054    int ret = 0;
2055
2056    /* create a socket */
2057    tcp_addr_data = (struct tcp_addr *) tcp_method_params.listen_addr->method_data;
2058    if ((tcp_addr_data->socket = BMI_sockio_new_sock()) < 0)
2059    {
2060        tmp_errno = WSAGetLastError();
2061        gossip_err("Error: BMI_sockio_new_sock: %d\n", tmp_errno);
2062        return (bmi_tcp_errno_to_pvfs(-tmp_errno));
2063    }
2064
2065    /* set it to non-blocking operation */
2066    /*
2067    oldfl = fcntl(tcp_addr_data->socket, F_GETFL, 0);
2068    if (!(oldfl & O_NONBLOCK))
2069    {
2070        fcntl(tcp_addr_data->socket, F_SETFL, oldfl | O_NONBLOCK);
2071    }
2072    */
2073    SET_NONBLOCK(tcp_addr_data->socket);
2074
2075    /* setup for a fast restart to avoid bind addr in use errors */
2076    BMI_sockio_set_sockopt(tcp_addr_data->socket, SO_REUSEADDR, 1);
2077
2078    /* bind it to the appropriate port */
2079    if(tcp_method_params.method_flags & BMI_TCP_BIND_SPECIFIC)
2080    {
2081        ret = BMI_sockio_bind_sock_specific(tcp_addr_data->socket,
2082            tcp_addr_data->hostname,
2083            tcp_addr_data->port);
2084        /* NOTE: this particular function converts errno in advance */
2085        if(ret < 0)
2086        {
2087            PVFS_perror_gossip("BMI_sockio_bind_sock_specific", ret);
2088            return(ret);
2089        }
2090    }
2091    else
2092    {
2093        ret = BMI_sockio_bind_sock(tcp_addr_data->socket,
2094            tcp_addr_data->port);
2095    }
2096   
2097    if (ret < 0)
2098    {
2099        tmp_errno = WSAGetLastError();
2100        gossip_err("Error: BMI_sockio_bind_sock: %d\n", tmp_errno);
2101        return (bmi_tcp_errno_to_pvfs(-tmp_errno));
2102    }
2103
2104    /* go ahead and listen to the socket */
2105    if (listen(tcp_addr_data->socket, TCP_BACKLOG) != 0)
2106    {
2107        tmp_errno = WSAGetLastError();
2108        gossip_err("Error: listen: %s\n", tmp_errno);
2109        return (bmi_tcp_errno_to_pvfs(-tmp_errno));
2110    }
2111
2112    return (0);
2113}
2114
2115
2116/* find_recv_inflight()
2117 *
2118 * checks to see if there is a recv operation in flight (when in flight
2119 * means that some of the data or envelope has been read) for a
2120 * particular address.
2121 *
2122 * returns pointer to operation on success, NULL if nothing found.
2123 */
2124static method_op_p find_recv_inflight(bmi_method_addr_p map)
2125{
2126    struct op_list_search_key key;
2127    method_op_p query_op = NULL;
2128
2129    memset(&key, 0, sizeof(struct op_list_search_key));
2130    key.method_addr = map;
2131    key.method_addr_yes = 1;
2132
2133    query_op = op_list_search(op_list_array[IND_RECV_INFLIGHT], &key);
2134
2135    return (query_op);
2136}
2137
2138
2139/* tcp_sock_init()
2140 *
2141 * this is an internal function which is used to build up a TCP/IP
2142 * connection in the situation of a client side operation.
2143 * addressing information to determine which fields need to be set.
2144 * If the connection is already established then it does no work.
2145 *
2146 * NOTE: this is safe to call repeatedly.  However, always check the
2147 * value of the not_connected field in the tcp address before using the
2148 * address.
2149 *
2150 * returns 0 on success, -errno on failure
2151 */
2152static int tcp_sock_init(bmi_method_addr_p my_method_addr)
2153{
2154
2155    int oldfl = 0;                /* socket flags */
2156    int ret = -1;
2157    struct pollfd poll_conn;
2158    struct tcp_addr *tcp_addr_data = (struct tcp_addr *) my_method_addr->method_data;
2159    int tmp_errno = 0;
2160
2161    /* check for obvious problems */
2162    assert(my_method_addr);
2163    assert(my_method_addr->method_type == tcp_method_params.method_id);
2164    assert(tcp_addr_data->server_port == 0);
2165
2166    /* fail immediately if the address is in failure mode and we have no way
2167     * to reconnect
2168     */
2169    if(tcp_addr_data->addr_error && tcp_addr_data->dont_reconnect)
2170    {
2171        gossip_debug(GOSSIP_BMI_DEBUG_TCP,
2172        "Warning: BMI communication attempted on an address in failure mode.\n");
2173        return(tcp_addr_data->addr_error);
2174    }
2175
2176    if(tcp_addr_data->addr_error)
2177    {
2178        gossip_debug(GOSSIP_BMI_DEBUG_TCP, "%s: attempting reconnect.\n",
2179          __func__);
2180        tcp_addr_data->addr_error = 0;
2181        assert(tcp_addr_data->socket < 0);
2182        tcp_addr_data->not_connected = 1;
2183    }
2184
2185    /* is there already a socket? */
2186    if (tcp_addr_data->socket > -1)
2187    {
2188        /* check to see if we still need to work on the connect.. */
2189        if (tcp_addr_data->not_connected)
2190        {
2191            /* this is a little weird, but we complete the nonblocking
2192             * connection by polling */
2193            /*
2194            poll_conn.fd = tcp_addr_data->socket;
2195            poll_conn.events = POLLOUT;
2196            ret = poll(&poll_conn, 1, 2);
2197            if ((ret < 0) || (poll_conn.revents & POLLERR))
2198            {
2199                tmp_errno = errno;
2200                gossip_lerr("Error: poll: %s\n", strerror(tmp_errno));
2201                return (bmi_tcp_errno_to_pvfs(-tmp_errno));
2202            }
2203            if (poll_conn.revents & POLLOUT)
2204            {
2205                tcp_addr_data->not_connected = 0;
2206            }
2207            */
2208            /* use select on Windows */
2209           fd_set writefds;
2210           struct timeval timeout;
2211
2212           timeout.tv_sec = 0;
2213           timeout.tv_usec = 2000;   /* 2ms */
2214
2215           FD_ZERO(&writefds);
2216           FD_SET(tcp_addr_data->socket, &writefds);
2217           ret = select(1, NULL, &writefds, NULL, (const struct timeval *) &timeout);
2218           if (ret == SOCKET_ERROR)
2219           {
2220               tmp_errno = WSAGetLastError();
2221               gossip_lerr("Error: select (tcp_sock_init): %d\n", tmp_errno);
2222               return (bmi_tcp_errno_to_pvfs(-tmp_errno));
2223           }
2224           if (FD_ISSET(tcp_addr_data->socket, &writefds))
2225           {
2226               tcp_addr_data->not_connected = 0;
2227           }
2228        }
2229        /* return.  the caller should check the "not_connected" flag to
2230         * see if the socket is usable yet. */
2231        return (0);
2232    }
2233   
2234    bmi_set_sock_buffers(tcp_addr_data->socket);
2235
2236    /* at this point there is no socket.  try to build it */
2237    if (tcp_addr_data->port < 1)
2238    {
2239        return (bmi_tcp_errno_to_pvfs(-EINVAL));
2240    }
2241
2242    /* make a socket */
2243    if ((tcp_addr_data->socket = BMI_sockio_new_sock()) < 0)
2244    {
2245        tmp_errno = WSAGetLastError();
2246        return (bmi_tcp_errno_to_pvfs(-tmp_errno));
2247    }
2248
2249    /* set it to non-blocking operation */
2250    /* oldfl = fcntl(tcp_addr_data->socket, F_GETFL, 0);
2251    if (!(oldfl & O_NONBLOCK))
2252    {
2253        fcntl(tcp_addr_data->socket, F_SETFL, oldfl | O_NONBLOCK);
2254    }*/
2255    SET_NONBLOCK(tcp_addr_data->socket);
2256
2257#if defined(USE_TRUSTED) && defined(__PVFS2_CLIENT__)
2258    /* make sure if we need to bind or not to some local port ranges */
2259    tcp_enable_trusted(tcp_addr_data);
2260#endif
2261
2262    /* turn off Nagle's algorithm */
2263    if (BMI_sockio_set_tcpopt(tcp_addr_data->socket, TCP_NODELAY, 1) < 0)
2264    {
2265        tmp_errno = WSAGetLastError();
2266        gossip_lerr("Error: failed to set TCP_NODELAY option.\n");
2267        closesocket(tcp_addr_data->socket);
2268        return (bmi_tcp_errno_to_pvfs(-tmp_errno));
2269    }
2270
2271    bmi_set_sock_buffers(tcp_addr_data->socket);
2272
2273    if (tcp_addr_data->hostname)
2274    {
2275        gossip_ldebug(GOSSIP_BMI_DEBUG_TCP,
2276                      "Connect: socket=%d, hostname=%s, port=%d\n",
2277                      tcp_addr_data->socket, tcp_addr_data->hostname,
2278                      tcp_addr_data->port);
2279        ret = BMI_sockio_connect_sock(tcp_addr_data->socket,
2280                      tcp_addr_data->hostname,
2281                      tcp_addr_data->port);
2282    }
2283    else
2284    {
2285        return (bmi_tcp_errno_to_pvfs(-EINVAL));
2286    }
2287
2288    if (ret < 0)
2289    {
2290        if (ret == -EINPROGRESS)
2291        {
2292            tcp_addr_data->not_connected = 1;
2293            /* this will have to be connected later with a poll */
2294        }
2295        else
2296        {
2297            /* NOTE: BMI_sockio_connect_sock returns a PVFS error */
2298            char buff[300];
2299
2300            _snprintf(buff, 300, "Error: BMI_sockio_connect_sock: (%s):",
2301                      tcp_addr_data->hostname);
2302
2303            PVFS_perror_gossip(buff, ret);
2304            return (ret);
2305        }
2306    }
2307
2308    return (0);
2309}
2310
2311
2312/* enqueue_operation()
2313 *
2314 * creates a new operation based on the arguments to the function.  It
2315 * then makes sure that the address is added to the socket collection,
2316 * and the operation is added to the appropriate operation queue.
2317 *
2318 * Damn, what a big prototype!
2319 *
2320 * returns 0 on success, -errno on failure
2321 */
2322static int enqueue_operation(op_list_p target_list,
2323                             enum bmi_op_type send_recv,
2324                             bmi_method_addr_p map,
2325                             void *const *buffer_list,
2326                             const bmi_size_t *size_list,
2327                             int list_count,
2328                             bmi_size_t amt_complete,
2329                             bmi_size_t env_amt_complete,
2330                             bmi_op_id_t * id,
2331                             int tcp_op_state,
2332                             struct tcp_msg_header header,
2333                             void *user_ptr,
2334                             bmi_size_t actual_size,
2335                             bmi_size_t expected_size,
2336                             bmi_context_id context_id,
2337                             int32_t eid)
2338{
2339    method_op_p new_method_op = NULL;
2340    struct tcp_op *tcp_op_data = NULL;
2341    struct tcp_addr* tcp_addr_data = NULL;
2342    int i;
2343
2344    /* allocate the operation structure */
2345    new_method_op = alloc_tcp_method_op();
2346    if (!new_method_op)
2347    {
2348        return (bmi_tcp_errno_to_pvfs(-ENOMEM));
2349    }
2350
2351    *id = new_method_op->op_id;
2352    new_method_op->event_id = eid;
2353
2354    /* set the fields */
2355    new_method_op->send_recv = send_recv;
2356    new_method_op->addr = map;
2357    new_method_op->user_ptr = user_ptr;
2358    /* this is on purpose; we want to use the buffer_list all of
2359     * the time, no special case for one contig buffer
2360     */
2361    new_method_op->buffer = NULL;
2362    new_method_op->actual_size = actual_size;
2363    new_method_op->expected_size = expected_size;
2364    new_method_op->send_recv = send_recv;
2365    new_method_op->amt_complete = amt_complete;
2366    new_method_op->env_amt_complete = env_amt_complete;
2367    new_method_op->msg_tag = header.tag;
2368    new_method_op->mode = header.mode;
2369    new_method_op->list_count = list_count;
2370    new_method_op->context_id = context_id;
2371
2372    /* set our current position in list processing */
2373    i=0;
2374    new_method_op->list_index = 0;
2375    new_method_op->cur_index_complete = 0;
2376    while(amt_complete > 0)
2377    {
2378        if(amt_complete >= size_list[i])
2379        {
2380            amt_complete -= size_list[i];
2381            new_method_op->list_index++;
2382            i++;
2383        }
2384        else
2385        {
2386            new_method_op->cur_index_complete = amt_complete;
2387            amt_complete = 0;
2388        }
2389    }
2390
2391    tcp_op_data = (struct tcp_op *) new_method_op->method_data;
2392    tcp_op_data->tcp_op_state = (enum bmi_tcp_state) tcp_op_state;
2393    tcp_op_data->env = header;
2394
2395    /* if there is only one item in the list, then keep the list stored
2396     * in the op structure.  This allows us to use the same code for send
2397     * and recv as we use for send_list and recv_list, without having to
2398     * malloc lists for those special cases
2399     */
2400    if (list_count == 1)
2401    {
2402        new_method_op->buffer_list = &tcp_op_data->buffer_list_stub;
2403        new_method_op->size_list = &tcp_op_data->size_list_stub;
2404        ((void**)new_method_op->buffer_list)[0] = buffer_list[0];
2405        ((bmi_size_t*)new_method_op->size_list)[0] = size_list[0];
2406    }
2407    else
2408    {
2409        new_method_op->size_list = size_list;
2410        new_method_op->buffer_list = buffer_list;
2411    }
2412
2413    tcp_addr_data = (struct tcp_addr *) map->method_data;
2414
2415    if(tcp_addr_data->addr_error)
2416    {
2417        /* server should always fail here, client should let receives queue
2418         * as if nothing were wrong
2419         */
2420        if(tcp_addr_data->dont_reconnect || send_recv == BMI_SEND)
2421        {
2422            gossip_debug(GOSSIP_BMI_DEBUG_TCP,
2423                       "Warning: BMI communication attempted on an "
2424                       "address in failure mode.\n");
2425            new_method_op->error_code = tcp_addr_data->addr_error;
2426            op_list_add(op_list_array[new_method_op->context_id],
2427                        new_method_op);
2428            return(tcp_addr_data->addr_error);
2429        }
2430    }
2431
2432#if 0
2433    if(tcp_addr_data->addr_error)
2434    {
2435        /* this address is bad, don't try to do anything with it */
2436        gossip_err("Warning: BMI communication attempted on an "
2437                   "address in failure mode.\n");
2438
2439        new_method_op->error_code = tcp_addr_data->addr_error;
2440        op_list_add(op_list_array[new_method_op->context_id],
2441                    new_method_op);
2442        return(tcp_addr_data->addr_error);
2443    }
2444#endif
2445
2446    /* add the socket to poll on */
2447    BMI_socket_collection_add(tcp_socket_collection_p, map);
2448    if(send_recv == BMI_SEND)
2449    {
2450        BMI_socket_collection_add_write_bit(tcp_socket_collection_p, map);
2451    }
2452
2453    /* keep up with the operation */
2454    op_list_add(target_list, new_method_op);
2455
2456    return (0);
2457}
2458
2459
2460/* tcp_post_recv_generic()
2461 *
2462 * does the real work of posting an operation - works for both
2463 * eager and rendezvous messages
2464 *
2465 * returns 0 on success that requires later poll, returns 1 on instant
2466 * completion, -errno on failure
2467 */
2468static int tcp_post_recv_generic(bmi_op_id_t * id,
2469                                 bmi_method_addr_p src,
2470                                 void *const *buffer_list,
2471                                 const bmi_size_t *size_list,
2472                                 int list_count,
2473                                 bmi_size_t expected_size,
2474                                 bmi_size_t * actual_size,
2475                                 enum bmi_buffer_type buffer_type,
2476                                 bmi_msg_tag_t tag,
2477                                 void *user_ptr,
2478                                 bmi_context_id context_id,
2479                                 PVFS_hint hints)
2480{
2481    method_op_p query_op = NULL;
2482    int ret = -1;
2483    struct tcp_addr *tcp_addr_data = NULL;
2484    struct tcp_op *tcp_op_data = NULL;
2485    struct tcp_msg_header bogus_header;
2486    struct op_list_search_key key;
2487    bmi_size_t copy_size = 0;
2488    bmi_size_t total_copied = 0;
2489    int i;
2490    PINT_event_id eid = 0;
2491
2492    PINT_EVENT_START(
2493        bmi_tcp_recv_event_id, bmi_tcp_pid, NULL, &eid,
2494        PINT_HINT_GET_CLIENT_ID(hints),
2495        PINT_HINT_GET_REQUEST_ID(hints),
2496        PINT_HINT_GET_RANK(hints),
2497        PINT_HINT_GET_HANDLE(hints),
2498        PINT_HINT_GET_OP_ID(hints),
2499        expected_size);
2500
2501    tcp_addr_data = (struct tcp_addr *) src->method_data;
2502
2503    /* short out immediately if the address is bad and we have no way to
2504     * reconnect
2505     */
2506    if(tcp_addr_data->addr_error && tcp_addr_data->dont_reconnect)
2507    {
2508        gossip_debug(
2509            GOSSIP_BMI_DEBUG_TCP,
2510            "Warning: BMI communication attempted "
2511            "on an address in failure mode.\n");
2512        return(tcp_addr_data->addr_error);
2513    }
2514
2515    /* lets make sure that the message hasn't already been fully
2516     * buffered in eager mode before doing anything else
2517     */
2518    memset(&key, 0, sizeof(struct op_list_search_key));
2519    key.method_addr = src;
2520    key.method_addr_yes = 1;
2521    key.msg_tag = tag;
2522    key.msg_tag_yes = 1;
2523
2524    query_op =
2525        op_list_search(op_list_array[IND_RECV_EAGER_DONE_BUFFERING], &key);
2526    if (query_op)
2527    {
2528        /* make sure it isn't too big */
2529        if (query_op->actual_size > expected_size)
2530        {
2531            gossip_err("Error: message ordering violation;\n");
2532            gossip_err("Error: message too large for next buffer.\n");
2533            return (bmi_tcp_errno_to_pvfs(-EPROTO));
2534        }
2535
2536        /* whoohoo- it is already done! */
2537        /* copy buffer out to list segments; handle short case */
2538        for (i = 0; i < list_count; i++)
2539        {
2540            copy_size = size_list[i];
2541            if (copy_size + total_copied > query_op->actual_size)
2542            {
2543                copy_size = query_op->actual_size - total_copied;
2544            }
2545            memcpy(buffer_list[i], (void *) ((char *) query_op->buffer +
2546                                             total_copied), copy_size);
2547            total_copied += copy_size;
2548            if (total_copied == query_op->actual_size)
2549            {
2550                break;
2551            }
2552        }
2553        /* copy out to correct memory regions */
2554        (*actual_size) = query_op->actual_size;
2555        free(query_op->buffer);
2556        *id = 0;
2557        op_list_remove(query_op);
2558        dealloc_tcp_method_op(query_op);
2559        PINT_EVENT_END(bmi_tcp_recv_event_id, bmi_tcp_pid, NULL, eid, 0,
2560                       *actual_size);
2561
2562        return (1);
2563    }
2564
2565    /* look for a message that is already being received */
2566    query_op = op_list_search(op_list_array[IND_RECV_INFLIGHT], &key);
2567    if (query_op)
2568    {
2569        tcp_op_data = (struct tcp_op *) query_op->method_data;
2570    }
2571
2572    /* see if it is being buffered into a temporary memory region */
2573    if (query_op && tcp_op_data->tcp_op_state == BMI_TCP_BUFFERING)
2574    {
2575        /* make sure it isn't too big */
2576        if (query_op->actual_size > expected_size)
2577        {
2578            gossip_err("Error: message ordering violation;\n");
2579            gossip_err("Error: message too large for next buffer.\n");
2580            return (bmi_tcp_errno_to_pvfs(-EPROTO));
2581        }
2582
2583        /* copy what we have so far into the correct buffers */
2584        total_copied = 0;
2585        for (i = 0; i < list_count; i++)
2586        {
2587            copy_size = size_list[i];
2588            if (copy_size + total_copied > query_op->amt_complete)
2589            {
2590                copy_size = query_op->amt_complete - total_copied;
2591            }
2592            if (copy_size > 0)
2593            {
2594                memcpy(buffer_list[i], (void *) ((char *) query_op->buffer +
2595                                                 total_copied), copy_size);
2596            }
2597            total_copied += copy_size;
2598            if (total_copied == query_op->amt_complete)
2599            {
2600                query_op->list_index = i;
2601                query_op->cur_index_complete = copy_size;
2602                break;
2603            }
2604        }
2605
2606        /* see if we ended on a buffer boundary */
2607        if (query_op->cur_index_complete ==
2608            query_op->size_list[query_op->list_index])
2609        {
2610            query_op->list_index++;
2611            query_op->cur_index_complete = 0;
2612        }
2613
2614        /* release the old buffer */
2615        if (query_op->buffer)
2616        {
2617            free(query_op->buffer);
2618        }
2619
2620        *id = query_op->op_id;
2621        tcp_op_data = (struct tcp_op *) query_op->method_data;
2622        tcp_op_data->tcp_op_state = BMI_TCP_INPROGRESS;
2623
2624        query_op->list_count = list_count;
2625        query_op->user_ptr = user_ptr;
2626        query_op->context_id = context_id;
2627        /* if there is only one item in the list, then keep the list stored
2628         * in the op structure.  This allows us to use the same code for send
2629         * and recv as we use for send_list and recv_list, without having to
2630         * malloc lists for those special cases
2631         */
2632        if (list_count == 1)
2633        {
2634            query_op->buffer_list = &tcp_op_data->buffer_list_stub;
2635            query_op->size_list = &tcp_op_data->size_list_stub;
2636            ((void **)query_op->buffer_list)[0] = buffer_list[0];
2637            ((bmi_size_t *)query_op->size_list)[0] = size_list[0];
2638        }
2639        else
2640        {
2641            query_op->buffer_list = buffer_list;
2642            query_op->size_list = size_list;
2643        }
2644
2645        if (query_op->amt_complete < query_op->actual_size)
2646        {
2647            /* try to recv some more data */
2648            tcp_addr_data = (struct tcp_addr *) query_op->addr->method_data;
2649            ret = payload_progress(tcp_addr_data->socket,
2650                                   query_op->buffer_list,
2651                                   query_op->size_list,
2652                                   query_op->list_count,
2653                                   query_op->actual_size,
2654                                   &(query_op->list_index),
2655                                   &(query_op->cur_index_complete),
2656                                   BMI_RECV,
2657                                   NULL,
2658                                   0);
2659            if (ret < 0)
2660            {
2661                PVFS_perror_gossip("Error: payload_progress", ret);
2662                /* payload_progress() returns BMI error codes */
2663                tcp_forget_addr(query_op->addr, 0, ret);
2664                return (ret);
2665            }
2666
2667            query_op->amt_complete += ret;
2668        }
2669        assert(query_op->amt_complete <= query_op->actual_size);
2670        if (query_op->amt_complete == query_op->actual_size)
2671        {
2672            /* we are done */
2673            op_list_remove(query_op);
2674            *id = 0;
2675            (*actual_size) = query_op->actual_size;
2676            dealloc_tcp_method_op(query_op);
2677            PINT_EVENT_END(
2678                bmi_tcp_recv_event_id, bmi_tcp_pid, NULL, eid,
2679                0, *actual_size);
2680
2681            return (1);
2682        }
2683        else
2684        {
2685            /* there is still more work to do */
2686            tcp_op_data->tcp_op_state = BMI_TCP_INPROGRESS;
2687            return (0);
2688        }
2689    }
2690
2691    /* NOTE: if the message was in flight, but not buffering, then
2692     * that means that it has already matched an earlier receive
2693     * post or else is an unexpected message that doesn't require a
2694     * matching receive post - at any rate it shouldn't be handled
2695     * here
2696     */
2697
2698    /* if we hit this point we must enqueue */
2699    if (expected_size <= TCP_MODE_EAGER_LIMIT)
2700    {
2701        bogus_header.mode = TCP_MODE_EAGER;
2702    }
2703    else
2704    {
2705        bogus_header.mode = TCP_MODE_REND;
2706    }
2707    bogus_header.tag = tag;
2708    ret = enqueue_operation(op_list_array[IND_RECV],
2709                            BMI_RECV, src, buffer_list, size_list,
2710                            list_count, 0, 0, id, BMI_TCP_INPROGRESS,
2711                            bogus_header, user_ptr, 0,
2712                            expected_size, context_id, eid);
2713    /* just for safety; this field isn't valid to the caller anymore */
2714    (*actual_size) = 0;
2715    /* TODO: figure out why this causes deadlocks; observable in 2
2716     * scenarios:
2717     * - pvfs2-client-core with threaded library and nptl
2718     * - pvfs2-server threaded with nptl sending messages to itself
2719     */
2720#if 0
2721    if (ret >= 0)
2722    {
2723        /* go ahead and try to do some work while we are in this
2724         * function since we appear to be backlogged.  Make sure that
2725         * we do not wait in the poll, however.
2726         */
2727        ret = tcp_do_work(0);
2728    }
2729#endif
2730    return (ret);
2731}
2732
2733
2734/* tcp_cleanse_addr()
2735 *
2736 * finds all active operations matching the given address, places them
2737 * in an error state, and moves them to the completed queue.
2738 *
2739 * NOTE: this function does not shut down the address.  That should be
2740 * handled separately
2741 *
2742 * returns 0 on success, -errno on failure
2743 */
2744static int tcp_cleanse_addr(bmi_method_addr_p map, int error_code)
2745{
2746    int i = 0;
2747    struct op_list_search_key key;
2748    method_op_p query_op = NULL;
2749
2750    memset(&key, 0, sizeof(struct op_list_search_key));
2751    key.method_addr = map;
2752    key.method_addr_yes = 1;
2753
2754    /* NOTE: we know the unexpected completed queue is the last index! */
2755    for (i = 0; i < (NUM_INDICES - 1); i++)
2756    {
2757        if (op_list_array[i])
2758        {
2759            while ((query_op = op_list_search(op_list_array[i], &key)))
2760            {
2761                op_list_remove(query_op);
2762                query_op->error_code = error_code;
2763                if (query_op->mode == TCP_MODE_UNEXP && query_op->send_recv
2764                    == BMI_RECV)
2765                {
2766                    op_list_add(op_list_array[IND_COMPLETE_RECV_UNEXP],
2767                                query_op);
2768                }
2769                else
2770                {
2771                    ((struct tcp_op*)(query_op->method_data))->tcp_op_state =
2772                        BMI_TCP_COMPLETE;
2773                    op_list_add(completion_array[query_op->context_id], query_op);
2774                }
2775            }
2776        }
2777    }
2778
2779    return (0);
2780}
2781
2782
2783/* tcp_shutdown_addr()
2784 *
2785 * closes connections associated with a tcp method address
2786 *
2787 * returns 0 on success, -errno on failure
2788 */
2789static int tcp_shutdown_addr(bmi_method_addr_p map)
2790{
2791
2792    struct tcp_addr *tcp_addr_data = (struct tcp_addr *) map->method_data;
2793    if (tcp_addr_data->socket > -1)
2794    {
2795        closesocket(tcp_addr_data->socket);
2796    }
2797    tcp_addr_data->socket = -1;
2798    tcp_addr_data->not_connected = 1;
2799
2800    return (0);
2801}
2802
2803
2804/* tcp_do_work()
2805 *
2806 * this is the function that actually does communication work during
2807 * BMI_tcp_testXXX and BMI_tcp_waitXXX functions.  The amount of work
2808 * that it does is tunable.
2809 *
2810 * returns 0 on success, -errno on failure.
2811 */
2812static int tcp_do_work(int max_idle_time)
2813{
2814    int ret = -1;
2815    bmi_method_addr_p addr_array[TCP_WORK_METRIC];
2816    int status_array[TCP_WORK_METRIC];
2817    int socket_count = 0;
2818    int i = 0;
2819    int stall_flag = 0;
2820    int busy_flag = 1;
2821    struct timespec req;
2822    struct tcp_addr* tcp_addr_data = NULL;
2823    struct timespec wait_time;
2824    struct timeval start;
2825
2826    if(sc_test_busy)
2827    {
2828        /* another thread is already polling or working on sockets */
2829        if(max_idle_time == 0)
2830        {
2831            /* we don't want to spend time waiting on it; return
2832             * immediately.
2833             */
2834            return(0);
2835        }
2836
2837        /* Sleep until working thread thread signals that it has finished
2838         * its work and then return.  No need for this thread to poll;
2839         * the other thread may have already finished what we wanted.
2840         * This condition wait is used strictly as a best effort to
2841         * prevent busy spin.  We'll sort out the results later.
2842         */
2843        gettimeofday(&start, NULL);
2844        wait_time.tv_sec = start.tv_sec + max_idle_time / 1000;
2845        wait_time.tv_nsec = (start.tv_usec + ((max_idle_time % 1000)*1000))*1000;
2846        if (wait_time.tv_nsec > 1000000000)
2847        {
2848            wait_time.tv_nsec = wait_time.tv_nsec - 1000000000;
2849            wait_time.tv_sec++;
2850        }
2851        gen_cond_timedwait(&interface_cond, &interface_mutex, &wait_time);
2852        return(0);
2853    }
2854
2855    /* this thread has gained control of the polling.  */
2856    sc_test_busy = 1;
2857    gen_mutex_unlock(&interface_mutex);
2858
2859    /* our turn to look at the socket collection */
2860    ret = BMI_socket_collection_testglobal(tcp_socket_collection_p,
2861                                       TCP_WORK_METRIC, &socket_count,
2862                                       addr_array, status_array,
2863                                       max_idle_time);
2864
2865    gen_mutex_lock(&interface_mutex);
2866    sc_test_busy = 0;
2867
2868    if (ret < 0)
2869    {
2870        /* wake up anyone else who might have been waiting */
2871        gen_cond_broadcast(&interface_cond);
2872        PVFS_perror_gossip("Error: socket collection:", ret);
2873        /* BMI_socket_collection_testglobal() returns BMI error code */
2874        return (ret);
2875    }
2876
2877    if(socket_count == 0)
2878        busy_flag = 0;
2879
2880    /* do different kinds of work depending on results */
2881    for (i = 0; i < socket_count; i++)
2882    {
2883        tcp_addr_data = (struct tcp_addr *) addr_array[i]->method_data;
2884        /* skip working on addresses in failure mode */
2885        if(tcp_addr_data->addr_error)
2886        {
2887            /* addr_error field is in BMI error code format */
2888            tcp_forget_addr(addr_array[i], 0, tcp_addr_data->addr_error);
2889            continue;
2890        }
2891
2892        if (status_array[i] & SC_ERROR_BIT)
2893        {
2894            ret = tcp_do_work_error(addr_array[i]);
2895            if (ret < 0)
2896            {
2897                PVFS_perror_gossip("Warning: BMI error handling failure, continuing", ret);
2898            }
2899        }
2900        else
2901        {
2902            if (status_array[i] & SC_WRITE_BIT)
2903            {
2904                ret = tcp_do_work_send(addr_array[i], &stall_flag);
2905                if (ret < 0)
2906                {
2907                    PVFS_perror_gossip("Warning: BMI send error, continuing", ret);
2908                }
2909                if(!stall_flag)
2910                    busy_flag = 0;
2911            }
2912            if (status_array[i] & SC_READ_BIT)
2913            {
2914                ret = tcp_do_work_recv(addr_array[i], &stall_flag);
2915                if (ret < 0)
2916                {
2917                    PVFS_perror_gossip("Warning: BMI recv error, continuing", ret);
2918                }
2919                if(!stall_flag)
2920                    busy_flag = 0;
2921            }
2922        }
2923    }
2924
2925    /* IMPORTANT NOTE: if we have set the following flag, then it indicates that
2926     * poll() is finding data on our sockets, yet we are not able to move
2927     * any of it right now.  This means that the sockets are backlogged, and
2928     * BMI is in danger of busy spinning during test functions.  Let's sleep
2929     * for a millisecond here in hopes of letting the rest of the system
2930     * catch up somehow (either by clearing a backlog in another I/O
2931     * component, or by posting more matching BMI recieve operations)
2932     */
2933    if(busy_flag)
2934    {
2935        /* req.tv_sec = 0;
2936        req.tv_nsec = 1000; */
2937        gen_mutex_unlock(&interface_mutex);
2938        /* nanosleep(&req, NULL); */
2939        Sleep(1);
2940        gen_mutex_lock(&interface_mutex);
2941    }
2942
2943    /* wake up anyone else who might have been waiting */
2944    gen_cond_broadcast(&interface_cond);
2945    return (0);
2946}
2947
2948
2949/* tcp_do_work_send()
2950 *
2951 * does work on a TCP address that is ready to send data.
2952 *
2953 * returns 0 on success, -errno on failure
2954 */
2955static int tcp_do_work_send(bmi_method_addr_p map, int* stall_flag)
2956{
2957    method_op_p active_method_op = NULL;
2958    struct op_list_search_key key;
2959    int blocked_flag = 0;
2960    int ret = 0;
2961    int tmp_stall_flag;
2962
2963    *stall_flag = 1;
2964
2965    while (blocked_flag == 0 && ret == 0)
2966    {
2967        /* what we want to do here is find the first operation in the send
2968         * queue for this address.
2969         */
2970        memset(&key, 0, sizeof(struct op_list_search_key));
2971        key.method_addr = map;
2972        key.method_addr_yes = 1;
2973        active_method_op = op_list_search(op_list_array[IND_SEND], &key);
2974        if (!active_method_op)
2975        {
2976            /* ran out of queued sends to work on */
2977            return (0);
2978        }
2979
2980        ret = work_on_send_op(active_method_op, &blocked_flag, &tmp_stall_flag);
2981        if(!tmp_stall_flag)
2982            *stall_flag = 0;
2983    }
2984
2985    return (ret);
2986}
2987
2988
2989/* handle_new_connection()
2990 *
2991 * this function should be called only on special tcp method addresses
2992 * that represent local server ports.  It will attempt to accept a new
2993 * connection and create a new method address for the remote host.
2994 *
2995 * side effect: destroys the temporary method_address that is passed in
2996 * to it.
2997 *
2998 * returns 0 on success, -errno on failure
2999 */
3000static int handle_new_connection(bmi_method_addr_p map)
3001{
3002    struct tcp_addr *tcp_addr_data = NULL;
3003    int accepted_socket = -1;
3004    bmi_method_addr_p new_addr = NULL;
3005    int ret = -1;
3006    char* tmp_peer = NULL;
3007
3008    ret = tcp_accept_init(&accepted_socket, &tmp_peer);
3009    if (ret < 0)
3010    {
3011        return (ret);
3012    }
3013    if (accepted_socket < 0)
3014    {
3015        /* guess it wasn't ready after all */
3016        return (0);
3017    }
3018
3019    /* ok, we have a new socket.  what now?  Probably simplest
3020     * thing to do is to create a new method_addr, add it to the
3021     * socket collection, and return.  It will get caught the next
3022     * time around */
3023    new_addr = alloc_tcp_method_addr();
3024    if (!new_addr)
3025    {
3026        return (bmi_tcp_errno_to_pvfs(-ENOMEM));
3027    }
3028    gossip_ldebug(GOSSIP_BMI_DEBUG_TCP,
3029                  "Assigning socket %d to new method addr.\n",
3030                  accepted_socket);
3031    tcp_addr_data = (struct tcp_addr *) new_addr->method_data;
3032    tcp_addr_data->socket = accepted_socket;
3033    tcp_addr_data->peer = tmp_peer;
3034    tcp_addr_data->peer_type = BMI_TCP_PEER_IP;
3035
3036    /* set a flag to make sure that we never try to reconnect this address
3037     * in the future
3038     */
3039    tcp_addr_data->dont_reconnect = 1;
3040    /* register this address with the method control layer */
3041    tcp_addr_data->bmi_addr = bmi_method_addr_reg_callback(new_addr);
3042    if (ret < 0)
3043    {
3044        tcp_shutdown_addr(new_addr);
3045        dealloc_tcp_method_addr(new_addr);
3046        dealloc_tcp_method_addr(map);
3047        return (ret);
3048    }
3049    BMI_socket_collection_add(tcp_socket_collection_p, new_addr);
3050
3051    dealloc_tcp_method_addr(map);
3052    return (0);
3053
3054}
3055
3056
3057/* tcp_do_work_recv()
3058 *
3059 * does work on a TCP address that is ready to recv data.
3060 *
3061 * returns 0 on success, -errno on failure
3062 */
3063static int tcp_do_work_recv(bmi_method_addr_p map, int* stall_flag)
3064{
3065
3066    method_op_p active_method_op = NULL;
3067    int ret = -1;
3068    void *new_buffer = NULL;
3069    struct op_list_search_key key;
3070    struct tcp_msg_header new_header;
3071    struct tcp_addr *tcp_addr_data = (struct tcp_addr *) map->method_data;
3072    struct tcp_op *tcp_op_data = NULL;
3073    int tmp_errno;
3074    int tmp;
3075    bmi_size_t old_amt_complete = 0;
3076    time_t current_time;
3077
3078    *stall_flag = 1;
3079
3080    /* figure out if this is a new connection */
3081    if (tcp_addr_data->server_port)
3082    {
3083        /* just try to accept connection- no work yet */
3084        *stall_flag = 0;
3085        return (handle_new_connection(map));
3086    }
3087
3088    /* look for a recv for this address that is already in flight */
3089    active_method_op = find_recv_inflight(map);
3090    /* see if we found one in progress... */
3091    if (active_method_op)
3092    {
3093        tcp_op_data = (struct tcp_op *) active_method_op->method_data;
3094        if (active_method_op->mode == TCP_MODE_REND &&
3095            tcp_op_data->tcp_op_state == BMI_TCP_BUFFERING)
3096        {
3097            /* we must wait for recv post */
3098            return (0);
3099        }
3100        else
3101        {
3102            old_amt_complete = active_method_op->amt_complete;
3103            ret = work_on_recv_op(active_method_op, stall_flag);
3104            gossip_debug(GOSSIP_BMI_DEBUG_TCP, "actual_size=%d, "
3105                         "amt_complete=%d, old_amt_complete=%d\n",
3106                         (int)active_method_op->actual_size,
3107                         (int)active_method_op->amt_complete,
3108                         (int)old_amt_complete);
3109
3110            if ((ret == 0) &&
3111                (old_amt_complete == active_method_op->amt_complete) &&
3112                active_method_op->actual_size &&
3113                (active_method_op->amt_complete <
3114                 active_method_op->actual_size))
3115            {
3116                gossip_debug(
3117                    GOSSIP_BMI_DEBUG_TCP, "Warning: bmi_tcp unable "
3118                    "to recv any data reported by poll(). [1]\n");
3119
3120                if (tcp_addr_data->zero_read_limit++ ==
3121                    BMI_TCP_ZERO_READ_LIMIT)
3122                {
3123                    gossip_debug(GOSSIP_BMI_DEBUG_TCP,
3124                                 "...dropping connection.\n");
3125                    tcp_forget_addr(map, 0, bmi_tcp_errno_to_pvfs(-EPIPE));
3126                }
3127            }
3128            else
3129            {
3130                tcp_addr_data->zero_read_limit = 0;
3131            }
3132            return(ret);
3133        }
3134    }
3135
3136    /* let's see if a the entire header is ready to be received.  If so
3137     * we will go ahead and pull it.  Otherwise, we will try again later.
3138     * It isn't worth the complication of reading only a partial message
3139     * header - we really want it atomically
3140     */
3141    ret = BMI_sockio_nbpeek(tcp_addr_data->socket,
3142                            new_header.enc_hdr, TCP_ENC_HDR_SIZE);
3143    if (ret < 0)
3144    {
3145        tcp_forget_addr(map, 0, bmi_tcp_errno_to_pvfs(-WSAGetLastError()));
3146        return (0);
3147    }
3148
3149    if (ret == 0)
3150    {
3151        gossip_debug(
3152            GOSSIP_BMI_DEBUG_TCP, "Warning: bmi_tcp unable "
3153            "to recv any data reported by poll(). [2]\n");
3154
3155        if (tcp_addr_data->zero_read_limit++ ==
3156            BMI_TCP_ZERO_READ_LIMIT)
3157        {
3158            gossip_debug(GOSSIP_BMI_DEBUG_TCP,
3159                         "...dropping connection.\n");
3160            tcp_forget_addr(map, 0, bmi_tcp_errno_to_pvfs(-EPIPE));
3161        }
3162        return(0);
3163    }
3164    else
3165    {
3166        tcp_addr_data->zero_read_limit = 0;
3167    }
3168
3169    if (ret < TCP_ENC_HDR_SIZE)
3170    {
3171        current_time = time(NULL);
3172        if(!tcp_addr_data->short_header_timer)
3173        {
3174            tcp_addr_data->short_header_timer = current_time;
3175        }
3176        else if((current_time - tcp_addr_data->short_header_timer) >
3177            BMI_TCP_HEADER_WAIT_SECONDS)
3178        {
3179            gossip_err("Error: incomplete BMI TCP header after %d seconds, closing connection.\n",
3180                BMI_TCP_HEADER_WAIT_SECONDS);
3181            tcp_forget_addr(map, 0, bmi_tcp_errno_to_pvfs(-EPIPE));
3182            return (0);
3183        }
3184
3185        /* header not ready yet, but we will keep hoping */
3186        return (0);
3187    }
3188
3189    tcp_addr_data->short_header_timer = 0;
3190    *stall_flag = 0;
3191    gossip_ldebug(GOSSIP_BMI_DEBUG_TCP, "Reading header for new op.\n");
3192    ret = BMI_sockio_nbrecv(tcp_addr_data->socket,
3193                           new_header.enc_hdr, TCP_ENC_HDR_SIZE);
3194    if (ret < TCP_ENC_HDR_SIZE)
3195    {
3196        tmp_errno = WSAGetLastError();
3197        gossip_err("Error: BMI_sockio_nbrecv: %d\n", tmp_errno);
3198        tcp_forget_addr(map, 0, bmi_tcp_errno_to_pvfs(-tmp_errno));
3199        return (0);
3200    }
3201
3202    /* decode the header */
3203    BMI_TCP_DEC_HDR(new_header);
3204
3205    /* so we have the header. now what?  These are the possible
3206     * scenarios:
3207     * a) unexpected message
3208     * b) eager message for which a recv has been posted
3209     * c) eager message for which a recv has not been posted
3210     * d) rendezvous messsage for which a recv has been posted
3211     * e) rendezvous messsage for which a recv has not been posted
3212     * f) eager message for which a rend. recv has been posted
3213     */
3214
3215    /* check magic number of message */
3216    if(new_header.magic_nr != BMI_MAGIC_NR)
3217    {
3218        gossip_err("Error: bad magic in BMI TCP message.\n");
3219        tcp_forget_addr(map, 0, bmi_tcp_errno_to_pvfs(-EBADMSG));
3220        return(0);
3221    }
3222
3223    gossip_ldebug(GOSSIP_BMI_DEBUG_TCP, "Received new message; mode: %d.\n",
3224                  (int) new_header.mode);
3225    gossip_ldebug(GOSSIP_BMI_DEBUG_TCP, "tag: %d\n", (int) new_header.tag);
3226
3227    if (new_header.mode == TCP_MODE_UNEXP)
3228    {
3229        /* allocate the operation structure */
3230        active_method_op = alloc_tcp_method_op();
3231        if (!active_method_op)
3232        {
3233            tcp_forget_addr(map, 0, bmi_tcp_errno_to_pvfs(-ENOMEM));
3234            return (bmi_tcp_errno_to_pvfs(-ENOMEM));
3235        }
3236        /* create data buffer */
3237        new_buffer = malloc(new_header.size);
3238        if (!new_buffer)
3239        {
3240            dealloc_tcp_method_op(active_method_op);
3241            tcp_forget_addr(map, 0, bmi_tcp_errno_to_pvfs(-ENOMEM));
3242            return (bmi_tcp_errno_to_pvfs(-ENOMEM));
3243        }
3244
3245        /* set the fields */
3246        active_method_op->send_recv = BMI_RECV;
3247        active_method_op->addr = map;
3248        active_method_op->actual_size = new_header.size;
3249        active_method_op->expected_size = 0;
3250        active_method_op->amt_complete = 0;
3251        active_method_op->env_amt_complete = TCP_ENC_HDR_SIZE;
3252        active_method_op->msg_tag = new_header.tag;
3253        active_method_op->buffer = new_buffer;
3254        active_method_op->mode = TCP_MODE_UNEXP;
3255        active_method_op->buffer_list = &(active_method_op->buffer);
3256        active_method_op->size_list = &(active_method_op->actual_size);
3257        active_method_op->list_count = 1;
3258        tcp_op_data = (struct tcp_op *) active_method_op->method_data;
3259        tcp_op_data->tcp_op_state = BMI_TCP_INPROGRESS;
3260        tcp_op_data->env = new_header;
3261
3262        op_list_add(op_list_array[IND_RECV_INFLIGHT], active_method_op);
3263        /* grab some data if we can */
3264        return (work_on_recv_op(active_method_op, &tmp));
3265    }
3266
3267    memset(&key, 0, sizeof(struct op_list_search_key));
3268    key.method_addr = map;
3269    key.method_addr_yes = 1;
3270    key.msg_tag = new_header.tag;
3271    key.msg_tag_yes = 1;
3272
3273    /* look for a match within the posted operations */
3274    active_method_op = op_list_search(op_list_array[IND_RECV], &key);
3275
3276    if (active_method_op)
3277    {
3278        /* make sure it isn't too big */
3279        if (new_header.size > active_method_op->expected_size)
3280        {
3281            gossip_err("Error: message ordering violation;\n");
3282            gossip_err("Error: message too large for next buffer.\n");
3283            gossip_err("Error: incoming size: %ld, expected size: %ld\n",
3284                        (long) new_header.size,
3285                        (long) active_method_op->expected_size);
3286            /* TODO: return error here or do something else? */
3287            return (bmi_tcp_errno_to_pvfs(-EPROTO));
3288        }
3289
3290        /* we found a match.  go work on it and return */
3291        op_list_remove(active_method_op);
3292        active_method_op->env_amt_complete = TCP_ENC_HDR_SIZE;
3293        active_method_op->actual_size = new_header.size;
3294        op_list_add(op_list_array[IND_RECV_INFLIGHT], active_method_op);
3295        return (work_on_recv_op(active_method_op, &tmp));
3296    }
3297
3298    /* no match anywhere.  Start a new operation */
3299    /* allocate the operation structure */
3300    active_method_op = alloc_tcp_method_op();
3301    if (!active_method_op)
3302    {
3303        tcp_forget_addr(map, 0, bmi_tcp_errno_to_pvfs(-ENOMEM));
3304        return (bmi_tcp_errno_to_pvfs(-ENOMEM));
3305    }
3306
3307    if (new_header.mode == TCP_MODE_EAGER)
3308    {
3309        /* create data buffer for eager messages */
3310        new_buffer = malloc(new_header.size);
3311        if (!new_buffer)
3312        {
3313            dealloc_tcp_method_op(active_method_op);
3314            tcp_forget_addr(map, 0, bmi_tcp_errno_to_pvfs(-ENOMEM));
3315            return (bmi_tcp_errno_to_pvfs(-ENOMEM));
3316        }
3317    }
3318    else
3319    {
3320        new_buffer = NULL;
3321    }
3322
3323    /* set the fields */
3324    active_method_op->send_recv = BMI_RECV;
3325    active_method_op->addr = map;
3326    active_method_op->actual_size = new_header.size;
3327    active_method_op->expected_size = 0;
3328    active_method_op->amt_complete = 0;
3329    active_method_op->env_amt_complete = TCP_ENC_HDR_SIZE;
3330    active_method_op->msg_tag = new_header.tag;
3331    active_method_op->buffer = new_buffer;
3332    active_method_op->mode = new_header.mode;
3333    active_method_op->buffer_list = &(active_method_op->buffer);
3334    active_method_op->size_list = &(active_method_op->actual_size);
3335    active_method_op->list_count = 1;
3336    tcp_op_data = (struct tcp_op *) active_method_op->method_data;
3337    tcp_op_data->tcp_op_state = BMI_TCP_BUFFERING;
3338    tcp_op_data->env = new_header;
3339
3340    op_list_add(op_list_array[IND_RECV_INFLIGHT], active_method_op);
3341
3342    /* grab some data if we can */
3343    if (new_header.mode == TCP_MODE_EAGER)
3344    {
3345        return (work_on_recv_op(active_method_op, &tmp));
3346    }
3347
3348    return (0);
3349}
3350
3351
3352/*
3353 * work_on_send_op()
3354 *
3355 * used to perform work on a send operation.  this is called by the poll
3356 * function.
3357 *
3358 * sets blocked_flag if no more work can be done on socket without
3359 * blocking
3360 * returns 0 on success, -errno on failure.
3361 */
3362static int work_on_send_op(method_op_p my_method_op,
3363                           int *blocked_flag, int* stall_flag)
3364{
3365    int ret = -1;
3366    struct tcp_addr *tcp_addr_data = (struct tcp_addr *) my_method_op->addr->method_data;
3367    struct tcp_op *tcp_op_data = (struct tcp_op *) my_method_op->method_data;
3368
3369    *blocked_flag = 1;
3370    *stall_flag = 0;
3371
3372    /* make sure that the connection is done before we continue */
3373    if (tcp_addr_data->not_connected)
3374    {
3375        ret = tcp_sock_init(my_method_op->addr);
3376        if (ret < 0)
3377        {
3378            PVFS_perror_gossip("Error: socket failed to init", ret);
3379            /* tcp_sock_init() returns BMI error code */
3380            tcp_forget_addr(my_method_op->addr, 0, ret);
3381            return (0);
3382        }
3383        if (tcp_addr_data->not_connected)
3384        {
3385            /* try again later- still could not connect */
3386            tcp_op_data->tcp_op_state = BMI_TCP_INPROGRESS;
3387            return (0);
3388        }
3389    }
3390
3391    ret = payload_progress(tcp_addr_data->socket,
3392        my_method_op->buffer_list,
3393        my_method_op->size_list,
3394        my_method_op->list_count,
3395        my_method_op->actual_size,
3396        &(my_method_op->list_index),
3397        &(my_method_op->cur_index_complete),
3398        BMI_SEND,
3399        tcp_op_data->env.enc_hdr,
3400        &my_method_op->env_amt_complete);
3401    if (ret < 0)
3402    {
3403        PVFS_perror_gossip("Error: payload_progress", ret);
3404        /* payload_progress() returns BMI error codes */
3405        tcp_forget_addr(my_method_op->addr, 0, ret);
3406        return (0);
3407    }
3408
3409    if(ret == 0)
3410        *stall_flag = 1;
3411
3412    gossip_ldebug(GOSSIP_BMI_DEBUG_TCP, "Sent: %d bytes of data.\n", ret);
3413    my_method_op->amt_complete += ret;
3414    assert(my_method_op->amt_complete <= my_method_op->actual_size);
3415
3416    if (my_method_op->amt_complete == my_method_op->actual_size && my_method_op->env_amt_complete == TCP_ENC_HDR_SIZE)
3417    {
3418        /* we are done */
3419        my_method_op->error_code = 0;
3420        BMI_socket_collection_remove_write_bit(tcp_socket_collection_p,
3421                                           my_method_op->addr);
3422        op_list_remove(my_method_op);
3423        ((struct tcp_op*)(my_method_op->method_data))->tcp_op_state =
3424            BMI_TCP_COMPLETE;
3425        op_list_add(completion_array[my_method_op->context_id], my_method_op);
3426        *blocked_flag = 0;
3427    }
3428    else
3429    {
3430        /* there is still more work to do */
3431        tcp_op_data->tcp_op_state = BMI_TCP_INPROGRESS;
3432    }
3433
3434    return (0);
3435}
3436
3437
3438/*
3439 * work_on_recv_op()
3440 *
3441 * used to perform work on a recv operation.  this is called by the poll
3442 * function.
3443 * NOTE: this function assumes the method header has already been read.
3444 *
3445 * returns 0 on success, -errno on failure.
3446 */
3447static int work_on_recv_op(method_op_p my_method_op, int* stall_flag)
3448{
3449
3450    int ret = -1;
3451    struct tcp_addr *tcp_addr_data = (struct tcp_addr *) my_method_op->addr->method_data;
3452    struct tcp_op *tcp_op_data = (struct tcp_op *) my_method_op->method_data;
3453
3454    *stall_flag = 1;
3455
3456    if (my_method_op->actual_size != 0)
3457    {
3458        /* now let's try to recv some actual data */
3459        ret = payload_progress(tcp_addr_data->socket,
3460            my_method_op->buffer_list,
3461            my_method_op->size_list,
3462            my_method_op->list_count,
3463            my_method_op->actual_size,
3464            &(my_method_op->list_index),
3465            &(my_method_op->cur_index_complete),
3466            BMI_RECV,
3467            NULL,
3468            0);
3469        if (ret < 0)
3470        {
3471            PVFS_perror_gossip("Error: payload_progress", ret);
3472            /* payload_progress() returns BMI error codes */
3473            tcp_forget_addr(my_method_op->addr, 0, ret);
3474            return (0);
3475        }
3476    }
3477    else
3478    {
3479        ret = 0;
3480    }
3481
3482    if(ret > 0)
3483        *stall_flag = 0;
3484
3485    my_method_op->amt_complete += ret;
3486    assert(my_method_op->amt_complete <= my_method_op->actual_size);
3487
3488    if (my_method_op->amt_complete == my_method_op->actual_size)
3489    {
3490        /* we are done */
3491        op_list_remove(my_method_op);
3492        if (tcp_op_data->tcp_op_state == BMI_TCP_BUFFERING)
3493        {
3494            /* queue up to wait on matching post recv */
3495            op_list_add(op_list_array[IND_RECV_EAGER_DONE_BUFFERING],
3496                        my_method_op);
3497        }
3498        else
3499        {
3500            my_method_op->error_code = 0;
3501            if (my_method_op->mode == TCP_MODE_UNEXP)
3502            {
3503                op_list_add(op_list_array[IND_COMPLETE_RECV_UNEXP],
3504                            my_method_op);
3505            }
3506            else
3507            {
3508                ((struct tcp_op*)(my_method_op->method_data))->tcp_op_state =
3509                    BMI_TCP_COMPLETE;
3510                op_list_add(completion_array[my_method_op->context_id], my_method_op);
3511            }
3512        }
3513    }
3514
3515    return (0);
3516}
3517
3518
3519/* tcp_do_work_error()
3520 *
3521 * handles a tcp address that has indicated an error during polling.
3522 *
3523 * returns 0 on success, -errno on failure
3524 */
3525static int tcp_do_work_error(bmi_method_addr_p map)
3526{
3527    struct tcp_addr *tcp_addr_data = NULL;
3528    int buf;
3529    int ret;
3530    int tmp_errno;
3531
3532    tcp_addr_data = (struct tcp_addr *) map->method_data;
3533
3534    /* perform a read on the socket so that we can get a real errno */
3535    ret = recv(tcp_addr_data->socket, &buf, sizeof(int), 0);
3536    if (ret == 0)
3537        tmp_errno = EPIPE;  /* report other side closed socket with this */
3538    else
3539        tmp_errno = WSAGetLastError();
3540
3541    gossip_debug(GOSSIP_BMI_DEBUG_TCP, "Error: bmi_tcp: %d\n",
3542      tmp_errno);
3543
3544    if (tcp_addr_data->server_port)
3545    {
3546        /* Ignore this and hope it goes away... we don't want to lose
3547         * our local socket */
3548        dealloc_tcp_method_addr(map);
3549        gossip_lerr("Warning: error polling on server socket, continuing.\n");
3550        return (0);
3551    }
3552
3553    if(tmp_errno == 0)
3554        tmp_errno = EPROTO;
3555
3556    tcp_forget_addr(map, 0, bmi_tcp_errno_to_pvfs(-tmp_errno));
3557
3558    return (0);
3559}
3560
3561#if defined(USE_TRUSTED) && defined(__PVFS2_CLIENT__)
3562/*
3563 * tcp_enable_trusted()
3564 * Ideally, this function should look up the security configuration of
3565 * the server and determines
3566 * if it needs to bind to any specific port locally or not..
3567 * For now look at the FIXME below.
3568 */
3569static int tcp_enable_trusted(struct tcp_addr *tcp_addr_data)
3570{
3571    /*
3572     * FIXME:
3573     * For now, there is no way for us to check if a given
3574     * server is actually using port protection or not.
3575     * For now we unconditionally use a trusted port range
3576     * as long as USE_TRUSTED is #defined.
3577     *
3578     * Although most of the time we expect users
3579     * to be using a range of 0-1024, it is hard to keep probing
3580     * until one gets a port in the range specified.
3581     * Hence this is a temporary fix. we will see if this
3582     * requirement even needs to be met at all.
3583     */
3584    static unsigned short my_requested_port = 1023;
3585    unsigned short my_local_port = 0;
3586    struct sockaddr_in my_local_sockaddr;
3587    socklen_t len = sizeof(struct sockaddr_in);
3588    memset(&my_local_sockaddr, 0, sizeof(struct sockaddr_in));
3589
3590    /* setup for a fast restart to avoid bind addr in use errors */
3591    if (BMI_sockio_set_sockopt(tcp_addr_data->socket, SO_REUSEADDR, 1) < 0)
3592    {
3593        gossip_lerr("Could not set SO_REUSEADDR on local socket (port %hd)\n", my_local_port);
3594    }
3595    if (BMI_sockio_bind_sock(tcp_addr_data->socket, my_requested_port) < 0)
3596    {
3597        gossip_lerr("Could not bind to local port %hd: %s\n",
3598                my_requested_port, strerror(errno));
3599    }
3600    else {
3601        my_requested_port--;
3602    }
3603    my_local_sockaddr.sin_family = AF_INET;
3604    if (getsockname(tcp_addr_data->socket,
3605                (struct sockaddr *)&my_local_sockaddr, &len) == 0)
3606    {
3607        my_local_port = ntohs(my_local_sockaddr.sin_port);
3608    }
3609    gossip_debug(GOSSIP_BMI_DEBUG_TCP, "Bound locally to port: %hd\n", my_local_port);
3610    return 0;
3611}
3612
3613#endif
3614
3615#if defined(USE_TRUSTED) && defined(__PVFS2_SERVER__)
3616
3617static char *bad_errors[] = {
3618    "invalid network address",
3619    "invalid port",
3620    "invalid network address and port"
3621};
3622
3623/*
3624 * tcp_allow_trusted()
3625 * if trusted ports was enabled make sure
3626 * that we can accept a particular connection from a given
3627 * client
3628 */
3629static int tcp_allow_trusted(struct sockaddr_in *peer_sockaddr)
3630{
3631    char *peer_hostname = inet_ntoa(peer_sockaddr->sin_addr);
3632    unsigned short peer_port = ntohs(peer_sockaddr->sin_port);
3633    int   i, what_failed   = -1;
3634
3635    /* Don't refuse connects if there were any
3636     * parse errors or if it is not enabled in the config file
3637     */
3638    if (gtcp_allowed_connection->port_enforce == 0
3639            && gtcp_allowed_connection->network_enforce == 0)
3640    {
3641        return 0;
3642    }
3643    /* make sure that the client is within the allowed network */
3644    if (gtcp_allowed_connection->network_enforce == 1)
3645    {
3646        /* Always allow localhost to connect */
3647        if (ntohl(peer_sockaddr->sin_addr.s_addr) == INADDR_LOOPBACK)
3648        {
3649            goto port_check;
3650        }
3651        for (i = 0; i < gtcp_allowed_connection->network_count; i++)
3652        {
3653            /* check with all the masks */
3654            if ((peer_sockaddr->sin_addr.s_addr & gtcp_allowed_connection->netmask[i].s_addr)
3655                    != (gtcp_allowed_connection->network[i].s_addr & gtcp_allowed_connection->netmask[i].s_addr ))
3656            {
3657                continue;
3658            }
3659            else {
3660                goto port_check;
3661            }
3662        }
3663        /* not from a trusted network */
3664        what_failed = 0;
3665    }
3666port_check:
3667    /* make sure that the client port numbers are within specified limits */
3668    if (gtcp_allowed_connection->port_enforce == 1)
3669    {
3670        if (peer_port < gtcp_allowed_connection->ports[0]
3671                || peer_port > gtcp_allowed_connection->ports[1])
3672        {
3673            what_failed = (what_failed < 0) ? 1 : 2;
3674        }
3675    }
3676    /* okay, we are good to go */
3677    if (what_failed < 0)
3678    {
3679        return 0;
3680    }
3681    /* no good */
3682    gossip_err("Rejecting client %s on port %d: %s\n",
3683           peer_hostname, peer_port, bad_errors[what_failed]);
3684    return -1;
3685}
3686
3687#endif
3688
3689/*
3690 * tcp_accept_init()
3691 *
3692 * used to establish a connection from the server side.  Attempts an
3693 * accept call and provides the socket if it succeeds.
3694 *
3695 * returns 0 on success, -errno on failure.
3696 */
3697static int tcp_accept_init(int *socket, char** peer)
3698{
3699
3700    int ret = -1;
3701    int tmp_errno = 0;
3702    struct tcp_addr *tcp_addr_data = (struct tcp_addr *) tcp_method_params.listen_addr->method_data;
3703    int oldfl = 0;
3704    struct sockaddr_in peer_sockaddr;
3705    int peer_sockaddr_size = sizeof(struct sockaddr_in);
3706    char* tmp_peer;
3707
3708    /* do we have a socket on this end yet? */
3709    if (tcp_addr_data->socket < 0)
3710    {
3711        ret = tcp_server_init();
3712        if (ret < 0)
3713        {
3714            return (ret);
3715        }
3716    }
3717
3718    *socket = accept(tcp_addr_data->socket, (struct sockaddr*)&peer_sockaddr,
3719              (int *)&peer_sockaddr_size);
3720
3721    if (*socket < 0)
3722    {
3723        tmp_errno = WSAGetLastError();
3724        if ((tmp_errno == WSATRY_AGAIN) ||
3725            (tmp_errno == WSAEWOULDBLOCK) ||
3726            (tmp_errno == WSAENETDOWN) ||
3727            /* (tmp_errno == EPROTO) || */
3728            (tmp_errno == WSAENOPROTOOPT) ||
3729            /* (tmp_errno == EHOSTDOWN) || */
3730            /* (tmp_errno == ENONET) || */
3731            (tmp_errno == WSAEHOSTUNREACH) ||
3732            (tmp_errno == WSAEOPNOTSUPP) ||
3733            (tmp_errno == WSAENETUNREACH) ||
3734            /* (tmp_errno == WSAENFILE) || */
3735            (tmp_errno == WSAEMFILE))
3736        {
3737            /* try again later */
3738            if (tmp_errno == EMFILE)
3739            {
3740                gossip_err("Error: accept: %d (continuing)\n", tmp_errno);
3741                bmi_method_addr_drop_callback(BMI_tcp_method_name);
3742            }
3743            return (0);
3744        }
3745        else
3746        {
3747            gossip_err("Error: accept: %d\n", tmp_errno);
3748            return (bmi_tcp_errno_to_pvfs(-tmp_errno));
3749        }
3750    }
3751
3752#if defined(USE_TRUSTED) && defined(__PVFS2_SERVER__)
3753
3754    /* make sure that we are allowed to accept this connection */
3755    if (tcp_allow_trusted(&peer_sockaddr) < 0)
3756    {
3757        /* Force closure of the connection */
3758        close(*socket);
3759        return (bmi_tcp_errno_to_pvfs(-EACCES));
3760    }
3761
3762#endif
3763
3764    /* we accepted a new connection.  turn off Nagle's algorithm. */
3765    if (BMI_sockio_set_tcpopt(*socket, TCP_NODELAY, 1) < 0)
3766    {
3767        tmp_errno = WSAGetLastError();
3768        gossip_lerr("Error: failed to set TCP_NODELAY option.\n");
3769        closesocket(*socket);
3770        return (bmi_tcp_errno_to_pvfs(-tmp_errno));
3771    }
3772
3773    /* set it to non-blocking operation */
3774    /*oldfl = fcntl(*socket, F_GETFL, 0);
3775    if (!(oldfl & O_NONBLOCK))
3776    {
3777        fcntl(*socket, F_SETFL, oldfl | O_NONBLOCK);
3778    }*/
3779    SET_NONBLOCK(*socket);
3780
3781    /* allocate ip address string */
3782    tmp_peer = inet_ntoa(peer_sockaddr.sin_addr);
3783    *peer = (char*)malloc(strlen(tmp_peer)+1);
3784    if(!(*peer))
3785    {
3786        closesocket(*socket);
3787        return(bmi_tcp_errno_to_pvfs(-BMI_ENOMEM));
3788    }
3789    strcpy(*peer, tmp_peer);
3790
3791    return (0);
3792}
3793
3794
3795/* alloc_tcp_method_op()
3796 *
3797 * creates a new method op with defaults filled in for tcp.
3798 *
3799 * returns pointer to structure on success, NULL on failure
3800 */
3801static method_op_p alloc_tcp_method_op(void)
3802{
3803    method_op_p my_method_op = NULL;
3804
3805    my_method_op = bmi_alloc_method_op(sizeof(struct tcp_op));
3806
3807    /* we trust alloc_method_op to zero it out */
3808
3809    return (my_method_op);
3810}
3811
3812
3813/* dealloc_tcp_method_op()
3814 *
3815 * destroys an existing tcp method op, freeing segment lists if
3816 * needed
3817 *
3818 * no return value
3819 */
3820static void dealloc_tcp_method_op(method_op_p old_op)
3821{
3822    bmi_dealloc_method_op(old_op);
3823    return;
3824}
3825
3826/* tcp_post_send_generic()
3827 *
3828 * Submits send operations (low level).
3829 *
3830 * returns 0 on success that requires later poll, returns 1 on instant
3831 * completion, -errno on failure
3832 */
3833static int tcp_post_send_generic(bmi_op_id_t * id,
3834                                 bmi_method_addr_p dest,
3835                                 const void *const *buffer_list,
3836                                 const bmi_size_t *size_list,
3837                                 int list_count,
3838                                 enum bmi_buffer_type buffer_type,
3839                                 struct tcp_msg_header my_header,
3840                                 void *user_ptr,
3841                                 bmi_context_id context_id,
3842                                 PVFS_hint hints)
3843{
3844    struct tcp_addr *tcp_addr_data = (struct tcp_addr *) dest->method_data;
3845    method_op_p query_op = NULL;
3846    int ret = -1;
3847    bmi_size_t total_size = 0;
3848    bmi_size_t amt_complete = 0;
3849    bmi_size_t env_amt_complete = 0;
3850    struct op_list_search_key key;
3851    int list_index = 0;
3852    bmi_size_t cur_index_complete = 0;
3853    PINT_event_id eid = 0;
3854
3855    if(PINT_EVENT_ENABLED)
3856    {
3857        int i = 0;
3858        for(; i < list_count; ++i)
3859        {
3860            total_size += size_list[i];
3861        }
3862    }
3863
3864    PINT_EVENT_START(
3865        bmi_tcp_send_event_id, bmi_tcp_pid, NULL, &eid,
3866        PINT_HINT_GET_CLIENT_ID(hints),
3867        PINT_HINT_GET_REQUEST_ID(hints),
3868        PINT_HINT_GET_RANK(hints),
3869        PINT_HINT_GET_HANDLE(hints),
3870        PINT_HINT_GET_OP_ID(hints),
3871        total_size);
3872
3873    /* Three things can happen here:
3874     * a) another op is already in queue for the address, so we just
3875     * queue up
3876     * b) we can send the whole message and return
3877     * c) we send part of the message and queue the rest
3878     */
3879
3880    /* NOTE: on the post_send side of an operation, it doesn't really
3881     * matter whether the op is going to be eager or rendezvous.  It is
3882     * handled the same way (except for how the header is filled in).
3883     * The difference is in the recv processing for TCP.
3884     */
3885
3886    /* NOTE: we also don't care what the buffer_type says, TCP could care
3887     * less what buffers it is using.
3888     */
3889
3890    /* encode the message header */
3891    BMI_TCP_ENC_HDR(my_header);
3892
3893    /* the first thing we must do is find out if another send is queued
3894     * up for this address so that we don't mess up our ordering.    */
3895    memset(&key, 0, sizeof(struct op_list_search_key));
3896    key.method_addr = dest;
3897    key.method_addr_yes = 1;
3898    query_op = op_list_search(op_list_array[IND_SEND], &key);
3899    if (query_op)
3900    {
3901        /* queue up operation */
3902        ret = enqueue_operation(op_list_array[IND_SEND], BMI_SEND,
3903                                dest, (void **) buffer_list,
3904                                size_list, list_count, 0, 0,
3905                                id, BMI_TCP_INPROGRESS, my_header, user_ptr,
3906                                my_header.size, 0,
3907                                context_id,
3908                                eid);
3909
3910        /* TODO: is this causing deadlocks?  See similar call in recv
3911         * path for another example.  This particular one seems to be an
3912         * issue under a heavy bonnie++ load that Neill has been
3913         * debugging.  Comment out for now to see if the problem goes
3914         * away.
3915         */
3916#if 0
3917        if (ret >= 0)
3918        {
3919            /* go ahead and try to do some work while we are in this
3920             * function since we appear to be backlogged.  Make sure that
3921             * we do not wait in the poll, however.
3922             */
3923            ret = tcp_do_work(0);
3924        }
3925#endif
3926        if (ret < 0)
3927        {
3928            gossip_err("Error: enqueue_operation() or tcp_do_work() returned: %d\n", ret);
3929        }
3930        return (ret);
3931    }
3932
3933    /* make sure the connection is established */
3934    ret = tcp_sock_init(dest);
3935    if (ret < 0)
3936    {
3937        gossip_debug(GOSSIP_BMI_DEBUG_TCP, "tcp_sock_init() failure.\n");
3938        /* tcp_sock_init() returns BMI error code */
3939        tcp_forget_addr(dest, 0, ret);
3940        PINT_EVENT_END(bmi_tcp_send_event_id, bmi_tcp_pid, NULL, 0, ret);
3941        return (ret);
3942    }
3943
3944    tcp_addr_data = (struct tcp_addr *) dest->method_data;
3945
3946#if 0
3947    /* TODO: this is a hack for testing! */
3948    /* disables immediate send completion... */
3949    ret = enqueue_operation(op_list_array[IND_SEND], BMI_SEND,
3950                            dest, buffer_list, size_list, list_count, 0, 0,
3951                            id, BMI_TCP_INPROGRESS, my_header, user_ptr,
3952                            my_header.size, 0,
3953                            context_id);
3954    return(ret);
3955#endif
3956
3957    if (tcp_addr_data->not_connected)
3958    {
3959        /* if the connection is not completed, queue up for later work */
3960        ret = enqueue_operation(op_list_array[IND_SEND], BMI_SEND,
3961                                dest, (void **) buffer_list, size_list,
3962                                list_count, 0, 0,
3963                                id, BMI_TCP_INPROGRESS, my_header, user_ptr,
3964                                my_header.size, 0,
3965                                context_id,
3966                                eid);
3967        if(ret < 0)
3968        {
3969            gossip_err("Error: enqueue_operation() returned: %d\n", ret);
3970        }
3971        return (ret);
3972    }
3973
3974    /* try to send some data */
3975    env_amt_complete = 0;
3976    ret = payload_progress(tcp_addr_data->socket,
3977        (void **) buffer_list,
3978        size_list, list_count, my_header.size, &list_index,
3979        &cur_index_complete, BMI_SEND, my_header.enc_hdr, &env_amt_complete);
3980    if (ret < 0)
3981    {
3982        PVFS_perror_gossip("Error: payload_progress", ret);
3983        /* payload_progress() returns BMI error codes */
3984        tcp_forget_addr(dest, 0, ret);
3985        PINT_EVENT_END(bmi_tcp_send_event_id, bmi_tcp_pid, NULL, eid, 0, ret);
3986        return (ret);
3987    }
3988
3989    gossip_ldebug(GOSSIP_BMI_DEBUG_TCP, "Sent: %d bytes of data.\n", ret);
3990    amt_complete = ret;
3991    assert(amt_complete <= my_header.size);
3992    if (amt_complete == my_header.size && env_amt_complete == TCP_ENC_HDR_SIZE)
3993    {
3994        /* we are already done */
3995        PINT_EVENT_END(bmi_tcp_send_event_id, bmi_tcp_pid,
3996                       NULL, eid, 0, amt_complete);
3997        return (1);
3998    }
3999
4000    /* queue up the remainder */
4001    ret = enqueue_operation(op_list_array[IND_SEND], BMI_SEND,
4002                            dest, (void **) buffer_list,
4003                            size_list, list_count,
4004                            amt_complete, env_amt_complete, id,
4005                            BMI_TCP_INPROGRESS, my_header, user_ptr,
4006                            my_header.size, 0, context_id, eid);
4007
4008    if(ret < 0)
4009    {
4010        gossip_err("Error: enqueue_operation() returned: %d\n", ret);
4011    }
4012    return (ret);
4013}
4014
4015
4016/* payload_progress()
4017 *
4018 * makes progress on sending/recving data payload portion of a message
4019 *
4020 * returns amount completed on success, -errno on failure
4021 */
4022static int payload_progress(int s, void *const *buffer_list, const bmi_size_t*
4023    size_list, int list_count, bmi_size_t total_size, int* list_index,
4024    bmi_size_t* current_index_complete, enum bmi_op_type send_recv,
4025    char* enc_hdr, bmi_size_t* env_amt_complete)
4026{
4027    int i;
4028    int count = 0;
4029    int ret;
4030    int completed;
4031    /* used for finding the stopping point on short receives */
4032    int final_index = list_count-1;
4033    bmi_size_t final_size = size_list[list_count-1];
4034    bmi_size_t sum = 0;
4035    int vector_index = 0;
4036    int header_flag = 0;
4037    int tmp_env_done = 0;
4038
4039    if(send_recv == BMI_RECV)
4040    {
4041        /* find out if we should stop short in list processing */
4042        for(i=0; i<list_count; i++)
4043        {
4044            sum += size_list[i];
4045            if(sum >= total_size)
4046            {
4047                final_index = i;
4048                final_size = size_list[i] - (sum-total_size);
4049                break;
4050            }
4051        }
4052    }
4053
4054    assert(list_count > *list_index);
4055
4056    /* make sure we don't overrun our preallocated iovec array */
4057    if((list_count - (*list_index)) > BMI_TCP_IOV_COUNT)
4058    {
4059        list_count = (*list_index) + BMI_TCP_IOV_COUNT;
4060    }
4061
4062    /* do we need to send any of the header? */
4063    if(send_recv == BMI_SEND && *env_amt_complete < TCP_ENC_HDR_SIZE)
4064    {
4065        stat_io_vector[vector_index].buf = &enc_hdr[*env_amt_complete];
4066        stat_io_vector[vector_index].len = TCP_ENC_HDR_SIZE - *env_amt_complete;
4067        count++;
4068        vector_index++;
4069        header_flag = 1;
4070    }
4071
4072    /* setup vector */
4073    stat_io_vector[vector_index].buf =
4074        (char*)buffer_list[*list_index] + *current_index_complete;
4075    count++;
4076    if(final_index == 0)
4077    {
4078        stat_io_vector[vector_index].len = final_size - *current_index_complete;
4079    }
4080    else
4081    {
4082        stat_io_vector[vector_index].len =
4083            size_list[*list_index] - *current_index_complete;
4084        for(i = (*list_index + 1); i < list_count; i++)
4085        {
4086            vector_index++;
4087            count++;
4088            stat_io_vector[vector_index].buf = (CHAR *) buffer_list[i];
4089            if(i == final_index)
4090            {
4091                stat_io_vector[vector_index].len = final_size;
4092                break;
4093            }
4094            else
4095            {
4096                stat_io_vector[vector_index].len = size_list[i];
4097            }
4098        }
4099    }
4100
4101    assert(count > 0);
4102
4103    if(send_recv == BMI_RECV)
4104    {
4105        ret = BMI_sockio_nbvector(s, stat_io_vector, count, 1);
4106    }
4107    else
4108    {
4109        ret = BMI_sockio_nbvector(s, stat_io_vector, count, 0);
4110    }
4111
4112    /* if error or nothing done, return now */
4113    if(ret == 0)
4114        return(0);
4115    if(ret <= 0)
4116        return(bmi_tcp_errno_to_pvfs(-WSAGetLastError()));
4117
4118    completed = ret;
4119    if(header_flag && (completed >= 0))
4120    {
4121        /* take care of completed header status */
4122        tmp_env_done = TCP_ENC_HDR_SIZE - *env_amt_complete;
4123        if(tmp_env_done > completed)
4124            tmp_env_done = completed;
4125        completed -= tmp_env_done;
4126        ret -= tmp_env_done;
4127        (*env_amt_complete) += tmp_env_done;
4128    }
4129
4130    i=header_flag;
4131    while(completed > 0)
4132    {
4133        /* take care of completed data payload */
4134        if(completed >= stat_io_vector[i].len)
4135        {
4136            completed -= stat_io_vector[i].len;
4137            *current_index_complete = 0;
4138            (*list_index)++;
4139            i++;
4140        }
4141        else
4142        {
4143            *current_index_complete += completed;
4144            completed = 0;
4145        }
4146    }
4147
4148    return(ret);
4149}
4150
4151static void bmi_set_sock_buffers(int socket){
4152        //Set socket buffer sizes:
4153    gossip_debug(GOSSIP_BMI_DEBUG_TCP, "Default socket buffers send:%d receive:%d\n",
4154                GET_SENDBUFSIZE(socket), GET_RECVBUFSIZE(socket));
4155    gossip_debug(GOSSIP_BMI_DEBUG_TCP, "Setting socket buffer size for send:%d receive:%d \n",
4156                tcp_buffer_size_send, tcp_buffer_size_receive);
4157    if( tcp_buffer_size_receive != 0)
4158         SET_RECVBUFSIZE(socket,tcp_buffer_size_receive);
4159    if( tcp_buffer_size_send != 0)
4160         SET_SENDBUFSIZE(socket,tcp_buffer_size_send);
4161    gossip_debug(GOSSIP_BMI_DEBUG_TCP, "Reread socket buffers send:%d receive:%d\n",
4162                GET_SENDBUFSIZE(socket), GET_RECVBUFSIZE(socket));
4163}
4164
4165/*
4166 * Local variables:
4167 *  c-indent-level: 4
4168 *  c-basic-offset: 4
4169 * End:
4170 *
4171 * vim: ts=8 sts=4 sw=4 expandtab
4172 */
Note: See TracBrowser for help on using the browser.