root/trunk/src/io/bmi/bmi_tcp/bmi-tcp.c @ 9187

Revision 9187, 116.8 KB (checked in by ligon, 16 months ago)

Applied same compiler warning changes from 2.8.5 to trunk

src/kernel/linux-2.6/dcache.c
src/io/bmi/bmi_tcp/bmi-tcp.c

Line 
1/*
2 * (C) 2001 Clemson University and The University of Chicago
3 *
4 * See COPYING in top-level directory.
5 */
6
7/* TCP/IP implementation of a BMI method */
8
9#include <errno.h>
10#include <string.h>
11#include <unistd.h>
12#include <fcntl.h>
13#include <sys/poll.h>
14#include <netinet/tcp.h>
15#include <assert.h>
16#include <sys/uio.h>
17#include <time.h>
18#include <sys/time.h>
19#include <sys/socket.h>
20#include <netinet/in.h>
21#include <arpa/inet.h>
22#include "pint-mem.h"
23
24#include "pvfs2-config.h"
25#ifdef HAVE_NETDB_H
26#include <netdb.h>
27#endif
28
29#include "bmi-method-support.h"
30#include "bmi-method-callback.h"
31#include "bmi-tcp-addressing.h"
32#ifdef __PVFS2_USE_EPOLL__
33#include "socket-collection-epoll.h"
34#else
35#include "socket-collection.h"
36#endif
37#include "op-list.h"
38#include "gossip.h"
39#include "sockio.h"
40#include "bmi-byteswap.h"
41#include "id-generator.h"
42#include "pint-event.h"
43#include "pvfs2-debug.h"
44#ifdef USE_TRUSTED
45#include "server-config.h"
46#include "bmi-tcp-addressing.h"
47#endif
48#include "gen-locks.h"
49#include "pint-hint.h"
50#include "pint-event.h"
51
52static gen_mutex_t interface_mutex = GEN_MUTEX_INITIALIZER;
53static gen_cond_t interface_cond = GEN_COND_INITIALIZER;
54static int sc_test_busy = 0;
55
56/* function prototypes */
57int BMI_tcp_initialize(bmi_method_addr_p listen_addr,
58                       int method_id,
59                       int init_flags);
60int BMI_tcp_finalize(void);
61int BMI_tcp_set_info(int option,
62                     void *inout_parameter);
63int BMI_tcp_get_info(int option,
64                     void *inout_parameter);
65void *BMI_tcp_memalloc(bmi_size_t size,
66                       enum bmi_op_type send_recv);
67int BMI_tcp_memfree(void *buffer,
68                    bmi_size_t size,
69                    enum bmi_op_type send_recv);
70int BMI_tcp_unexpected_free(void *buffer);
71int BMI_tcp_post_send(bmi_op_id_t * id,
72                      bmi_method_addr_p dest,
73                      const void *buffer,
74                      bmi_size_t size,
75                      enum bmi_buffer_type buffer_type,
76                      bmi_msg_tag_t tag,
77                      void *user_ptr,
78                      bmi_context_id context_id,
79                      PVFS_hint hints);
80int BMI_tcp_post_sendunexpected(bmi_op_id_t * id,
81                                bmi_method_addr_p dest,
82                                const void *buffer,
83                                bmi_size_t size,
84                                enum bmi_buffer_type buffer_type,
85                                bmi_msg_tag_t tag,
86                                void *user_ptr,
87                                bmi_context_id context_id,
88                                PVFS_hint hints);
89int BMI_tcp_post_recv(bmi_op_id_t * id,
90                      bmi_method_addr_p src,
91                      void *buffer,
92                      bmi_size_t expected_size,
93                      bmi_size_t * actual_size,
94                      enum bmi_buffer_type buffer_type,
95                      bmi_msg_tag_t tag,
96                      void *user_ptr,
97                      bmi_context_id context_id,
98                      PVFS_hint hints);
99int BMI_tcp_test(bmi_op_id_t id,
100                 int *outcount,
101                 bmi_error_code_t * error_code,
102                 bmi_size_t * actual_size,
103                 void **user_ptr,
104                 int max_idle_time_ms,
105                 bmi_context_id context_id);
106int BMI_tcp_testsome(int incount,
107                     bmi_op_id_t * id_array,
108                     int *outcount,
109                     int *index_array,
110                     bmi_error_code_t * error_code_array,
111                     bmi_size_t * actual_size_array,
112                     void **user_ptr_array,
113                     int max_idle_time_ms,
114                     bmi_context_id context_id);
115int BMI_tcp_testunexpected(int incount,
116                           int *outcount,
117                           struct bmi_method_unexpected_info *info,
118                           int max_idle_time_ms);
119int BMI_tcp_testcontext(int incount,
120                     bmi_op_id_t * out_id_array,
121                     int *outcount,
122                     bmi_error_code_t * error_code_array,
123                     bmi_size_t * actual_size_array,
124                     void **user_ptr_array,
125                     int max_idle_time_ms,
126                     bmi_context_id context_id);
127bmi_method_addr_p BMI_tcp_method_addr_lookup(const char *id_string);
128const char* BMI_tcp_addr_rev_lookup_unexpected(bmi_method_addr_p map);
129int BMI_tcp_query_addr_range(bmi_method_addr_p, const char *, int);
130int BMI_tcp_post_send_list(bmi_op_id_t * id,
131                           bmi_method_addr_p dest,
132                           const void *const *buffer_list,
133                           const bmi_size_t *size_list,
134                           int list_count,
135                           bmi_size_t total_size,
136                           enum bmi_buffer_type buffer_type,
137                           bmi_msg_tag_t tag,
138                           void *user_ptr,
139                           bmi_context_id context_id,
140                           PVFS_hint hints);
141int BMI_tcp_post_recv_list(bmi_op_id_t * id,
142                           bmi_method_addr_p src,
143                           void *const *buffer_list,
144                           const bmi_size_t *size_list,
145                           int list_count,
146                           bmi_size_t total_expected_size,
147                           bmi_size_t * total_actual_size,
148                           enum bmi_buffer_type buffer_type,
149                           bmi_msg_tag_t tag,
150                           void *user_ptr,
151                           bmi_context_id context_id,
152                           PVFS_hint hints);
153int BMI_tcp_post_sendunexpected_list(bmi_op_id_t * id,
154                                     bmi_method_addr_p dest,
155                                     const void *const *buffer_list,
156                                     const bmi_size_t *size_list,
157                                     int list_count,
158                                     bmi_size_t total_size,
159                                     enum bmi_buffer_type buffer_type,
160                                     bmi_msg_tag_t tag,
161                                     void *user_ptr,
162                                     bmi_context_id context_id,
163                                     PVFS_hint hints);
164int BMI_tcp_open_context(bmi_context_id context_id);
165void BMI_tcp_close_context(bmi_context_id context_id);
166int BMI_tcp_cancel(bmi_op_id_t id, bmi_context_id context_id);
167
168char BMI_tcp_method_name[] = "bmi_tcp";
169
170/* size of encoded message header */
171#define TCP_ENC_HDR_SIZE 24
172
173/* structure internal to tcp for use as a message header */
174struct tcp_msg_header
175{
176    uint32_t magic_nr;          /* magic number */
177    uint32_t mode;              /* eager, rendezvous, etc. */
178    bmi_msg_tag_t tag;          /* user specified message tag */
179    bmi_size_t size;            /* length of trailing message */
180    char enc_hdr[TCP_ENC_HDR_SIZE];  /* encoded version of header info */
181};
182
183#define BMI_TCP_ENC_HDR(hdr)                                            \
184    do {                                                                \
185        uint32_t *tmp32;                                                \
186        tmp32 = (uint32_t *)&(hdr).enc_hdr[0];                          \
187        *(tmp32) = htobmi32((hdr).magic_nr);                            \
188        *((uint32_t*)&((hdr).enc_hdr[4])) = htobmi32((hdr).mode);       \
189        *((uint64_t*)&((hdr).enc_hdr[8])) = htobmi64((hdr).tag);        \
190        *((uint64_t*)&((hdr).enc_hdr[16])) = htobmi64((hdr).size);      \
191    } while(0)                                             
192
193#define BMI_TCP_DEC_HDR(hdr)                                            \
194    do {                                                                \
195        uint32_t tmp32;                                                 \
196        memcpy(&tmp32,&(hdr).enc_hdr[0],sizeof(uint32_t));              \
197        (hdr).magic_nr = bmitoh32(tmp32);                               \
198        (hdr).mode = bmitoh32(*((uint32_t*)&((hdr).enc_hdr[4])));       \
199        (hdr).tag = bmitoh64(*((uint64_t*)&((hdr).enc_hdr[8])));        \
200        (hdr).size = bmitoh64(*((uint64_t*)&((hdr).enc_hdr[16])));      \
201    } while(0)                                             
202
203
204/* enumerate states that we care about */
205enum bmi_tcp_state
206{
207    BMI_TCP_INPROGRESS,
208    BMI_TCP_BUFFERING,
209    BMI_TCP_COMPLETE
210};
211
212/* tcp private portion of operation structure */
213struct tcp_op
214{
215    struct tcp_msg_header env;  /* envelope for this message */
216    enum bmi_tcp_state tcp_op_state;
217    /* these two fields are used as place holders for the buffer
218     * list and size list when we really don't have lists (regular
219     * BMI_send or BMI_recv operations); it allows us to use
220     * generic code to handle both cases
221     */
222    void *buffer_list_stub;
223    bmi_size_t size_list_stub;
224};
225
226/* static io vector for use with readv and writev; we can only use
227 * this because BMI serializes module calls
228 */
229#define BMI_TCP_IOV_COUNT 10
230static struct iovec stat_io_vector[BMI_TCP_IOV_COUNT+1];
231
232/* internal utility functions */
233static int tcp_server_init(void);
234static void dealloc_tcp_method_addr(bmi_method_addr_p map);
235static int tcp_sock_init(bmi_method_addr_p my_method_addr);
236static int enqueue_operation(op_list_p target_list,
237                             enum bmi_op_type send_recv,
238                             bmi_method_addr_p map,
239                             void *const *buffer_list,
240                             const bmi_size_t *size_list,
241                             int list_count,
242                             bmi_size_t amt_complete,
243                             bmi_size_t env_amt_complete,
244                             bmi_op_id_t * id,
245                             int tcp_op_state,
246                             struct tcp_msg_header header,
247                             void *user_ptr,
248                             bmi_size_t actual_size,
249                             bmi_size_t expected_size,
250                             bmi_context_id context_id,
251                             int32_t event_id);
252static int tcp_cleanse_addr(bmi_method_addr_p map, int error_code);
253static int tcp_shutdown_addr(bmi_method_addr_p map);
254static int tcp_do_work(int max_idle_time);
255static int tcp_do_work_error(bmi_method_addr_p map);
256static int tcp_do_work_recv(bmi_method_addr_p map, int* stall_flag);
257static int tcp_do_work_send(bmi_method_addr_p map, int* stall_flag);
258static int work_on_recv_op(method_op_p my_method_op,
259                           int *stall_flag);
260static int work_on_send_op(method_op_p my_method_op,
261                           int *blocked_flag, int* stall_flag);
262static int tcp_accept_init(int *socket, char** peer);
263static method_op_p alloc_tcp_method_op(void);
264static void dealloc_tcp_method_op(method_op_p old_op);
265static int handle_new_connection(bmi_method_addr_p map);
266static int tcp_post_send_generic(bmi_op_id_t * id,
267                                 bmi_method_addr_p dest,
268                                 const void *const *buffer_list,
269                                 const bmi_size_t *size_list,
270                                 int list_count,
271                                 enum bmi_buffer_type buffer_type,
272                                 struct tcp_msg_header my_header,
273                                 void *user_ptr,
274                                 bmi_context_id context_id,
275                                 PVFS_hint hints);
276static int tcp_post_recv_generic(bmi_op_id_t * id,
277                                 bmi_method_addr_p src,
278                                 void *const *buffer_list,
279                                 const bmi_size_t *size_list,
280                                 int list_count,
281                                 bmi_size_t expected_size,
282                                 bmi_size_t * actual_size,
283                                 enum bmi_buffer_type buffer_type,
284                                 bmi_msg_tag_t tag,
285                                 void *user_ptr,
286                                 bmi_context_id context_id,
287                                 PVFS_hint hints);
288static int payload_progress(int s, void *const *buffer_list, const bmi_size_t*
289    size_list, int list_count, bmi_size_t total_size, int* list_index,
290    bmi_size_t* current_index_complete, enum bmi_op_type send_recv,
291    char* enc_hdr, bmi_size_t* env_amt_complete);
292
293#if defined(USE_TRUSTED) && defined(__PVFS2_CLIENT__)
294static int tcp_enable_trusted(struct tcp_addr *tcp_addr_data);
295#endif
296#if defined(USE_TRUSTED) && defined(__PVFS2_SERVER__)
297static int tcp_allow_trusted(struct sockaddr_in *peer_sockaddr);
298#endif
299
300static void bmi_set_sock_buffers(int socket);
301
302/* exported method interface */
303const struct bmi_method_ops bmi_tcp_ops = {
304    .method_name = BMI_tcp_method_name,
305    .initialize = BMI_tcp_initialize,
306    .finalize = BMI_tcp_finalize,
307    .set_info = BMI_tcp_set_info,
308    .get_info = BMI_tcp_get_info,
309    .memalloc = BMI_tcp_memalloc,
310    .memfree  = BMI_tcp_memfree,
311    .unexpected_free = BMI_tcp_unexpected_free,
312    .post_send = BMI_tcp_post_send,
313    .post_sendunexpected = BMI_tcp_post_sendunexpected,
314    .post_recv = BMI_tcp_post_recv,
315    .test = BMI_tcp_test,
316    .testsome = BMI_tcp_testsome,
317    .testcontext = BMI_tcp_testcontext,
318    .testunexpected = BMI_tcp_testunexpected,
319    .method_addr_lookup = BMI_tcp_method_addr_lookup,
320    .post_send_list = BMI_tcp_post_send_list,
321    .post_recv_list = BMI_tcp_post_recv_list,
322    .post_sendunexpected_list = BMI_tcp_post_sendunexpected_list,
323    .open_context = BMI_tcp_open_context,
324    .close_context = BMI_tcp_close_context,
325    .cancel = BMI_tcp_cancel,
326    .rev_lookup_unexpected = BMI_tcp_addr_rev_lookup_unexpected,
327    .query_addr_range = BMI_tcp_query_addr_range,
328};
329
330/* module parameters */
331static struct
332{
333    int method_flags;
334    int method_id;
335    bmi_method_addr_p listen_addr;
336} tcp_method_params;
337
338#if defined(USE_TRUSTED) && defined(__PVFS2_SERVER__)
339static struct tcp_allowed_connection_s *gtcp_allowed_connection = NULL;
340#endif
341
342static int check_unexpected = 1;
343
344/* op_list_array indices */
345enum
346{
347    NUM_INDICES = 5,
348    IND_SEND = 0,
349    IND_RECV = 1,
350    IND_RECV_INFLIGHT = 2,
351    IND_RECV_EAGER_DONE_BUFFERING = 3,
352    IND_COMPLETE_RECV_UNEXP = 4,        /* MAKE THIS COMES LAST */
353};
354
355/* internal operation lists */
356static op_list_p op_list_array[6] = { NULL, NULL, NULL, NULL,
357    NULL, NULL
358};
359
360/* internal completion queues */
361static op_list_p completion_array[BMI_MAX_CONTEXTS] = { NULL };
362
363/* internal socket collection */
364static socket_collection_p tcp_socket_collection_p = NULL;
365
366/* tunable parameters */
367enum
368{
369    /* amount of pending connections we'll allow */
370    TCP_BACKLOG = 256,
371    /* amount of work to be done during a test.  This roughly
372     * translates into the number of sockets that we will perform
373     * nonblocking operations on during one function call.
374     */
375    TCP_WORK_METRIC = 128
376};
377
378/* TCP message modes */
379enum
380{
381    TCP_MODE_IMMED = 1,         /* not used for TCP/IP */
382    TCP_MODE_UNEXP = 2,
383    TCP_MODE_EAGER = 4,
384    TCP_MODE_REND = 8
385};
386
387/* Allowable sizes for each mode */
388enum
389{
390    TCP_MODE_EAGER_LIMIT = 16384,       /* 16K */
391    TCP_MODE_REND_LIMIT = 16777216      /* 16M */
392};
393
394/* toggles cancel mode; for bmi_tcp this will result in socket being closed
395 * in all cancellation cases
396 */
397static int forceful_cancel_mode = 0;
398
399/*
400  Socket buffer sizes, currently these default values will be used
401  for the clients... (TODO)
402 */
403static int tcp_buffer_size_receive = 0;
404static int tcp_buffer_size_send = 0;
405
406static PINT_event_type bmi_tcp_send_event_id;
407static PINT_event_type bmi_tcp_recv_event_id;
408
409static PINT_event_group bmi_tcp_event_group;
410static pid_t bmi_tcp_pid;
411
412/*************************************************************************
413 * Visible Interface
414 */
415
416/* BMI_tcp_initialize()
417 *
418 * Initializes the tcp method.  Must be called before any other tcp
419 * method functions.
420 *
421 * returns 0 on success, -errno on failure
422 */
423int BMI_tcp_initialize(bmi_method_addr_p listen_addr,
424                       int method_id,
425                       int init_flags)
426{
427
428    int ret = -1;
429    int tmp_errno = bmi_tcp_errno_to_pvfs(-ENOSYS);
430    struct tcp_addr *tcp_addr_data = NULL;
431    int i = 0;
432
433    gossip_ldebug(GOSSIP_BMI_DEBUG_TCP, "Initializing TCP/IP module.\n");
434
435    /* check args */
436    if ((init_flags & BMI_INIT_SERVER) && !listen_addr)
437    {
438        gossip_lerr("Error: bad parameters given to TCP/IP module.\n");
439        return (bmi_tcp_errno_to_pvfs(-EINVAL));
440    }
441
442    gen_mutex_lock(&interface_mutex);
443
444    /* zero out our parameter structure and fill it in */
445    memset(&tcp_method_params, 0, sizeof(tcp_method_params));
446    tcp_method_params.method_id = method_id;
447    tcp_method_params.method_flags = init_flags;
448
449    if (init_flags & BMI_INIT_SERVER)
450    {
451        /* hang on to our local listening address if needed */
452        tcp_method_params.listen_addr = listen_addr;
453        /* and initialize server functions */
454        ret = tcp_server_init();
455        if (ret < 0)
456        {
457            tmp_errno = bmi_tcp_errno_to_pvfs(ret);
458            gossip_err("Error: tcp_server_init() failure.\n");
459            goto initialize_failure;
460        }
461    }
462
463    /* set up the operation lists */
464    for (i = 0; i < NUM_INDICES; i++)
465    {
466        op_list_array[i] = op_list_new();
467        if (!op_list_array[i])
468        {
469            tmp_errno = bmi_tcp_errno_to_pvfs(-ENOMEM);
470            goto initialize_failure;
471        }
472    }
473
474    /* set up the socket collection */
475    if (tcp_method_params.method_flags & BMI_INIT_SERVER)
476    {
477        tcp_addr_data = tcp_method_params.listen_addr->method_data;
478        tcp_socket_collection_p = BMI_socket_collection_init(tcp_addr_data->socket);
479    }
480    else
481    {
482        tcp_socket_collection_p = BMI_socket_collection_init(-1);
483    }
484
485    if (!tcp_socket_collection_p)
486    {
487        tmp_errno = bmi_tcp_errno_to_pvfs(-ENOMEM);
488        goto initialize_failure;
489    }
490
491    bmi_tcp_pid = getpid();
492    PINT_event_define_group("bmi_tcp", &bmi_tcp_event_group);
493
494    /* Define the send event:
495     *   START: (client_id, request_id, rank, handle, op_id, send_size)
496     *   STOP: (size_sent)
497     */
498    PINT_event_define_event(
499        &bmi_tcp_event_group,
500#ifdef __PVFS2_SERVER__
501        "bmi_server_send",
502#else
503        "bmi_client_send",
504#endif
505        "%d%d%d%llu%d%d",
506        "%d", &bmi_tcp_send_event_id);
507
508    /* Define the recv event:
509     *   START: (client_id, request_id, rank, handle, op_id, recv_size)
510     *   STOP: (size_received)
511     */
512    PINT_event_define_event(
513        &bmi_tcp_event_group,
514#ifdef __PVFS2_SERVER__
515        "bmi_server_recv",
516#else
517        "bmi_client_recv",
518#endif
519        "%d%d%d%llu%d%d",
520        "%d", &bmi_tcp_recv_event_id);
521
522    gen_mutex_unlock(&interface_mutex);
523    gossip_ldebug(GOSSIP_BMI_DEBUG_TCP,
524                  "TCP/IP module successfully initialized.\n");
525    return (0);
526
527  initialize_failure:
528
529    /* cleanup data structures and bail out */
530    for (i = 0; i < NUM_INDICES; i++)
531    {
532        if (op_list_array[i])
533        {
534            op_list_cleanup(op_list_array[i]);
535        }
536    }
537    if (tcp_socket_collection_p)
538    {
539        BMI_socket_collection_finalize(tcp_socket_collection_p);
540    }
541    gen_mutex_unlock(&interface_mutex);
542    return (tmp_errno);
543}
544
545
546/* BMI_tcp_finalize()
547 *
548 * Shuts down the tcp method.
549 *
550 * returns 0 on success, -errno on failure
551 */
552int BMI_tcp_finalize(void)
553{
554    int i = 0;
555
556    gen_mutex_lock(&interface_mutex);
557
558    /* shut down our listen addr, if we have one */
559    if ((tcp_method_params.method_flags & BMI_INIT_SERVER)
560        && tcp_method_params.listen_addr)
561    {
562        dealloc_tcp_method_addr(tcp_method_params.listen_addr);
563    }
564
565    /* note that this forcefully shuts down operations */
566    for (i = 0; i < NUM_INDICES; i++)
567    {
568        if (op_list_array[i])
569        {
570            op_list_cleanup(op_list_array[i]);
571            op_list_array[i] = NULL;
572        }
573    }
574
575    /* get rid of socket collection */
576    if (tcp_socket_collection_p)
577    {
578        BMI_socket_collection_finalize(tcp_socket_collection_p);
579        tcp_socket_collection_p = NULL;
580    }
581
582    /* NOTE: we are trusting the calling BMI layer to deallocate
583     * all of the method addresses (this will close any open sockets)
584     */
585    gossip_ldebug(GOSSIP_BMI_DEBUG_TCP, "TCP/IP module finalized.\n");
586    gen_mutex_unlock(&interface_mutex);
587    return (0);
588}
589
590
591/*
592 * BMI_tcp_method_addr_lookup()
593 *
594 * resolves the string representation of an address into a method
595 * address structure. 
596 *
597 * returns a pointer to method_addr on success, NULL on failure
598 */
599bmi_method_addr_p BMI_tcp_method_addr_lookup(const char *id_string)
600{
601    char *tcp_string = NULL;
602    char *delim = NULL;
603    char *hostname = NULL;
604    bmi_method_addr_p new_addr = NULL;
605    struct tcp_addr *tcp_addr_data = NULL;
606    int ret = -1;
607
608    tcp_string = string_key("tcp", id_string);
609    if (!tcp_string)
610    {
611        /* the string doesn't even have our info */
612        return (NULL);
613    }
614
615    /* start breaking up the method information */
616    /* for normal tcp, it is simply hostname:port */
617    if ((delim = index(tcp_string, ':')) == NULL)
618    {
619        gossip_lerr("Error: malformed tcp address.\n");
620        free(tcp_string);
621        return (NULL);
622    }
623
624    /* looks ok, so let's build the method addr structure */
625    new_addr = alloc_tcp_method_addr();
626    if (!new_addr)
627    {
628        free(tcp_string);
629        return (NULL);
630    }
631    tcp_addr_data = new_addr->method_data;
632
633    ret = sscanf((delim + 1), "%d", &(tcp_addr_data->port));
634    if (ret != 1)
635    {
636        gossip_lerr("Error: malformed tcp address.\n");
637        dealloc_tcp_method_addr(new_addr);
638        free(tcp_string);
639        return (NULL);
640    }
641
642    hostname = (char *) malloc((delim - tcp_string + 1));
643    if (!hostname)
644    {
645        dealloc_tcp_method_addr(new_addr);
646        free(tcp_string);
647        return (NULL);
648    }
649    strncpy(hostname, tcp_string, (delim - tcp_string));
650    hostname[delim - tcp_string] = '\0';
651
652    tcp_addr_data->hostname = hostname;
653
654    free(tcp_string);
655    return (new_addr);
656}
657
658
659/* BMI_tcp_memalloc()
660 *
661 * Allocates memory that can be used in native mode by tcp.
662 *
663 * returns 0 on success, -errno on failure
664 */
665void *BMI_tcp_memalloc(bmi_size_t size,
666                       enum bmi_op_type send_recv)
667{
668    /* we really don't care what flags the caller uses, TCP/IP has no
669     * preferences about how the memory should be configured.
670     */
671
672/*    return (calloc(1,(size_t) size)); */
673    return PINT_mem_aligned_alloc(size, 4096);
674}
675
676
677/* BMI_tcp_memfree()
678 *
679 * Frees memory that was allocated with BMI_tcp_memalloc()
680 *
681 * returns 0 on success, -errno on failure
682 */
683int BMI_tcp_memfree(void *buffer,
684                    bmi_size_t size,
685                    enum bmi_op_type send_recv)
686{
687    PINT_mem_aligned_free(buffer);
688    return (0);
689}
690
691/* BMI_tcp_unexpected_free()
692 *
693 * Frees memory that was returned from BMI_tcp_test_unexpected()
694 *
695 * returns 0 on success, -errno on failure
696 */
697int BMI_tcp_unexpected_free(void *buffer)
698{
699    if (buffer)
700    {
701        free(buffer);
702    }
703    return (0);
704}
705
706#ifdef USE_TRUSTED
707
708static struct tcp_allowed_connection_s *
709alloc_trusted_connection_info(int network_count)
710{
711    struct tcp_allowed_connection_s *tcp_allowed_connection_info = NULL;
712
713    tcp_allowed_connection_info = (struct tcp_allowed_connection_s *)
714            calloc(1, sizeof(struct tcp_allowed_connection_s));
715    if (tcp_allowed_connection_info)
716    {
717        tcp_allowed_connection_info->network =
718            (struct in_addr *) calloc(network_count, sizeof(struct in_addr));
719        if (tcp_allowed_connection_info->network == NULL)
720        {
721            free(tcp_allowed_connection_info);
722            tcp_allowed_connection_info = NULL;
723        }
724        else
725        {
726            tcp_allowed_connection_info->netmask =
727                (struct in_addr *) calloc(network_count, sizeof(struct in_addr));
728            if (tcp_allowed_connection_info->netmask == NULL)
729            {
730                free(tcp_allowed_connection_info->network);
731                free(tcp_allowed_connection_info);
732                tcp_allowed_connection_info = NULL;
733            }
734            else {
735                tcp_allowed_connection_info->network_count = network_count;
736            }
737        }
738    }
739    return tcp_allowed_connection_info;
740}
741
742static void
743dealloc_trusted_connection_info(void* ptcp_allowed_connection_info)
744{
745    struct tcp_allowed_connection_s *tcp_allowed_connection_info =
746        (struct tcp_allowed_connection_s *) ptcp_allowed_connection_info;
747    if (tcp_allowed_connection_info)
748    {
749        free(tcp_allowed_connection_info->network);
750        tcp_allowed_connection_info->network = NULL;
751        free(tcp_allowed_connection_info->netmask);
752        tcp_allowed_connection_info->netmask = NULL;
753        free(tcp_allowed_connection_info);
754    }
755    return;
756}
757
758#endif
759
760/*
761 * This function will convert a mask_bits value to an in_addr
762 * representation. i.e for example if
763 * mask_bits was 24 then it would be 255.255.255.0
764 * if mask_bits was 22 then it would be 255.255.252.0
765 * etc
766 */
767static void convert_mask(int mask_bits, struct in_addr *mask)
768{
769   uint32_t addr = -1;
770   addr = addr & ~~(-1 << (mask_bits ? (32 - mask_bits) : 32));
771   mask->s_addr = htonl(addr);
772   return;
773}
774
775/* BMI_tcp_set_info()
776 *
777 * Pass in optional parameters.
778 *
779 * returns 0 on success, -errno on failure
780 */
781int BMI_tcp_set_info(int option,
782                     void *inout_parameter)
783{
784    int ret = -1;
785    bmi_method_addr_p tmp_addr = NULL;
786
787    gen_mutex_lock(&interface_mutex);
788
789    switch (option)
790    {
791    case BMI_TCP_BUFFER_SEND_SIZE:
792       tcp_buffer_size_send = *((int *)inout_parameter);
793       ret = 0;
794#ifdef __PVFS2_SERVER__
795       /* Set the default socket buffer sizes for the server socket */
796       bmi_set_sock_buffers(
797           ((struct tcp_addr *)
798            tcp_method_params.listen_addr->method_data)->socket);
799#endif
800       break;
801    case BMI_TCP_BUFFER_RECEIVE_SIZE:
802       tcp_buffer_size_receive = *((int *)inout_parameter);
803       ret = 0;
804#ifdef __PVFS2_SERVER__
805       /* Set the default socket buffer sizes for the server socket */
806       bmi_set_sock_buffers(
807           ((struct tcp_addr *)
808            tcp_method_params.listen_addr->method_data)->socket);
809#endif
810       break;
811    case BMI_TCP_CLOSE_SOCKET:
812        /* this should no longer make it to the bmi_tcp method; see bmi.c */
813        ret = 0;
814        break;
815    case BMI_FORCEFUL_CANCEL_MODE:
816        forceful_cancel_mode = 1;
817        ret = 0;
818        break;
819    case BMI_DROP_ADDR:
820        if (inout_parameter == NULL)
821        {
822            ret = bmi_tcp_errno_to_pvfs(-EINVAL);
823        }
824        else
825        {
826            tmp_addr = (bmi_method_addr_p) inout_parameter;
827            /* take it out of the socket collection */
828            tcp_forget_addr(tmp_addr, 1, 0);
829            ret = 0;
830        }
831        break;
832#ifdef USE_TRUSTED
833    case BMI_TRUSTED_CONNECTION:
834    {
835        struct tcp_allowed_connection_s *tcp_allowed_connection = NULL;
836        if (inout_parameter == NULL)
837        {
838            ret = bmi_tcp_errno_to_pvfs(-EINVAL);
839            break;
840        }
841        else
842        {
843            int    bmi_networks_count = 0;
844            char **bmi_networks = NULL;
845            int   *bmi_netmasks = NULL;
846            struct server_configuration_s *svc_config = NULL;
847
848            svc_config = (struct server_configuration_s *) inout_parameter;
849            tcp_allowed_connection = alloc_trusted_connection_info(svc_config->allowed_networks_count);
850            if (tcp_allowed_connection == NULL)
851            {
852                ret = bmi_tcp_errno_to_pvfs(-ENOMEM);
853                break;
854            }
855#ifdef      __PVFS2_SERVER__
856            gtcp_allowed_connection = tcp_allowed_connection;
857#endif
858            /* Stash this in the server_configuration_s structure. freed later on */
859            svc_config->security = tcp_allowed_connection;
860            svc_config->security_dtor = &dealloc_trusted_connection_info;
861            ret = 0;
862            /* Fill up the list of allowed ports */
863            PINT_config_get_allowed_ports(svc_config,
864                    &tcp_allowed_connection->port_enforce,
865                    tcp_allowed_connection->ports);
866
867            /* if it was enabled, make sure that we know how to deal with it */
868            if (tcp_allowed_connection->port_enforce == 1)
869            {
870                /* illegal ports */
871                if (tcp_allowed_connection->ports[0] > 65535
872                        || tcp_allowed_connection->ports[1] > 65535
873                        || tcp_allowed_connection->ports[1] < tcp_allowed_connection->ports[0])
874                {
875                    gossip_lerr("Error: illegal trusted port values\n");
876                    ret = bmi_tcp_errno_to_pvfs(-EINVAL);
877                    /* don't enforce anything! */
878                    tcp_allowed_connection->port_enforce = 0;
879                }
880            }
881            ret = 0;
882            /* Retrieve the list of BMI network addresses and masks  */
883            PINT_config_get_allowed_networks(svc_config,
884                    &tcp_allowed_connection->network_enforce,
885                    &bmi_networks_count,
886                    &bmi_networks,
887                    &bmi_netmasks);
888
889            /* if it was enabled, make sure that we know how to deal with it */
890            if (tcp_allowed_connection->network_enforce == 1)
891            {
892                int i;
893
894                for (i = 0; i < bmi_networks_count; i++)
895                {
896                    char *tcp_string = NULL;
897                    /* Convert the network string into an in_addr_t structure */
898                    tcp_string = string_key("tcp", bmi_networks[i]);
899                    if (!tcp_string)
900                    {
901                        /* the string doesn't even have our info */
902                        gossip_lerr("Error: malformed tcp network address\n");
903                        ret = bmi_tcp_errno_to_pvfs(-EINVAL);
904                    }
905                    else {
906                        /* convert this into an in_addr_t */
907                        inet_aton(tcp_string, &tcp_allowed_connection->network[i]);
908                        free(tcp_string);
909                    }
910                    convert_mask(bmi_netmasks[i], &tcp_allowed_connection->netmask[i]);
911                }
912                /* don't enforce anything if there were any errors */
913                if (ret != 0)
914                {
915                    tcp_allowed_connection->network_enforce = 0;
916                }
917            }
918        }
919        break;
920    }
921#endif
922    case BMI_TCP_CHECK_UNEXPECTED:
923    {
924        check_unexpected = *(int *)inout_parameter;
925        ret = 0;
926        break;
927    }
928
929    default:
930        gossip_ldebug(GOSSIP_BMI_DEBUG_TCP,
931                      "TCP hint %d not implemented.\n", option);
932        ret = 0;
933        break;
934    }
935
936    gen_mutex_unlock(&interface_mutex);
937    return (ret);
938}
939
940/* BMI_tcp_get_info()
941 *
942 * Query for optional parameters.
943 *
944 * returns 0 on success, -errno on failure
945 */
946int BMI_tcp_get_info(int option,
947                     void *inout_parameter)
948{
949    struct method_drop_addr_query* query;
950    struct tcp_addr* tcp_addr_data;
951    int ret = 0;
952
953    gen_mutex_lock(&interface_mutex);
954
955    switch (option)
956    {
957    case BMI_CHECK_MAXSIZE:
958        *((int *) inout_parameter) = TCP_MODE_REND_LIMIT;
959        ret = 0;
960        break;
961    case BMI_DROP_ADDR_QUERY:
962        query = (struct method_drop_addr_query*)inout_parameter;
963        tcp_addr_data=query->addr->method_data;
964        /* only suggest that we discard the address if we have experienced
965         * an error and there is no way to reconnect
966         */
967        if(tcp_addr_data->addr_error != 0 &&
968           tcp_addr_data->dont_reconnect == 1)
969        {
970            query->response = 1;
971        }
972        else
973        {
974            query->response = 0;
975        }
976        ret = 0;
977        break;
978    case BMI_GET_UNEXP_SIZE:
979        *((int *) inout_parameter) = TCP_MODE_EAGER_LIMIT;
980        ret = 0;
981        break;
982
983    default:
984        gossip_ldebug(GOSSIP_BMI_DEBUG_TCP,
985                      "TCP hint %d not implemented.\n", option);
986        ret = -ENOSYS;
987        break;
988    }
989
990    gen_mutex_unlock(&interface_mutex);
991    return (ret < 0) ? bmi_tcp_errno_to_pvfs(ret) : ret;
992}
993
994
995/* BMI_tcp_post_send()
996 *
997 * Submits send operations.
998 *
999 * returns 0 on success that requires later poll, returns 1 on instant
1000 * completion, -errno on failure
1001 */
1002int BMI_tcp_post_send(bmi_op_id_t * id,
1003                      bmi_method_addr_p dest,
1004                      const void *buffer,
1005                      bmi_size_t size,
1006                      enum bmi_buffer_type buffer_type,
1007                      bmi_msg_tag_t tag,
1008                      void *user_ptr,
1009                      bmi_context_id context_id,
1010                      PVFS_hint hints)
1011{
1012    struct tcp_msg_header my_header;
1013    int ret = -1;
1014
1015    /* clear the id field for safety */
1016    *id = 0;
1017
1018    /* fill in the TCP-specific message header */
1019    if (size > TCP_MODE_REND_LIMIT)
1020    {
1021        return (bmi_tcp_errno_to_pvfs(-EMSGSIZE));
1022    }
1023
1024    if (size <= TCP_MODE_EAGER_LIMIT)
1025    {
1026        my_header.mode = TCP_MODE_EAGER;
1027    }
1028    else
1029    {
1030        my_header.mode = TCP_MODE_REND;
1031    }
1032    my_header.tag = tag;
1033    my_header.size = size;
1034    my_header.magic_nr = BMI_MAGIC_NR;
1035
1036    gen_mutex_lock(&interface_mutex);
1037
1038    ret = tcp_post_send_generic(id, dest, &buffer,
1039                                &size, 1, buffer_type, my_header,
1040                                user_ptr, context_id, hints);
1041
1042    gen_mutex_unlock(&interface_mutex);
1043    return(ret);
1044}
1045
1046
1047/* BMI_tcp_post_sendunexpected()
1048 *
1049 * Submits unexpected send operations.
1050 *
1051 * returns 0 on success that requires later poll, returns 1 on instant
1052 * completion, -errno on failure
1053 */
1054int BMI_tcp_post_sendunexpected(bmi_op_id_t * id,
1055                                bmi_method_addr_p dest,
1056                                const void *buffer,
1057                                bmi_size_t size,
1058                                enum bmi_buffer_type buffer_type,
1059                                bmi_msg_tag_t tag,
1060                                void *user_ptr,
1061                                bmi_context_id context_id,
1062                                PVFS_hint hints)
1063{
1064    struct tcp_msg_header my_header;
1065    int ret = -1;
1066
1067    /* clear the id field for safety */
1068    *id = 0;
1069
1070    if (size > TCP_MODE_EAGER_LIMIT)
1071    {
1072        return (bmi_tcp_errno_to_pvfs(-EMSGSIZE));
1073    }
1074
1075    my_header.mode = TCP_MODE_UNEXP;
1076    my_header.tag = tag;
1077    my_header.size = size;
1078    my_header.magic_nr = BMI_MAGIC_NR;
1079
1080    gen_mutex_lock(&interface_mutex);
1081
1082    ret = tcp_post_send_generic(id, dest, &buffer,
1083                                &size, 1, buffer_type, my_header,
1084                                user_ptr, context_id, hints);
1085    gen_mutex_unlock(&interface_mutex);
1086    return(ret);
1087}
1088
1089
1090
1091/* BMI_tcp_post_recv()
1092 *
1093 * Submits recv operations.
1094 *
1095 * returns 0 on success that requires later poll, returns 1 on instant
1096 * completion, -errno on failure
1097 */
1098int BMI_tcp_post_recv(bmi_op_id_t * id,
1099                      bmi_method_addr_p src,
1100                      void *buffer,
1101                      bmi_size_t expected_size,
1102                      bmi_size_t * actual_size,
1103                      enum bmi_buffer_type buffer_type,
1104                      bmi_msg_tag_t tag,
1105                      void *user_ptr,
1106                      bmi_context_id context_id,
1107                      PVFS_hint hints)
1108{
1109    int ret = -1;
1110
1111    /* A few things could happen here:
1112     * a) rendez. recv with sender not ready yet
1113     * b) rendez. recv with sender waiting
1114     * c) eager recv, data not available yet
1115     * d) eager recv, some/all data already here
1116     * e) rendez. recv with sender in eager mode
1117     *
1118     * b or d could lead to completion without polling.
1119     * we don't look for unexpected messages here.
1120     */
1121
1122    if (expected_size > TCP_MODE_REND_LIMIT)
1123    {
1124        return (bmi_tcp_errno_to_pvfs(-EINVAL));
1125    }
1126    gen_mutex_lock(&interface_mutex);
1127
1128    ret = tcp_post_recv_generic(id, src, &buffer, &expected_size,
1129                                1, expected_size, actual_size,
1130                                buffer_type, tag,
1131                                user_ptr, context_id, hints);
1132
1133    gen_mutex_unlock(&interface_mutex);
1134    return (ret);
1135}
1136
1137
1138/* BMI_tcp_test()
1139 *
1140 * Checks to see if a particular message has completed.
1141 *
1142 * returns 0 on success, -errno on failure
1143 */
1144int BMI_tcp_test(bmi_op_id_t id,
1145                 int *outcount,
1146                 bmi_error_code_t * error_code,
1147                 bmi_size_t * actual_size,
1148                 void **user_ptr,
1149                 int max_idle_time,
1150                 bmi_context_id context_id)
1151{
1152    int ret = -1;
1153    method_op_p query_op = (method_op_p)id_gen_fast_lookup(id);
1154
1155    assert(query_op != NULL);
1156
1157    gen_mutex_lock(&interface_mutex);
1158
1159    /* do some ``real work'' here */
1160    ret = tcp_do_work(max_idle_time);
1161    if (ret < 0)
1162    {
1163        gen_mutex_unlock(&interface_mutex);
1164        return (ret);
1165    }
1166
1167    if(((struct tcp_op*)(query_op->method_data))->tcp_op_state ==
1168        BMI_TCP_COMPLETE)
1169    {
1170        assert(query_op->context_id == context_id);
1171        op_list_remove(query_op);
1172        if (user_ptr != NULL)
1173        {
1174            (*user_ptr) = query_op->user_ptr;
1175        }
1176        (*error_code) = query_op->error_code;
1177        (*actual_size) = query_op->actual_size;
1178        PINT_EVENT_END(
1179            (query_op->send_recv == BMI_SEND ?
1180             bmi_tcp_send_event_id : bmi_tcp_recv_event_id), bmi_tcp_pid, NULL,
1181             query_op->event_id, id, *actual_size);
1182
1183        dealloc_tcp_method_op(query_op);
1184        (*outcount)++;
1185    }
1186
1187    gen_mutex_unlock(&interface_mutex);
1188    return (0);
1189}
1190
1191/* BMI_tcp_testsome()
1192 *
1193 * Checks to see if any messages from the specified list have completed.
1194 *
1195 * returns 0 on success, -errno on failure
1196 */
1197int BMI_tcp_testsome(int incount,
1198                     bmi_op_id_t * id_array,
1199                     int *outcount,
1200                     int *index_array,
1201                     bmi_error_code_t * error_code_array,
1202                     bmi_size_t * actual_size_array,
1203                     void **user_ptr_array,
1204                     int max_idle_time,
1205                     bmi_context_id context_id)
1206{
1207    int ret = -1;
1208    method_op_p query_op = NULL;
1209    int i;
1210
1211    gen_mutex_lock(&interface_mutex);
1212
1213    /* do some ``real work'' here */
1214    ret = tcp_do_work(max_idle_time);
1215    if (ret < 0)
1216    {
1217        gen_mutex_unlock(&interface_mutex);
1218        return (ret);
1219    }
1220
1221    for(i=0; i<incount; i++)
1222    {
1223        if(id_array[i])
1224        {
1225            /* NOTE: this depends on the user passing in valid id's;
1226             * otherwise we segfault. 
1227             */
1228            query_op = (method_op_p)id_gen_fast_lookup(id_array[i]);
1229            if(((struct tcp_op*)(query_op->method_data))->tcp_op_state ==
1230               BMI_TCP_COMPLETE)
1231            {
1232                assert(query_op->context_id == context_id);
1233                /* this one's done; pop it out */
1234                op_list_remove(query_op);
1235                error_code_array[*outcount] = query_op->error_code;
1236                actual_size_array[*outcount] = query_op->actual_size;
1237                index_array[*outcount] = i;
1238                if (user_ptr_array != NULL)
1239                {
1240                    user_ptr_array[*outcount] = query_op->user_ptr;
1241                }
1242                PINT_EVENT_END(
1243                    (query_op->send_recv == BMI_SEND ?
1244                     bmi_tcp_send_event_id : bmi_tcp_recv_event_id),
1245                    bmi_tcp_pid, NULL,
1246                    query_op->event_id, actual_size_array[*outcount]);
1247                dealloc_tcp_method_op(query_op);
1248                (*outcount)++;
1249            }
1250        }
1251    }
1252
1253    gen_mutex_unlock(&interface_mutex);
1254    return(0);
1255}
1256
1257
1258/* BMI_tcp_testunexpected()
1259 *
1260 * Checks to see if any unexpected messages have completed.
1261 *
1262 * returns 0 on success, -errno on failure
1263 */
1264int BMI_tcp_testunexpected(int incount,
1265                           int *outcount,
1266                           struct bmi_method_unexpected_info *info,
1267                           int max_idle_time)
1268{
1269    int ret = -1;
1270    method_op_p query_op = NULL;
1271
1272    gen_mutex_lock(&interface_mutex);
1273
1274    if(op_list_empty(op_list_array[IND_COMPLETE_RECV_UNEXP]))
1275    {
1276        /* do some ``real work'' here */
1277        ret = tcp_do_work(max_idle_time);
1278        if (ret < 0)
1279        {
1280            gen_mutex_unlock(&interface_mutex);
1281            return (ret);
1282        }
1283    }
1284
1285    *outcount = 0;
1286
1287    /* go through the completed/unexpected list as long as we are finding
1288     * stuff and we have room in the info array for it
1289     */
1290    while ((*outcount < incount) &&
1291           (query_op =
1292            op_list_shownext(op_list_array[IND_COMPLETE_RECV_UNEXP])))
1293    {
1294        info[*outcount].error_code = query_op->error_code;
1295        info[*outcount].addr = query_op->addr;
1296        info[*outcount].buffer = query_op->buffer;
1297        info[*outcount].size = query_op->actual_size;
1298        info[*outcount].tag = query_op->msg_tag;
1299        op_list_remove(query_op);
1300        dealloc_tcp_method_op(query_op);
1301        (*outcount)++;
1302    }
1303    gen_mutex_unlock(&interface_mutex);
1304    return (0);
1305}
1306
1307
1308/* BMI_tcp_testcontext()
1309 *
1310 * Checks to see if any messages from the specified context have completed.
1311 *
1312 * returns 0 on success, -errno on failure
1313 */
1314int BMI_tcp_testcontext(int incount,
1315                     bmi_op_id_t* out_id_array,
1316                     int *outcount,
1317                     bmi_error_code_t * error_code_array,
1318                     bmi_size_t * actual_size_array,
1319                     void **user_ptr_array,
1320                     int max_idle_time,
1321                     bmi_context_id context_id)
1322{
1323    int ret = -1;
1324    method_op_p query_op = NULL;
1325
1326    *outcount = 0;
1327
1328    gen_mutex_lock(&interface_mutex);
1329
1330    if(op_list_empty(completion_array[context_id]))
1331    {
1332        /* if there are unexpected ops ready to go, then short out so
1333         * that the next testunexpected call can pick it up without
1334         * delay
1335         */
1336        if(check_unexpected &&
1337           !op_list_empty(op_list_array[IND_COMPLETE_RECV_UNEXP]))
1338        {
1339            gen_mutex_unlock(&interface_mutex);
1340            return(0);
1341        }
1342
1343        /* do some ``real work'' here */
1344        ret = tcp_do_work(max_idle_time);
1345        if (ret < 0)
1346        {
1347            gen_mutex_unlock(&interface_mutex);
1348            return (ret);
1349        }
1350    }
1351
1352    /* pop as many items off of the completion queue as we can */
1353    while((*outcount < incount) &&
1354          (query_op =
1355           op_list_shownext(completion_array[context_id])))
1356    {
1357        assert(query_op);
1358        assert(query_op->context_id == context_id);
1359
1360        /* this one's done; pop it out */
1361        op_list_remove(query_op);
1362        error_code_array[*outcount] = query_op->error_code;
1363        actual_size_array[*outcount] = query_op->actual_size;
1364        out_id_array[*outcount] = query_op->op_id;
1365        if (user_ptr_array != NULL)
1366        {
1367            user_ptr_array[*outcount] = query_op->user_ptr;
1368        }
1369
1370        PINT_EVENT_END((query_op->send_recv == BMI_SEND ?
1371                        bmi_tcp_send_event_id : bmi_tcp_recv_event_id),
1372                       bmi_tcp_pid, NULL, query_op->event_id,
1373                       query_op->actual_size);
1374
1375        dealloc_tcp_method_op(query_op);
1376        query_op = NULL;
1377        (*outcount)++;
1378    }
1379
1380    gen_mutex_unlock(&interface_mutex);
1381    return(0);
1382}
1383
1384
1385
1386/* BMI_tcp_post_send_list()
1387 *
1388 * same as the BMI_tcp_post_send() function, except that it sends
1389 * from an array of possibly non contiguous buffers
1390 *
1391 * returns 0 on success, 1 on immediate successful completion,
1392 * -errno on failure
1393 */
1394int BMI_tcp_post_send_list(bmi_op_id_t * id,
1395                           bmi_method_addr_p dest,
1396                           const void *const *buffer_list,
1397                           const bmi_size_t *size_list,
1398                           int list_count,
1399                           bmi_size_t total_size,
1400                           enum bmi_buffer_type buffer_type,
1401                           bmi_msg_tag_t tag,
1402                           void *user_ptr,
1403                           bmi_context_id context_id,
1404                           PVFS_hint hints)
1405{
1406    struct tcp_msg_header my_header;
1407    int ret = -1;
1408
1409    /* clear the id field for safety */
1410    *id = 0;
1411
1412    /* fill in the TCP-specific message header */
1413    if (total_size > TCP_MODE_REND_LIMIT)
1414    {
1415        gossip_lerr("Error: BMI message too large!\n");
1416        return (bmi_tcp_errno_to_pvfs(-EMSGSIZE));
1417    }
1418
1419    if (total_size <= TCP_MODE_EAGER_LIMIT)
1420    {
1421        my_header.mode = TCP_MODE_EAGER;
1422    }
1423    else
1424    {
1425        my_header.mode = TCP_MODE_REND;
1426    }
1427    my_header.tag = tag;
1428    my_header.size = total_size;
1429    my_header.magic_nr = BMI_MAGIC_NR;
1430
1431    gen_mutex_lock(&interface_mutex);
1432
1433    ret = tcp_post_send_generic(id, dest, buffer_list,
1434                                size_list, list_count, buffer_type,
1435                                my_header, user_ptr, context_id, hints);
1436    gen_mutex_unlock(&interface_mutex);
1437    return(ret);
1438}
1439
1440/* BMI_tcp_post_recv_list()
1441 *
1442 * same as the BMI_tcp_post_recv() function, except that it recvs
1443 * into an array of possibly non contiguous buffers
1444 *
1445 * returns 0 on success, 1 on immediate successful completion,
1446 * -errno on failure
1447 */
1448int BMI_tcp_post_recv_list(bmi_op_id_t * id,
1449                           bmi_method_addr_p src,
1450                           void *const *buffer_list,
1451                           const bmi_size_t *size_list,
1452                           int list_count,
1453                           bmi_size_t total_expected_size,
1454                           bmi_size_t * total_actual_size,
1455                           enum bmi_buffer_type buffer_type,
1456                           bmi_msg_tag_t tag,
1457                           void *user_ptr,
1458                           bmi_context_id context_id,
1459                           PVFS_hint hints)
1460{
1461    int ret = -1;
1462
1463    if (total_expected_size > TCP_MODE_REND_LIMIT)
1464    {
1465        return (bmi_tcp_errno_to_pvfs(-EINVAL));
1466    }
1467
1468    gen_mutex_lock(&interface_mutex);
1469
1470    ret = tcp_post_recv_generic(id, src, buffer_list, size_list,
1471                                list_count, total_expected_size,
1472                                total_actual_size, buffer_type, tag, user_ptr,
1473                                context_id, hints);
1474
1475    gen_mutex_unlock(&interface_mutex);
1476    return (ret);
1477}
1478
1479
1480/* BMI_tcp_post_sendunexpected_list()
1481 *
1482 * same as the BMI_tcp_post_sendunexpected() function, except that
1483 * it sends from an array of possibly non contiguous buffers
1484 *
1485 * returns 0 on success, 1 on immediate successful completion,
1486 * -errno on failure
1487 */
1488int BMI_tcp_post_sendunexpected_list(bmi_op_id_t * id,
1489                                     bmi_method_addr_p dest,
1490                                     const void *const *buffer_list,
1491                                     const bmi_size_t *size_list,
1492                                     int list_count,
1493                                     bmi_size_t total_size,
1494                                     enum bmi_buffer_type buffer_type,
1495                                     bmi_msg_tag_t tag,
1496                                     void *user_ptr,
1497                                     bmi_context_id context_id,
1498                                     PVFS_hint hints)
1499{
1500    struct tcp_msg_header my_header;
1501    int ret = -1;
1502
1503    /* clear the id field for safety */
1504    *id = 0;
1505
1506    if (total_size > TCP_MODE_EAGER_LIMIT)
1507    {
1508        return (bmi_tcp_errno_to_pvfs(-EMSGSIZE));
1509    }
1510
1511    my_header.mode = TCP_MODE_UNEXP;
1512    my_header.tag = tag;
1513    my_header.size = total_size;
1514    my_header.magic_nr = BMI_MAGIC_NR;
1515
1516    gen_mutex_lock(&interface_mutex);
1517
1518    ret = tcp_post_send_generic(id, dest, buffer_list,
1519                                size_list, list_count, buffer_type,
1520                                my_header, user_ptr, context_id, hints);
1521
1522    gen_mutex_unlock(&interface_mutex);
1523    return(ret);
1524}
1525
1526
1527/* BMI_tcp_open_context()
1528 *
1529 * opens a new context with the specified context id
1530 *
1531 * returns 0 on success, -errno on failure
1532 */
1533int BMI_tcp_open_context(bmi_context_id context_id)
1534{
1535
1536    gen_mutex_lock(&interface_mutex);
1537
1538    /* start a new queue for tracking completions in this context */
1539    completion_array[context_id] = op_list_new();
1540    if (!completion_array[context_id])
1541    {
1542        gen_mutex_unlock(&interface_mutex);
1543        return(bmi_tcp_errno_to_pvfs(-ENOMEM));
1544    }
1545
1546    gen_mutex_unlock(&interface_mutex);
1547    return(0);
1548}
1549
1550
1551/* BMI_tcp_close_context()
1552 *
1553 * shuts down a context, previously opened with BMI_tcp_open_context()
1554 *
1555 * no return value
1556 */
1557void BMI_tcp_close_context(bmi_context_id context_id)
1558{
1559   
1560    gen_mutex_lock(&interface_mutex);
1561
1562    /* tear down completion queue for this context */
1563    op_list_cleanup(completion_array[context_id]);
1564
1565    gen_mutex_unlock(&interface_mutex);
1566    return;
1567}
1568
1569
1570/* BMI_tcp_cancel()
1571 *
1572 * attempt to cancel a pending bmi tcp operation
1573 *
1574 * returns 0 on success, -errno on failure
1575 */
1576int BMI_tcp_cancel(bmi_op_id_t id, bmi_context_id context_id)
1577{
1578    method_op_p query_op = NULL;
1579   
1580    gen_mutex_lock(&interface_mutex);
1581
1582    query_op = (method_op_p)id_gen_fast_lookup(id);
1583    if(!query_op)
1584    {
1585        /* if we can't find the operattion, then assume that it has already
1586         * completed naturally
1587         */
1588        gen_mutex_unlock(&interface_mutex);
1589        return(0);
1590    }
1591
1592    /* easy case: is the operation already completed? */
1593    if(((struct tcp_op*)(query_op->method_data))->tcp_op_state ==
1594        BMI_TCP_COMPLETE)
1595    {
1596        /* only close socket in forceful cancel mode */
1597        if(forceful_cancel_mode)
1598            tcp_forget_addr(query_op->addr, 0, -BMI_ECANCEL);
1599        /* we are done! status will be collected during test */
1600        gen_mutex_unlock(&interface_mutex);
1601        return(0);
1602    }
1603
1604    /* has the operation started moving data yet? */
1605    if(query_op->env_amt_complete)
1606    {
1607        /* be pessimistic and kill the socket, even if not in forceful
1608         * cancel mode */
1609        /* NOTE: this may place other operations beside this one into
1610         * EINTR error state
1611         */
1612        tcp_forget_addr(query_op->addr, 0, -BMI_ECANCEL);
1613        gen_mutex_unlock(&interface_mutex);
1614        return(0);
1615    }
1616
1617    /* if we fall to this point, op has been posted, but no data has moved
1618     * for it yet as far as we know
1619     */
1620
1621    /* mark op as canceled, move to completion queue */
1622    query_op->error_code = -BMI_ECANCEL;
1623    if(query_op->send_recv == BMI_SEND)
1624    {
1625        BMI_socket_collection_remove_write_bit(tcp_socket_collection_p,
1626                                           query_op->addr);
1627    }
1628    op_list_remove(query_op);
1629    ((struct tcp_op*)(query_op->method_data))->tcp_op_state =
1630        BMI_TCP_COMPLETE;
1631    /* only close socket in forceful cancel mode */
1632    if(forceful_cancel_mode)
1633        tcp_forget_addr(query_op->addr, 0, -BMI_ECANCEL);
1634    op_list_add(completion_array[query_op->context_id], query_op);
1635    gen_mutex_unlock(&interface_mutex);
1636    return(0);
1637}
1638
1639/*
1640 * For now, we only support wildcard strings that are IP addresses
1641 * and not *hostnames*!
1642 */
1643static int check_valid_wildcard(const char *wildcard_string, unsigned long *octets)
1644{
1645    int i, len = strlen(wildcard_string), last_dot = -1, octet_count = 0;
1646    char str[16];
1647    for (i = 0; i < len; i++)
1648    {
1649        char c = wildcard_string[i];
1650        memset(str, 0, 16);
1651        if ((c < '0' || c > '9') && c != '*' && c != '.')
1652            return -EINVAL;
1653        if (c == '*') {
1654            if (octet_count >= 4)
1655                return -EINVAL;
1656            octets[octet_count++] = 256;
1657        }
1658        else if (c == '.')
1659        {
1660            char *endptr = NULL;
1661            if (octet_count >= 4)
1662                return -EINVAL;
1663            strncpy(str, &wildcard_string[last_dot + 1], (i - last_dot - 1));
1664            octets[octet_count++] = strtol(str, &endptr, 10);
1665            if (*endptr != '\0' || octets[octet_count-1] >= 256)
1666                return -EINVAL;
1667            last_dot = i;
1668        }
1669    }
1670    for (i = octet_count; i < 4; i++)
1671    {
1672         octets[i] = 256;
1673    }
1674    return 0;
1675}
1676
1677/*
1678 * return 1 if the addr specified is part of the wildcard specification of octet
1679 * return 0 otherwise.
1680 */
1681static int check_octets(struct in_addr addr, unsigned long *octets)
1682{
1683#define B1_MASK  0xff000000
1684#define B1_SHIFT 24
1685#define B2_MASK  0x00ff0000
1686#define B2_SHIFT 16
1687#define B3_MASK  0x0000ff00
1688#define B3_SHIFT 8
1689#define B4_MASK  0x000000ff
1690    uint32_t host_addr = ntohl(addr.s_addr);
1691    /* * stands for all clients */
1692    if (octets[0] == 256)
1693    {
1694        return 1;
1695    }
1696    if (((host_addr & B1_MASK) >> B1_SHIFT) != octets[0])
1697    {
1698        return 0;
1699    }
1700    if (octets[1] == 256)
1701    {
1702        return 1;
1703    }
1704    if (((host_addr & B2_MASK) >> B2_SHIFT) != octets[1])
1705    {
1706        return 0;
1707    }
1708    if (octets[2] == 256)
1709    {
1710        return 1;
1711    }
1712    if (((host_addr & B3_MASK) >> B3_SHIFT) != octets[2])
1713    {
1714        return 0;
1715    }
1716    if (octets[3] == 256)
1717    {
1718        return 1;
1719    }
1720    if ((host_addr & B4_MASK) != octets[3])
1721    {
1722        return 0;
1723    }
1724    return 1;
1725#undef B1_MASK
1726#undef B1_SHIFT
1727#undef B2_MASK
1728#undef B2_SHIFT
1729#undef B3_MASK
1730#undef B3_SHIFT
1731#undef B4_MASK
1732}
1733/* BMI_tcp_query_addr_range()
1734 * Check if a given address is within the network specified by the wildcard string!
1735 * or if it is part of the subnet mask specified
1736 */
1737int BMI_tcp_query_addr_range(bmi_method_addr_p map, const char *wildcard_string, int netmask)
1738{
1739    struct tcp_addr *tcp_addr_data = map->method_data;
1740    struct sockaddr_in map_addr;
1741    socklen_t map_addr_len = sizeof(map_addr);
1742    const char *tcp_wildcard = wildcard_string + 6 /* strlen("tcp://") */;
1743    int ret = -1;
1744
1745    memset(&map_addr, 0, sizeof(map_addr));
1746    if(getpeername(tcp_addr_data->socket, (struct sockaddr *) &map_addr, &map_addr_len) < 0)
1747    {
1748        ret =  bmi_tcp_errno_to_pvfs(-EINVAL);
1749        gossip_err("Error: failed to retrieve peer name for client.\n");
1750        return(ret);
1751    }
1752    /* Wildcard specification */
1753    if (netmask == -1)
1754    {
1755        unsigned long octets[4];
1756        if (check_valid_wildcard(tcp_wildcard, octets) < 0)
1757        {
1758            gossip_lerr("Invalid wildcard specification: %s\n", tcp_wildcard);
1759            return -EINVAL;
1760        }
1761        gossip_debug(GOSSIP_BMI_DEBUG_TCP, "Map Address is : %s, Wildcard Octets: %lu.%lu.%lu.%lu\n", inet_ntoa(map_addr.sin_addr),
1762                octets[0], octets[1], octets[2], octets[3]);
1763        if (check_octets(map_addr.sin_addr, octets) == 1)
1764        {
1765            return 1;
1766        }
1767    }
1768    /* Netmask specification */
1769    else {
1770        struct sockaddr_in mask_addr, network_addr;
1771        memset(&mask_addr, 0, sizeof(mask_addr));
1772        memset(&network_addr, 0, sizeof(network_addr));
1773        /* Convert the netmask address */
1774        convert_mask(netmask, &mask_addr.sin_addr);
1775        /* Invalid network address */
1776        if (inet_aton(tcp_wildcard, &network_addr.sin_addr) == 0)
1777        {
1778            gossip_err("Invalid network specification: %s\n", tcp_wildcard);
1779            return -EINVAL;
1780        }
1781        /* Matches the subnet mask! */
1782        if ((map_addr.sin_addr.s_addr & mask_addr.sin_addr.s_addr)
1783                == (network_addr.sin_addr.s_addr & mask_addr.sin_addr.s_addr))
1784        {
1785            return 1;
1786        }
1787    }
1788    return 0;
1789}
1790
1791/* BMI_tcp_addr_rev_lookup_unexpected()
1792 *
1793 * looks up an address that was initialized unexpectedly and returns a string
1794 * hostname
1795 *
1796 * returns string on success, "UNKNOWN" on failure
1797 */
1798const char* BMI_tcp_addr_rev_lookup_unexpected(bmi_method_addr_p map)
1799{
1800    struct tcp_addr *tcp_addr_data = map->method_data;
1801    int debug_on;
1802    uint64_t mask;
1803    socklen_t peerlen;
1804    struct sockaddr_in peer;
1805    int ret;
1806    struct hostent *peerent;
1807    char* tmp_peer;
1808
1809    /* return default response if we don't have support for the right socket
1810     * calls
1811     */
1812#if !defined(HAVE_GETHOSTBYADDR)
1813    return(tcp_addr_data->peer);
1814#else
1815
1816    /* Only resolve hostnames if a gossip mask is set to request it.
1817     * Otherwise we leave it at ip address
1818     */
1819    gossip_get_debug_mask(&debug_on, &mask);
1820
1821    if(!debug_on || (!(mask & GOSSIP_ACCESS_HOSTNAMES)))
1822    {
1823        return(tcp_addr_data->peer);
1824    }
1825
1826    peerlen = sizeof(struct sockaddr_in);
1827
1828    if(tcp_addr_data->peer_type == BMI_TCP_PEER_HOSTNAME)
1829    {
1830        /* full hostname already cached; return now */
1831        return(tcp_addr_data->peer);
1832    }
1833
1834    /* if we hit this point, we need to resolve hostname */
1835    ret = getpeername(tcp_addr_data->socket, (struct sockaddr*)&(peer), &peerlen);
1836    if(ret < 0)
1837    {
1838        /* default to use IP address */
1839        return(tcp_addr_data->peer);
1840    }
1841
1842    peerent = gethostbyaddr((void*)&peer.sin_addr.s_addr,
1843        sizeof(struct in_addr), AF_INET);
1844    if(peerent == NULL)
1845    {
1846        /* default to use IP address */
1847        return(tcp_addr_data->peer);
1848    }
1849 
1850    tmp_peer = (char*)malloc(strlen(peerent->h_name) + 1);
1851    if(!tmp_peer)
1852    {
1853        /* default to use IP address */
1854        return(tcp_addr_data->peer);
1855    }
1856    strcpy(tmp_peer, peerent->h_name);
1857    if(tcp_addr_data->peer)
1858    {
1859        free(tcp_addr_data->peer);
1860    }
1861    tcp_addr_data->peer = tmp_peer;
1862    tcp_addr_data->peer_type = BMI_TCP_PEER_HOSTNAME;
1863    return(tcp_addr_data->peer);
1864
1865#endif
1866
1867}
1868
1869/* tcp_forget_addr()
1870 *
1871 * completely removes a tcp method address from use, and aborts any
1872 * operations that use the address.  If the
1873 * dealloc_flag is set, the memory used by the address will be
1874 * deallocated as well.
1875 *
1876 * no return value
1877 */
1878void tcp_forget_addr(bmi_method_addr_p map,
1879                     int dealloc_flag,
1880                     int error_code)
1881{
1882    struct tcp_addr* tcp_addr_data = map->method_data;
1883    BMI_addr_t bmi_addr = tcp_addr_data->bmi_addr;
1884    int tmp_outcount;
1885    bmi_method_addr_p tmp_addr;
1886    int tmp_status;
1887
1888    if (tcp_socket_collection_p)
1889    {
1890        BMI_socket_collection_remove(tcp_socket_collection_p, map);
1891        /* perform a test to force the socket collection to act on the remove
1892         * request before continuing
1893         */
1894        if(!sc_test_busy)
1895        {
1896            BMI_socket_collection_testglobal(tcp_socket_collection_p,
1897                0, &tmp_outcount, &tmp_addr, &tmp_status, 0);
1898        }
1899    }
1900
1901    tcp_shutdown_addr(map);
1902    tcp_cleanse_addr(map, error_code);
1903    tcp_addr_data->addr_error = error_code;
1904    if (dealloc_flag)
1905    {
1906        dealloc_tcp_method_addr(map);
1907    }
1908    else
1909    {
1910        /* this will cause the bmi control layer to check to see if
1911         * this address can be completely forgotten
1912         */
1913        bmi_method_addr_forget_callback(bmi_addr);
1914    }
1915    return;
1916};
1917
1918/******************************************************************
1919 * Internal support functions
1920 */
1921
1922
1923/*
1924 * dealloc_tcp_method_addr()
1925 *
1926 * destroys method address structures generated by the TCP/IP module.
1927 *
1928 * no return value
1929 */
1930static void dealloc_tcp_method_addr(bmi_method_addr_p map)
1931{
1932
1933    struct tcp_addr *tcp_addr_data = NULL;
1934
1935    tcp_addr_data = map->method_data;
1936    /* close the socket, as long as it is not the one we are listening on
1937     * as a server.
1938     */
1939    if (!tcp_addr_data->server_port)
1940    {
1941        if (tcp_addr_data->socket > -1)
1942        {
1943            close(tcp_addr_data->socket);
1944        }
1945    }
1946
1947    if (tcp_addr_data->hostname)
1948        free(tcp_addr_data->hostname);
1949    if (tcp_addr_data->peer)
1950        free(tcp_addr_data->peer);
1951
1952    bmi_dealloc_method_addr(map);
1953
1954    return;
1955}
1956
1957
1958/*
1959 * alloc_tcp_method_addr()
1960 *
1961 * creates a new method address with defaults filled in for TCP/IP.
1962 *
1963 * returns pointer to struct on success, NULL on failure
1964 */
1965bmi_method_addr_p alloc_tcp_method_addr(void)
1966{
1967
1968    struct bmi_method_addr *my_method_addr = NULL;
1969    struct tcp_addr *tcp_addr_data = NULL;
1970
1971    my_method_addr =
1972        bmi_alloc_method_addr(tcp_method_params.method_id, sizeof(struct tcp_addr));
1973    if (!my_method_addr)
1974    {
1975        return (NULL);
1976    }
1977
1978    /* note that we trust the alloc_method_addr() function to have zeroed
1979     * out the structures for us already
1980     */
1981
1982    tcp_addr_data = my_method_addr->method_data;
1983    tcp_addr_data->socket = -1;
1984    tcp_addr_data->port = -1;
1985    tcp_addr_data->map = my_method_addr;
1986    tcp_addr_data->sc_index = -1;
1987
1988    return (my_method_addr);
1989}
1990
1991
1992/*
1993 * tcp_server_init()
1994 *
1995 * this function is used to prepare a node to recieve incoming
1996 * connections if it is initialized in a server configuration.   
1997 *
1998 * returns 0 on succes, -errno on failure
1999 */
2000static int tcp_server_init(void)
2001{
2002
2003    int oldfl = 0;              /* old socket flags */
2004    struct tcp_addr *tcp_addr_data = NULL;
2005    int tmp_errno = bmi_tcp_errno_to_pvfs(-EINVAL);
2006    int ret = 0;
2007
2008    /* create a socket */
2009    tcp_addr_data = tcp_method_params.listen_addr->method_data;
2010    if ((tcp_addr_data->socket = BMI_sockio_new_sock()) < 0)
2011    {
2012        tmp_errno = errno;
2013        gossip_err("Error: BMI_sockio_new_sock: %s\n", strerror(tmp_errno));
2014        return (bmi_tcp_errno_to_pvfs(-tmp_errno));
2015    }
2016
2017    /* set it to non-blocking operation */
2018    oldfl = fcntl(tcp_addr_data->socket, F_GETFL, 0);
2019    if (!(oldfl & O_NONBLOCK))
2020    {
2021        fcntl(tcp_addr_data->socket, F_SETFL, oldfl | O_NONBLOCK);
2022    }
2023
2024    /* setup for a fast restart to avoid bind addr in use errors */
2025    BMI_sockio_set_sockopt(tcp_addr_data->socket, SO_REUSEADDR, 1);
2026
2027    /* bind it to the appropriate port */
2028    if(tcp_method_params.method_flags & BMI_TCP_BIND_SPECIFIC)
2029    {
2030        ret = BMI_sockio_bind_sock_specific(tcp_addr_data->socket,
2031            tcp_addr_data->hostname,
2032            tcp_addr_data->port);
2033        /* NOTE: this particular function converts errno in advance */
2034        if(ret < 0)
2035        {
2036            PVFS_perror_gossip("BMI_sockio_bind_sock_specific", ret);
2037            return(ret);
2038        }
2039    }
2040    else
2041    {
2042        ret = BMI_sockio_bind_sock(tcp_addr_data->socket,
2043            tcp_addr_data->port);
2044    }
2045   
2046    if (ret < 0)
2047    {
2048        tmp_errno = errno;
2049        gossip_err("Error: BMI_sockio_bind_sock: %s\n", strerror(tmp_errno));
2050        return (bmi_tcp_errno_to_pvfs(-tmp_errno));
2051    }
2052
2053    /* go ahead and listen to the socket */
2054    if (listen(tcp_addr_data->socket, TCP_BACKLOG) != 0)
2055    {
2056        tmp_errno = errno;
2057        gossip_err("Error: listen: %s\n", strerror(tmp_errno));
2058        return (bmi_tcp_errno_to_pvfs(-tmp_errno));
2059    }
2060
2061    return (0);
2062}
2063
2064
2065/* find_recv_inflight()
2066 *
2067 * checks to see if there is a recv operation in flight (when in flight
2068 * means that some of the data or envelope has been read) for a
2069 * particular address.
2070 *
2071 * returns pointer to operation on success, NULL if nothing found.
2072 */
2073static method_op_p find_recv_inflight(bmi_method_addr_p map)
2074{
2075    struct op_list_search_key key;
2076    method_op_p query_op = NULL;
2077
2078    memset(&key, 0, sizeof(struct op_list_search_key));
2079    key.method_addr = map;
2080    key.method_addr_yes = 1;
2081
2082    query_op = op_list_search(op_list_array[IND_RECV_INFLIGHT], &key);
2083
2084    return (query_op);
2085}
2086
2087
2088/* tcp_sock_init()
2089 *
2090 * this is an internal function which is used to build up a TCP/IP
2091 * connection in the situation of a client side operation.
2092 * addressing information to determine which fields need to be set.
2093 * If the connection is already established then it does no work.
2094 *
2095 * NOTE: this is safe to call repeatedly.  However, always check the
2096 * value of the not_connected field in the tcp address before using the
2097 * address.
2098 *
2099 * returns 0 on success, -errno on failure
2100 */
2101static int tcp_sock_init(bmi_method_addr_p my_method_addr)
2102{
2103
2104    int oldfl = 0;              /* socket flags */
2105    int ret = -1;
2106    struct pollfd poll_conn;
2107    struct tcp_addr *tcp_addr_data = my_method_addr->method_data;
2108    int tmp_errno = 0;
2109
2110    /* check for obvious problems */
2111    assert(my_method_addr);
2112    assert(my_method_addr->method_type == tcp_method_params.method_id);
2113    assert(tcp_addr_data->server_port == 0);
2114
2115    /* fail immediately if the address is in failure mode and we have no way
2116     * to reconnect
2117     */
2118    if(tcp_addr_data->addr_error && tcp_addr_data->dont_reconnect)
2119    {
2120        gossip_debug(GOSSIP_BMI_DEBUG_TCP,
2121        "Warning: BMI communication attempted on an address in failure mode.\n");
2122        return(tcp_addr_data->addr_error);
2123    }
2124
2125    if(tcp_addr_data->addr_error)
2126    {
2127        gossip_debug(GOSSIP_BMI_DEBUG_TCP, "%s: attempting reconnect.\n",
2128          __func__);
2129        tcp_addr_data->addr_error = 0;
2130        assert(tcp_addr_data->socket < 0);
2131        tcp_addr_data->not_connected = 1;
2132    }
2133
2134    /* is there already a socket? */
2135    if (tcp_addr_data->socket > -1)
2136    {
2137        /* check to see if we still need to work on the connect.. */
2138        if (tcp_addr_data->not_connected)
2139        {
2140            /* this is a little weird, but we complete the nonblocking
2141             * connection by polling */
2142            poll_conn.fd = tcp_addr_data->socket;
2143            poll_conn.events = POLLOUT;
2144            ret = poll(&poll_conn, 1, 2);
2145            if ((ret < 0) || (poll_conn.revents & POLLERR))
2146            {
2147                tmp_errno = errno;
2148                gossip_lerr("Error: poll: %s\n", strerror(tmp_errno));
2149                return (bmi_tcp_errno_to_pvfs(-tmp_errno));
2150            }
2151            if (poll_conn.revents & POLLOUT)
2152            {
2153                tcp_addr_data->not_connected = 0;
2154            }
2155        }
2156        /* return.  the caller should check the "not_connected" flag to
2157         * see if the socket is usable yet. */
2158        return (0);
2159    }
2160 
2161 /* This function call makes no sense - we know the socket
2162  * is not a valid value, otherwise we would have taken the
2163  * above branch.  Commenting out for now WBL 9/11
2164  */
2165/*    bmi_set_sock_buffers(tcp_addr_data->socket); */
2166
2167    /* at this point there is no socket.  try to build it */
2168    if (tcp_addr_data->port < 1)
2169    {
2170        return (bmi_tcp_errno_to_pvfs(-EINVAL));
2171    }
2172
2173    /* make a socket */
2174    if ((tcp_addr_data->socket = BMI_sockio_new_sock()) < 0)
2175    {
2176        tmp_errno = errno;
2177        return (bmi_tcp_errno_to_pvfs(-tmp_errno));
2178    }
2179
2180    /* set it to non-blocking operation */
2181    oldfl = fcntl(tcp_addr_data->socket, F_GETFL, 0);
2182    if (!(oldfl & O_NONBLOCK))
2183    {
2184        fcntl(tcp_addr_data->socket, F_SETFL, oldfl | O_NONBLOCK);
2185    }
2186
2187#if defined(USE_TRUSTED) && defined(__PVFS2_CLIENT__)
2188    /* make sure if we need to bind or not to some local port ranges */
2189    tcp_enable_trusted(tcp_addr_data);
2190#endif
2191
2192    /* turn off Nagle's algorithm */
2193    if (BMI_sockio_set_tcpopt(tcp_addr_data->socket, TCP_NODELAY, 1) < 0)
2194    {
2195        tmp_errno = errno;
2196        gossip_lerr("Error: failed to set TCP_NODELAY option.\n");
2197        close(tcp_addr_data->socket);
2198        return (bmi_tcp_errno_to_pvfs(-tmp_errno));
2199    }
2200
2201       bmi_set_sock_buffers(tcp_addr_data->socket);
2202
2203    if (tcp_addr_data->hostname)
2204    {
2205        gossip_ldebug(GOSSIP_BMI_DEBUG_TCP,
2206                      "Connect: socket=%d, hostname=%s, port=%d\n",
2207                      tcp_addr_data->socket, tcp_addr_data->hostname,
2208                      tcp_addr_data->port);
2209        ret = BMI_sockio_connect_sock(tcp_addr_data->socket,
2210                      tcp_addr_data->hostname,
2211                      tcp_addr_data->port);
2212    }
2213    else
2214    {
2215        return (bmi_tcp_errno_to_pvfs(-EINVAL));
2216    }
2217
2218    if (ret < 0)
2219    {
2220        if (ret == -EINPROGRESS)
2221        {
2222            tcp_addr_data->not_connected = 1;
2223            /* this will have to be connected later with a poll */
2224        }
2225        else
2226        {
2227            /* NOTE: BMI_sockio_connect_sock returns a PVFS error */
2228            char buff[300];
2229
2230            snprintf(buff, 300, "Error: BMI_sockio_connect_sock: (%s):",
2231                     tcp_addr_data->hostname);
2232
2233            PVFS_perror_gossip(buff, ret);
2234            return (ret);
2235        }
2236    }
2237
2238    return (0);
2239}
2240
2241
2242/* enqueue_operation()
2243 *
2244 * creates a new operation based on the arguments to the function.  It
2245 * then makes sure that the address is added to the socket collection,
2246 * and the operation is added to the appropriate operation queue.
2247 *
2248 * Damn, what a big prototype!
2249 *
2250 * returns 0 on success, -errno on failure
2251 */
2252static int enqueue_operation(op_list_p target_list,
2253                             enum bmi_op_type send_recv,
2254                             bmi_method_addr_p map,
2255                             void *const *buffer_list,
2256                             const bmi_size_t *size_list,
2257                             int list_count,
2258                             bmi_size_t amt_complete,
2259                             bmi_size_t env_amt_complete,
2260                             bmi_op_id_t * id,
2261                             int tcp_op_state,
2262                             struct tcp_msg_header header,
2263                             void *user_ptr,
2264                             bmi_size_t actual_size,
2265                             bmi_size_t expected_size,
2266                             bmi_context_id context_id,
2267                             int32_t eid)
2268{
2269    method_op_p new_method_op = NULL;
2270    struct tcp_op *tcp_op_data = NULL;
2271    struct tcp_addr* tcp_addr_data = NULL;
2272    int i;
2273
2274    /* allocate the operation structure */
2275    new_method_op = alloc_tcp_method_op();
2276    if (!new_method_op)
2277    {
2278        return (bmi_tcp_errno_to_pvfs(-ENOMEM));
2279    }
2280
2281    *id = new_method_op->op_id;
2282    new_method_op->event_id = eid;
2283
2284    /* set the fields */
2285    new_method_op->send_recv = send_recv;
2286    new_method_op->addr = map;
2287    new_method_op->user_ptr = user_ptr;
2288    /* this is on purpose; we want to use the buffer_list all of
2289     * the time, no special case for one contig buffer
2290     */
2291    new_method_op->buffer = NULL;
2292    new_method_op->actual_size = actual_size;
2293    new_method_op->expected_size = expected_size;
2294    new_method_op->send_recv = send_recv;
2295    new_method_op->amt_complete = amt_complete;
2296    new_method_op->env_amt_complete = env_amt_complete;
2297    new_method_op->msg_tag = header.tag;
2298    new_method_op->mode = header.mode;
2299    new_method_op->list_count = list_count;
2300    new_method_op->context_id = context_id;
2301
2302    /* set our current position in list processing */
2303    i=0;
2304    new_method_op->list_index = 0;
2305    new_method_op->cur_index_complete = 0;
2306    while(amt_complete > 0)
2307    {
2308        if(amt_complete >= size_list[i])
2309        {
2310            amt_complete -= size_list[i];
2311            new_method_op->list_index++;
2312            i++;
2313        }
2314        else
2315        {
2316            new_method_op->cur_index_complete = amt_complete;
2317            amt_complete = 0;
2318        }
2319    }
2320
2321    tcp_op_data = new_method_op->method_data;
2322    tcp_op_data->tcp_op_state = tcp_op_state;
2323    tcp_op_data->env = header;
2324
2325    /* if there is only one item in the list, then keep the list stored
2326     * in the op structure.  This allows us to use the same code for send
2327     * and recv as we use for send_list and recv_list, without having to
2328     * malloc lists for those special cases
2329     */
2330    if (list_count == 1)
2331    {
2332        new_method_op->buffer_list = &tcp_op_data->buffer_list_stub;
2333        new_method_op->size_list = &tcp_op_data->size_list_stub;
2334        ((void**)new_method_op->buffer_list)[0] = buffer_list[0];
2335        ((bmi_size_t*)new_method_op->size_list)[0] = size_list[0];
2336    }
2337    else
2338    {
2339        new_method_op->size_list = size_list;
2340        new_method_op->buffer_list = buffer_list;
2341    }
2342
2343    tcp_addr_data = map->method_data;
2344
2345    if(tcp_addr_data->addr_error)
2346    {
2347        /* server should always fail here, client should let receives queue
2348         * as if nothing were wrong
2349         */
2350        if(tcp_addr_data->dont_reconnect || send_recv == BMI_SEND)
2351        {
2352            gossip_debug(GOSSIP_BMI_DEBUG_TCP,
2353                       "Warning: BMI communication attempted on an "
2354                       "address in failure mode.\n");
2355            new_method_op->error_code = tcp_addr_data->addr_error;
2356            op_list_add(op_list_array[new_method_op->context_id],
2357                        new_method_op);
2358            return(tcp_addr_data->addr_error);
2359        }
2360    }
2361
2362#if 0
2363    if(tcp_addr_data->addr_error)
2364    {
2365        /* this address is bad, don't try to do anything with it */
2366        gossip_err("Warning: BMI communication attempted on an "
2367                   "address in failure mode.\n");
2368
2369        new_method_op->error_code = tcp_addr_data->addr_error;
2370        op_list_add(op_list_array[new_method_op->context_id],
2371                    new_method_op);
2372        return(tcp_addr_data->addr_error);
2373    }
2374#endif
2375
2376    /* add the socket to poll on */
2377    BMI_socket_collection_add(tcp_socket_collection_p, map);
2378    if(send_recv == BMI_SEND)
2379    {
2380        BMI_socket_collection_add_write_bit(tcp_socket_collection_p, map);
2381    }
2382
2383    /* keep up with the operation */
2384    op_list_add(target_list, new_method_op);
2385
2386    return (0);
2387}
2388
2389
2390/* tcp_post_recv_generic()
2391 *
2392 * does the real work of posting an operation - works for both
2393 * eager and rendezvous messages
2394 *
2395 * returns 0 on success that requires later poll, returns 1 on instant
2396 * completion, -errno on failure
2397 */
2398static int tcp_post_recv_generic(bmi_op_id_t * id,
2399                                 bmi_method_addr_p src,
2400                                 void *const *buffer_list,
2401                                 const bmi_size_t *size_list,
2402                                 int list_count,
2403                                 bmi_size_t expected_size,
2404                                 bmi_size_t * actual_size,
2405                                 enum bmi_buffer_type buffer_type,
2406                                 bmi_msg_tag_t tag,
2407                                 void *user_ptr,
2408                                 bmi_context_id context_id,
2409                                 PVFS_hint hints)
2410{
2411    method_op_p query_op = NULL;
2412    int ret = -1;
2413    struct tcp_addr *tcp_addr_data = NULL;
2414    struct tcp_op *tcp_op_data = NULL;
2415    struct tcp_msg_header bogus_header;
2416    struct op_list_search_key key;
2417    bmi_size_t copy_size = 0;
2418    bmi_size_t total_copied = 0;
2419    int i;
2420    PINT_event_id eid = 0;
2421
2422    PINT_EVENT_START(
2423        bmi_tcp_recv_event_id, bmi_tcp_pid, NULL, &eid,
2424        PINT_HINT_GET_CLIENT_ID(hints),
2425        PINT_HINT_GET_REQUEST_ID(hints),
2426        PINT_HINT_GET_RANK(hints),
2427        PINT_HINT_GET_HANDLE(hints),
2428        PINT_HINT_GET_OP_ID(hints),
2429        expected_size);
2430
2431    tcp_addr_data = src->method_data;
2432
2433    /* short out immediately if the address is bad and we have no way to
2434     * reconnect
2435     */
2436    if(tcp_addr_data->addr_error && tcp_addr_data->dont_reconnect)
2437    {
2438        gossip_debug(
2439            GOSSIP_BMI_DEBUG_TCP,
2440            "Warning: BMI communication attempted "
2441            "on an address in failure mode.\n");
2442        return(tcp_addr_data->addr_error);
2443    }
2444
2445    /* lets make sure that the message hasn't already been fully
2446     * buffered in eager mode before doing anything else
2447     */
2448    memset(&key, 0, sizeof(struct op_list_search_key));
2449    key.method_addr = src;
2450    key.method_addr_yes = 1;
2451    key.msg_tag = tag;
2452    key.msg_tag_yes = 1;
2453
2454    query_op =
2455        op_list_search(op_list_array[IND_RECV_EAGER_DONE_BUFFERING], &key);
2456    if (query_op)
2457    {
2458        /* make sure it isn't too big */
2459        if (query_op->actual_size > expected_size)
2460        {
2461            gossip_err("Error: message ordering violation;\n");
2462            gossip_err("Error: message too large for next buffer.\n");
2463            return (bmi_tcp_errno_to_pvfs(-EPROTO));
2464        }
2465
2466        /* whoohoo- it is already done! */
2467        /* copy buffer out to list segments; handle short case */
2468        for (i = 0; i < list_count; i++)
2469        {
2470            copy_size = size_list[i];
2471            if (copy_size + total_copied > query_op->actual_size)
2472            {
2473                copy_size = query_op->actual_size - total_copied;
2474            }
2475            memcpy(buffer_list[i], (void *) ((char *) query_op->buffer +
2476                                             total_copied), copy_size);
2477            total_copied += copy_size;
2478            if (total_copied == query_op->actual_size)
2479            {
2480                break;
2481            }
2482        }
2483        /* copy out to correct memory regions */
2484        (*actual_size) = query_op->actual_size;
2485        free(query_op->buffer);
2486        *id = 0;
2487        op_list_remove(query_op);
2488        dealloc_tcp_method_op(query_op);
2489        PINT_EVENT_END(bmi_tcp_recv_event_id, bmi_tcp_pid, NULL, eid, 0,
2490                       *actual_size);
2491
2492        return (1);
2493    }
2494
2495    /* look for a message that is already being received */
2496    query_op = op_list_search(op_list_array[IND_RECV_INFLIGHT], &key);
2497    if (query_op)
2498    {
2499        tcp_op_data = query_op->method_data;
2500    }
2501
2502    /* see if it is being buffered into a temporary memory region */
2503    if (query_op && tcp_op_data->tcp_op_state == BMI_TCP_BUFFERING)
2504    {
2505        /* make sure it isn't too big */
2506        if (query_op->actual_size > expected_size)
2507        {
2508            gossip_err("Error: message ordering violation;\n");
2509            gossip_err("Error: message too large for next buffer.\n");
2510            return (bmi_tcp_errno_to_pvfs(-EPROTO));
2511        }
2512
2513        /* copy what we have so far into the correct buffers */
2514        total_copied = 0;
2515        for (i = 0; i < list_count; i++)
2516        {
2517            copy_size = size_list[i];
2518            if (copy_size + total_copied > query_op->amt_complete)
2519            {
2520                copy_size = query_op->amt_complete - total_copied;
2521            }
2522            if (copy_size > 0)
2523            {
2524                memcpy(buffer_list[i], (void *) ((char *) query_op->buffer +
2525                                                 total_copied), copy_size);
2526            }
2527            total_copied += copy_size;
2528            if (total_copied == query_op->amt_complete)
2529            {
2530                query_op->list_index = i;
2531                query_op->cur_index_complete = copy_size;
2532                break;
2533            }
2534        }
2535
2536        /* see if we ended on a buffer boundary */
2537        if (query_op->cur_index_complete ==
2538            query_op->size_list[query_op->list_index])
2539        {
2540            query_op->list_index++;
2541            query_op->cur_index_complete = 0;
2542        }
2543
2544        /* release the old buffer */
2545        if (query_op->buffer)
2546        {
2547            free(query_op->buffer);
2548        }
2549
2550        *id = query_op->op_id;
2551        tcp_op_data = query_op->method_data;
2552        tcp_op_data->tcp_op_state = BMI_TCP_INPROGRESS;
2553
2554        query_op->list_count = list_count;
2555        query_op->user_ptr = user_ptr;
2556        query_op->context_id = context_id;
2557        /* if there is only one item in the list, then keep the list stored
2558         * in the op structure.  This allows us to use the same code for send
2559         * and recv as we use for send_list and recv_list, without having to
2560         * malloc lists for those special cases
2561         */
2562        if (list_count == 1)
2563        {
2564            query_op->buffer_list = &tcp_op_data->buffer_list_stub;
2565            query_op->size_list = &tcp_op_data->size_list_stub;
2566            ((void **)query_op->buffer_list)[0] = buffer_list[0];
2567            ((bmi_size_t *)query_op->size_list)[0] = size_list[0];
2568        }
2569        else
2570        {
2571            query_op->buffer_list = buffer_list;
2572            query_op->size_list = size_list;
2573        }
2574
2575        if (query_op->amt_complete < query_op->actual_size)
2576        {
2577            /* try to recv some more data */
2578            tcp_addr_data = query_op->addr->method_data;
2579            ret = payload_progress(tcp_addr_data->socket,
2580                                   query_op->buffer_list,
2581                                   query_op->size_list,
2582                                   query_op->list_count,
2583                                   query_op->actual_size,
2584                                   &(query_op->list_index),
2585                                   &(query_op->cur_index_complete),
2586                                   BMI_RECV,
2587                                   NULL,
2588                                   0);
2589            if (ret < 0)
2590            {
2591                PVFS_perror_gossip("Error: payload_progress", ret);
2592                /* payload_progress() returns BMI error codes */
2593                tcp_forget_addr(query_op->addr, 0, ret);
2594                return (ret);
2595            }
2596
2597            query_op->amt_complete += ret;
2598        }
2599        assert(query_op->amt_complete <= query_op->actual_size);
2600        if (query_op->amt_complete == query_op->actual_size)
2601        {
2602            /* we are done */
2603            op_list_remove(query_op);
2604            *id = 0;
2605            (*actual_size) = query_op->actual_size;
2606            dealloc_tcp_method_op(query_op);
2607            PINT_EVENT_END(
2608                bmi_tcp_recv_event_id, bmi_tcp_pid, NULL, eid,
2609                0, *actual_size);
2610
2611            return (1);
2612        }
2613        else
2614        {
2615            /* there is still more work to do */
2616            tcp_op_data->tcp_op_state = BMI_TCP_INPROGRESS;
2617            return (0);
2618        }
2619    }
2620
2621    /* NOTE: if the message was in flight, but not buffering, then
2622     * that means that it has already matched an earlier receive
2623     * post or else is an unexpected message that doesn't require a
2624     * matching receive post - at any rate it shouldn't be handled
2625     * here
2626     */
2627
2628    /* if we hit this point we must enqueue */
2629    if (expected_size <= TCP_MODE_EAGER_LIMIT)
2630    {
2631        bogus_header.mode = TCP_MODE_EAGER;
2632    }
2633    else
2634    {
2635        bogus_header.mode = TCP_MODE_REND;
2636    }
2637    bogus_header.tag = tag;
2638    ret = enqueue_operation(op_list_array[IND_RECV],
2639                            BMI_RECV, src, buffer_list, size_list,
2640                            list_count, 0, 0, id, BMI_TCP_INPROGRESS,
2641                            bogus_header, user_ptr, 0,
2642                            expected_size, context_id, eid);
2643    /* just for safety; this field isn't valid to the caller anymore */
2644    (*actual_size) = 0;
2645    /* TODO: figure out why this causes deadlocks; observable in 2
2646     * scenarios:
2647     * - pvfs2-client-core with threaded library and nptl
2648     * - pvfs2-server threaded with nptl sending messages to itself
2649     */
2650#if 0
2651    if (ret >= 0)
2652    {
2653        /* go ahead and try to do some work while we are in this
2654         * function since we appear to be backlogged.  Make sure that
2655         * we do not wait in the poll, however.
2656         */
2657        ret = tcp_do_work(0);
2658    }
2659#endif
2660    return (ret);
2661}
2662
2663
2664/* tcp_cleanse_addr()
2665 *
2666 * finds all active operations matching the given address, places them
2667 * in an error state, and moves them to the completed queue.
2668 *
2669 * NOTE: this function does not shut down the address.  That should be
2670 * handled separately
2671 *
2672 * returns 0 on success, -errno on failure
2673 */
2674static int tcp_cleanse_addr(bmi_method_addr_p map, int error_code)
2675{
2676    int i = 0;
2677    struct op_list_search_key key;
2678    method_op_p query_op = NULL;
2679
2680    memset(&key, 0, sizeof(struct op_list_search_key));
2681    key.method_addr = map;
2682    key.method_addr_yes = 1;
2683
2684    /* NOTE: we know the unexpected completed queue is the last index! */
2685    for (i = 0; i < (NUM_INDICES - 1); i++)
2686    {
2687        if (op_list_array[i])
2688        {
2689            while ((query_op = op_list_search(op_list_array[i], &key)))
2690            {
2691                op_list_remove(query_op);
2692                query_op->error_code = error_code;
2693                if (query_op->mode == TCP_MODE_UNEXP && query_op->send_recv
2694                    == BMI_RECV)
2695                {
2696                    op_list_add(op_list_array[IND_COMPLETE_RECV_UNEXP],
2697                                query_op);
2698                }
2699                else
2700                {
2701                    ((struct tcp_op*)(query_op->method_data))->tcp_op_state =
2702                        BMI_TCP_COMPLETE;
2703                    op_list_add(completion_array[query_op->context_id], query_op);
2704                }
2705            }
2706        }
2707    }
2708
2709    return (0);
2710}
2711
2712
2713/* tcp_shutdown_addr()
2714 *
2715 * closes connections associated with a tcp method address
2716 *
2717 * returns 0 on success, -errno on failure
2718 */
2719static int tcp_shutdown_addr(bmi_method_addr_p map)
2720{
2721
2722    struct tcp_addr *tcp_addr_data = map->method_data;
2723    if (tcp_addr_data->socket > -1)
2724    {
2725        close(tcp_addr_data->socket);
2726    }
2727    tcp_addr_data->socket = -1;
2728    tcp_addr_data->not_connected = 1;
2729
2730    return (0);
2731}
2732
2733
2734/* tcp_do_work()
2735 *
2736 * this is the function that actually does communication work during
2737 * BMI_tcp_testXXX and BMI_tcp_waitXXX functions.  The amount of work
2738 * that it does is tunable.
2739 *
2740 * returns 0 on success, -errno on failure.
2741 */
2742static int tcp_do_work(int max_idle_time)
2743{
2744    int ret = -1;
2745    bmi_method_addr_p addr_array[TCP_WORK_METRIC];
2746    int status_array[TCP_WORK_METRIC];
2747    int socket_count = 0;
2748    int i = 0;
2749    int stall_flag = 0;
2750    int busy_flag = 1;
2751    struct timespec req;
2752    struct tcp_addr* tcp_addr_data = NULL;
2753    struct timespec wait_time;
2754    struct timeval start;
2755
2756    if(sc_test_busy)
2757    {
2758        /* another thread is already polling or working on sockets */
2759        if(max_idle_time == 0)
2760        {
2761            /* we don't want to spend time waiting on it; return
2762             * immediately.
2763             */
2764            return(0);
2765        }
2766
2767        /* Sleep until working thread thread signals that it has finished
2768         * its work and then return.  No need for this thread to poll;
2769         * the other thread may have already finished what we wanted.
2770         * This condition wait is used strictly as a best effort to
2771         * prevent busy spin.  We'll sort out the results later.
2772         */
2773        gettimeofday(&start, NULL);
2774        wait_time.tv_sec = start.tv_sec + max_idle_time / 1000;
2775        wait_time.tv_nsec = (start.tv_usec + ((max_idle_time % 1000)*1000))*1000;
2776        if (wait_time.tv_nsec > 1000000000)
2777        {
2778            wait_time.tv_nsec = wait_time.tv_nsec - 1000000000;
2779            wait_time.tv_sec++;
2780        }
2781        gen_cond_timedwait(&interface_cond, &interface_mutex, &wait_time);
2782        return(0);
2783    }
2784
2785    /* this thread has gained control of the polling.  */
2786    sc_test_busy = 1;
2787    gen_mutex_unlock(&interface_mutex);
2788
2789    /* our turn to look at the socket collection */
2790    ret = BMI_socket_collection_testglobal(tcp_socket_collection_p,
2791                                       TCP_WORK_METRIC, &socket_count,
2792                                       addr_array, status_array,
2793                                       max_idle_time);
2794
2795    gen_mutex_lock(&interface_mutex);
2796    sc_test_busy = 0;
2797
2798    if (ret < 0)
2799    {
2800        /* wake up anyone else who might have been waiting */
2801        gen_cond_broadcast(&interface_cond);
2802        PVFS_perror_gossip("Error: socket collection:", ret);
2803        /* BMI_socket_collection_testglobal() returns BMI error code */
2804        return (ret);
2805    }
2806
2807    if(socket_count == 0)
2808        busy_flag = 0;
2809
2810    /* do different kinds of work depending on results */
2811    for (i = 0; i < socket_count; i++)
2812    {
2813        tcp_addr_data = addr_array[i]->method_data;
2814        /* skip working on addresses in failure mode */
2815        if(tcp_addr_data->addr_error)
2816        {
2817            /* addr_error field is in BMI error code format */
2818            tcp_forget_addr(addr_array[i], 0, tcp_addr_data->addr_error);
2819            continue;
2820        }
2821
2822        if (status_array[i] & SC_ERROR_BIT)
2823        {
2824            ret = tcp_do_work_error(addr_array[i]);
2825            if (ret < 0)
2826            {
2827                PVFS_perror_gossip("Warning: BMI error handling failure, continuing", ret);
2828            }
2829        }
2830        else
2831        {
2832            if (status_array[i] & SC_WRITE_BIT)
2833            {
2834                ret = tcp_do_work_send(addr_array[i], &stall_flag);
2835                if (ret < 0)
2836                {
2837                    PVFS_perror_gossip("Warning: BMI send error, continuing", ret);
2838                }
2839                if(!stall_flag)
2840                    busy_flag = 0;
2841            }
2842            if (status_array[i] & SC_READ_BIT)
2843            {
2844                ret = tcp_do_work_recv(addr_array[i], &stall_flag);
2845                if (ret < 0)
2846                {
2847                    PVFS_perror_gossip("Warning: BMI recv error, continuing", ret);
2848                }
2849                if(!stall_flag)
2850                    busy_flag = 0;
2851            }
2852        }
2853    }
2854
2855    /* IMPORTANT NOTE: if we have set the following flag, then it indicates that
2856     * poll() is finding data on our sockets, yet we are not able to move
2857     * any of it right now.  This means that the sockets are backlogged, and
2858     * BMI is in danger of busy spinning during test functions.  Let's sleep
2859     * for a millisecond here in hopes of letting the rest of the system
2860     * catch up somehow (either by clearing a backlog in another I/O
2861     * component, or by posting more matching BMI recieve operations)
2862     */
2863    if(busy_flag)
2864    {
2865        req.tv_sec = 0;
2866        req.tv_nsec = 1000;
2867        gen_mutex_unlock(&interface_mutex);
2868        nanosleep(&req, NULL);
2869        gen_mutex_lock(&interface_mutex);
2870    }
2871
2872    /* wake up anyone else who might have been waiting */
2873    gen_cond_broadcast(&interface_cond);
2874    return (0);
2875}
2876
2877
2878/* tcp_do_work_send()
2879 *
2880 * does work on a TCP address that is ready to send data.
2881 *
2882 * returns 0 on success, -errno on failure
2883 */
2884static int tcp_do_work_send(bmi_method_addr_p map, int* stall_flag)
2885{
2886    method_op_p active_method_op = NULL;
2887    struct op_list_search_key key;
2888    int blocked_flag = 0;
2889    int ret = 0;
2890    int tmp_stall_flag;
2891
2892    *stall_flag = 1;
2893
2894    while (blocked_flag == 0 && ret == 0)
2895    {
2896        /* what we want to do here is find the first operation in the send
2897         * queue for this address.
2898         */
2899        memset(&key, 0, sizeof(struct op_list_search_key));
2900        key.method_addr = map;
2901        key.method_addr_yes = 1;
2902        active_method_op = op_list_search(op_list_array[IND_SEND], &key);
2903        if (!active_method_op)
2904        {
2905            /* ran out of queued sends to work on */
2906            return (0);
2907        }
2908
2909        ret = work_on_send_op(active_method_op, &blocked_flag, &tmp_stall_flag);
2910        if(!tmp_stall_flag)
2911            *stall_flag = 0;
2912    }
2913
2914    return (ret);
2915}
2916
2917
2918/* handle_new_connection()
2919 *
2920 * this function should be called only on special tcp method addresses
2921 * that represent local server ports.  It will attempt to accept a new
2922 * connection and create a new method address for the remote host.
2923 *
2924 * side effect: destroys the temporary method_address that is passed in
2925 * to it.
2926 *
2927 * returns 0 on success, -errno on failure
2928 */
2929static int handle_new_connection(bmi_method_addr_p map)
2930{
2931    struct tcp_addr *tcp_addr_data = NULL;
2932    int accepted_socket = -1;
2933    bmi_method_addr_p new_addr = NULL;
2934    int ret = -1;
2935    char* tmp_peer = NULL;
2936
2937    ret = tcp_accept_init(&accepted_socket, &tmp_peer);
2938    if (ret < 0)
2939    {
2940        return (ret);
2941    }
2942    if (accepted_socket < 0)
2943    {
2944        /* guess it wasn't ready after all */
2945        return (0);
2946    }
2947
2948    /* ok, we have a new socket.  what now?  Probably simplest
2949     * thing to do is to create a new method_addr, add it to the
2950     * socket collection, and return.  It will get caught the next
2951     * time around */
2952    new_addr = alloc_tcp_method_addr();
2953    if (!new_addr)
2954    {
2955        return (bmi_tcp_errno_to_pvfs(-ENOMEM));
2956    }
2957    gossip_ldebug(GOSSIP_BMI_DEBUG_TCP,
2958                  "Assigning socket %d to new method addr.\n",
2959                  accepted_socket);
2960    tcp_addr_data = new_addr->method_data;
2961    tcp_addr_data->socket = accepted_socket;
2962    tcp_addr_data->peer = tmp_peer;
2963    tcp_addr_data->peer_type = BMI_TCP_PEER_IP;
2964
2965    /* set a flag to make sure that we never try to reconnect this address
2966     * in the future
2967     */
2968    tcp_addr_data->dont_reconnect = 1;
2969    /* register this address with the method control layer */
2970    tcp_addr_data->bmi_addr = bmi_method_addr_reg_callback(new_addr);
2971    if (ret < 0)
2972    {
2973        tcp_shutdown_addr(new_addr);
2974        dealloc_tcp_method_addr(new_addr);
2975        dealloc_tcp_method_addr(map);
2976        return (ret);
2977    }
2978    BMI_socket_collection_add(tcp_socket_collection_p, new_addr);
2979
2980    dealloc_tcp_method_addr(map);
2981    return (0);
2982
2983}
2984
2985
2986/* tcp_do_work_recv()
2987 *
2988 * does work on a TCP address that is ready to recv data.
2989 *
2990 * returns 0 on success, -errno on failure
2991 */
2992static int tcp_do_work_recv(bmi_method_addr_p map, int* stall_flag)
2993{
2994
2995    method_op_p active_method_op = NULL;
2996    int ret = -1;
2997    void *new_buffer = NULL;
2998    struct op_list_search_key key;
2999    struct tcp_msg_header new_header;
3000    struct tcp_addr *tcp_addr_data = map->method_data;
3001    struct tcp_op *tcp_op_data = NULL;
3002    int tmp_errno;
3003    int tmp;
3004    bmi_size_t old_amt_complete = 0;
3005    time_t current_time;
3006
3007    *stall_flag = 1;
3008
3009    /* figure out if this is a new connection */
3010    if (tcp_addr_data->server_port)
3011    {
3012        /* just try to accept connection- no work yet */
3013        *stall_flag = 0;
3014        return (handle_new_connection(map));
3015    }
3016
3017    /* look for a recv for this address that is already in flight */
3018    active_method_op = find_recv_inflight(map);
3019    /* see if we found one in progress... */
3020    if (active_method_op)
3021    {
3022        tcp_op_data = active_method_op->method_data;
3023        if (active_method_op->mode == TCP_MODE_REND &&
3024            tcp_op_data->tcp_op_state == BMI_TCP_BUFFERING)
3025        {
3026            /* we must wait for recv post */
3027            return (0);
3028        }
3029        else
3030        {
3031            old_amt_complete = active_method_op->amt_complete;
3032            ret = work_on_recv_op(active_method_op, stall_flag);
3033            gossip_debug(GOSSIP_BMI_DEBUG_TCP, "actual_size=%d, "
3034                         "amt_complete=%d, old_amt_complete=%d\n",
3035                         (int)active_method_op->actual_size,
3036                         (int)active_method_op->amt_complete,
3037                         (int)old_amt_complete);
3038
3039            if ((ret == 0) &&
3040                (old_amt_complete == active_method_op->amt_complete) &&
3041                active_method_op->actual_size &&
3042                (active_method_op->amt_complete <
3043                 active_method_op->actual_size))
3044            {
3045                gossip_debug(
3046                    GOSSIP_BMI_DEBUG_TCP, "Warning: bmi_tcp unable "
3047                    "to recv any data reported by poll(). [1]\n");
3048
3049                if (tcp_addr_data->zero_read_limit++ ==
3050                    BMI_TCP_ZERO_READ_LIMIT)
3051                {
3052                    gossip_debug(GOSSIP_BMI_DEBUG_TCP,
3053                                 "...dropping connection.\n");
3054                    tcp_forget_addr(map, 0, bmi_tcp_errno_to_pvfs(-EPIPE));
3055                }
3056            }
3057            else
3058            {
3059                tcp_addr_data->zero_read_limit = 0;
3060            }
3061            return(ret);
3062        }
3063    }
3064
3065    /* let's see if a the entire header is ready to be received.  If so
3066     * we will go ahead and pull it.  Otherwise, we will try again later.
3067     * It isn't worth the complication of reading only a partial message
3068     * header - we really want it atomically
3069     */
3070    ret = BMI_sockio_nbpeek(tcp_addr_data->socket,
3071                            new_header.enc_hdr, TCP_ENC_HDR_SIZE);
3072    if (ret < 0)
3073    {
3074        tcp_forget_addr(map, 0, bmi_tcp_errno_to_pvfs(-errno));
3075        return (0);
3076    }
3077
3078    if (ret == 0)
3079    {
3080        gossip_debug(
3081            GOSSIP_BMI_DEBUG_TCP, "Warning: bmi_tcp unable "
3082            "to recv any data reported by poll(). [2]\n");
3083
3084        if (tcp_addr_data->zero_read_limit++ ==
3085            BMI_TCP_ZERO_READ_LIMIT)
3086        {
3087            gossip_debug(GOSSIP_BMI_DEBUG_TCP,
3088                         "...dropping connection.\n");
3089            tcp_forget_addr(map, 0, bmi_tcp_errno_to_pvfs(-EPIPE));
3090        }
3091        return(0);
3092    }
3093    else
3094    {
3095        tcp_addr_data->zero_read_limit = 0;
3096    }
3097
3098    if (ret < TCP_ENC_HDR_SIZE)
3099    {
3100        current_time = time(NULL);
3101        if(!tcp_addr_data->short_header_timer)
3102        {
3103            tcp_addr_data->short_header_timer = current_time;
3104        }
3105        else if((current_time - tcp_addr_data->short_header_timer) >
3106            BMI_TCP_HEADER_WAIT_SECONDS)
3107        {
3108            gossip_err("Error: incomplete BMI TCP header after %d seconds, closing connection.\n",
3109                BMI_TCP_HEADER_WAIT_SECONDS);
3110            tcp_forget_addr(map, 0, bmi_tcp_errno_to_pvfs(-EPIPE));
3111            return (0);
3112        }
3113
3114        /* header not ready yet, but we will keep hoping */
3115        return (0);
3116    }
3117
3118    tcp_addr_data->short_header_timer = 0;
3119    *stall_flag = 0;
3120    gossip_ldebug(GOSSIP_BMI_DEBUG_TCP, "Reading header for new op.\n");
3121    ret = BMI_sockio_nbrecv(tcp_addr_data->socket,
3122                           new_header.enc_hdr, TCP_ENC_HDR_SIZE);
3123    if (ret < TCP_ENC_HDR_SIZE)
3124    {
3125        tmp_errno = errno;
3126        gossip_err("Error: BMI_sockio_nbrecv: %s\n", strerror(tmp_errno));
3127        tcp_forget_addr(map, 0, bmi_tcp_errno_to_pvfs(-tmp_errno));
3128        return (0);
3129    }
3130
3131    /* decode the header */
3132    BMI_TCP_DEC_HDR(new_header);
3133
3134    /* so we have the header. now what?  These are the possible
3135     * scenarios:
3136     * a) unexpected message
3137     * b) eager message for which a recv has been posted
3138     * c) eager message for which a recv has not been posted
3139     * d) rendezvous messsage for which a recv has been posted
3140     * e) rendezvous messsage for which a recv has not been posted
3141     * f) eager message for which a rend. recv has been posted
3142     */
3143
3144    /* check magic number of message */
3145    if(new_header.magic_nr != BMI_MAGIC_NR)
3146    {
3147        gossip_err("Error: bad magic in BMI TCP message.\n");
3148        tcp_forget_addr(map, 0, bmi_tcp_errno_to_pvfs(-EBADMSG));
3149        return(0);
3150    }
3151
3152    gossip_ldebug(GOSSIP_BMI_DEBUG_TCP, "Received new message; mode: %d.\n",
3153                  (int) new_header.mode);
3154    gossip_ldebug(GOSSIP_BMI_DEBUG_TCP, "tag: %d\n", (int) new_header.tag);
3155
3156    if (new_header.mode == TCP_MODE_UNEXP)
3157    {
3158        /* allocate the operation structure */
3159        active_method_op = alloc_tcp_method_op();
3160        if (!active_method_op)
3161        {
3162            tcp_forget_addr(map, 0, bmi_tcp_errno_to_pvfs(-ENOMEM));
3163            return (bmi_tcp_errno_to_pvfs(-ENOMEM));
3164        }
3165        /* create data buffer */
3166        new_buffer = malloc(new_header.size);
3167        if (!new_buffer)
3168        {
3169            dealloc_tcp_method_op(active_method_op);
3170            tcp_forget_addr(map, 0, bmi_tcp_errno_to_pvfs(-ENOMEM));
3171            return (bmi_tcp_errno_to_pvfs(-ENOMEM));
3172        }
3173
3174        /* set the fields */
3175        active_method_op->send_recv = BMI_RECV;
3176        active_method_op->addr = map;
3177        active_method_op->actual_size = new_header.size;
3178        active_method_op->expected_size = 0;
3179        active_method_op->amt_complete = 0;
3180        active_method_op->env_amt_complete = TCP_ENC_HDR_SIZE;
3181        active_method_op->msg_tag = new_header.tag;
3182        active_method_op->buffer = new_buffer;
3183        active_method_op->mode = TCP_MODE_UNEXP;
3184        active_method_op->buffer_list = &(active_method_op->buffer);
3185        active_method_op->size_list = &(active_method_op->actual_size);
3186        active_method_op->list_count = 1;
3187        tcp_op_data = active_method_op->method_data;
3188        tcp_op_data->tcp_op_state = BMI_TCP_INPROGRESS;
3189        tcp_op_data->env = new_header;
3190
3191        op_list_add(op_list_array[IND_RECV_INFLIGHT], active_method_op);
3192        /* grab some data if we can */
3193        return (work_on_recv_op(active_method_op, &tmp));
3194    }
3195
3196    memset(&key, 0, sizeof(struct op_list_search_key));
3197    key.method_addr = map;
3198    key.method_addr_yes = 1;
3199    key.msg_tag = new_header.tag;
3200    key.msg_tag_yes = 1;
3201
3202    /* look for a match within the posted operations */
3203    active_method_op = op_list_search(op_list_array[IND_RECV], &key);
3204
3205    if (active_method_op)
3206    {
3207        /* make sure it isn't too big */
3208        if (new_header.size > active_method_op->expected_size)
3209        {
3210            gossip_err("Error: message ordering violation;\n");
3211            gossip_err("Error: message too large for next buffer.\n");
3212            gossip_err("Error: incoming size: %ld, expected size: %ld\n",
3213                        (long) new_header.size,
3214                        (long) active_method_op->expected_size);
3215            /* TODO: return error here or do something else? */
3216            return (bmi_tcp_errno_to_pvfs(-EPROTO));
3217        }
3218
3219        /* we found a match.  go work on it and return */
3220        op_list_remove(active_method_op);
3221        active_method_op->env_amt_complete = TCP_ENC_HDR_SIZE;
3222        active_method_op->actual_size = new_header.size;
3223        op_list_add(op_list_array[IND_RECV_INFLIGHT], active_method_op);
3224        return (work_on_recv_op(active_method_op, &tmp));
3225    }
3226
3227    /* no match anywhere.  Start a new operation */
3228    /* allocate the operation structure */
3229    active_method_op = alloc_tcp_method_op();
3230    if (!active_method_op)
3231    {
3232        tcp_forget_addr(map, 0, bmi_tcp_errno_to_pvfs(-ENOMEM));
3233        return (bmi_tcp_errno_to_pvfs(-ENOMEM));
3234    }
3235
3236    if (new_header.mode == TCP_MODE_EAGER)
3237    {
3238        /* create data buffer for eager messages */
3239        new_buffer = malloc(new_header.size);
3240        if (!new_buffer)
3241        {
3242            dealloc_tcp_method_op(active_method_op);
3243            tcp_forget_addr(map, 0, bmi_tcp_errno_to_pvfs(-ENOMEM));
3244            return (bmi_tcp_errno_to_pvfs(-ENOMEM));
3245        }
3246    }
3247    else
3248    {
3249        new_buffer = NULL;
3250    }
3251
3252    /* set the fields */
3253    active_method_op->send_recv = BMI_RECV;
3254    active_method_op->addr = map;
3255    active_method_op->actual_size = new_header.size;
3256    active_method_op->expected_size = 0;
3257    active_method_op->amt_complete = 0;
3258    active_method_op->env_amt_complete = TCP_ENC_HDR_SIZE;
3259    active_method_op->msg_tag = new_header.tag;
3260    active_method_op->buffer = new_buffer;
3261    active_method_op->mode = new_header.mode;
3262    active_method_op->buffer_list = &(active_method_op->buffer);
3263    active_method_op->size_list = &(active_method_op->actual_size);
3264    active_method_op->list_count = 1;
3265    tcp_op_data = active_method_op->method_data;
3266    tcp_op_data->tcp_op_state = BMI_TCP_BUFFERING;
3267    tcp_op_data->env = new_header;
3268
3269    op_list_add(op_list_array[IND_RECV_INFLIGHT], active_method_op);
3270
3271    /* grab some data if we can */
3272    if (new_header.mode == TCP_MODE_EAGER)
3273    {
3274        return (work_on_recv_op(active_method_op, &tmp));
3275    }
3276
3277    return (0);
3278}
3279
3280
3281/*
3282 * work_on_send_op()
3283 *
3284 * used to perform work on a send operation.  this is called by the poll
3285 * function.
3286 *
3287 * sets blocked_flag if no more work can be done on socket without
3288 * blocking
3289 * returns 0 on success, -errno on failure.
3290 */
3291static int work_on_send_op(method_op_p my_method_op,
3292                           int *blocked_flag, int* stall_flag)
3293{
3294    int ret = -1;
3295    struct tcp_addr *tcp_addr_data = my_method_op->addr->method_data;
3296    struct tcp_op *tcp_op_data = my_method_op->method_data;
3297
3298    *blocked_flag = 1;
3299    *stall_flag = 0;
3300
3301    /* make sure that the connection is done before we continue */
3302    if (tcp_addr_data->not_connected)
3303    {
3304        ret = tcp_sock_init(my_method_op->addr);
3305        if (ret < 0)
3306        {
3307            PVFS_perror_gossip("Error: socket failed to init", ret);
3308            /* tcp_sock_init() returns BMI error code */
3309            tcp_forget_addr(my_method_op->addr, 0, ret);
3310            return (0);
3311        }
3312        if (tcp_addr_data->not_connected)
3313        {
3314            /* try again later- still could not connect */
3315            tcp_op_data->tcp_op_state = BMI_TCP_INPROGRESS;
3316            return (0);
3317        }
3318    }
3319
3320    ret = payload_progress(tcp_addr_data->socket,
3321        my_method_op->buffer_list,
3322        my_method_op->size_list,
3323        my_method_op->list_count,
3324        my_method_op->actual_size,
3325        &(my_method_op->list_index),
3326        &(my_method_op->cur_index_complete),
3327        BMI_SEND,
3328        tcp_op_data->env.enc_hdr,
3329        &my_method_op->env_amt_complete);
3330    if (ret < 0)
3331    {
3332        PVFS_perror_gossip("Error: payload_progress", ret);
3333        /* payload_progress() returns BMI error codes */
3334        tcp_forget_addr(my_method_op->addr, 0, ret);
3335        return (0);
3336    }
3337
3338    if(ret == 0)
3339        *stall_flag = 1;
3340
3341    gossip_ldebug(GOSSIP_BMI_DEBUG_TCP, "Sent: %d bytes of data.\n", ret);
3342    my_method_op->amt_complete += ret;
3343    assert(my_method_op->amt_complete <= my_method_op->actual_size);
3344
3345    if (my_method_op->amt_complete == my_method_op->actual_size && my_method_op->env_amt_complete == TCP_ENC_HDR_SIZE)
3346    {
3347        /* we are done */
3348        my_method_op->error_code = 0;
3349        BMI_socket_collection_remove_write_bit(tcp_socket_collection_p,
3350                                           my_method_op->addr);
3351        op_list_remove(my_method_op);
3352        ((struct tcp_op*)(my_method_op->method_data))->tcp_op_state =
3353            BMI_TCP_COMPLETE;
3354        op_list_add(completion_array[my_method_op->context_id], my_method_op);
3355        *blocked_flag = 0;
3356    }
3357    else
3358    {
3359        /* there is still more work to do */
3360        tcp_op_data->tcp_op_state = BMI_TCP_INPROGRESS;
3361    }
3362
3363    return (0);
3364}
3365
3366
3367/*
3368 * work_on_recv_op()
3369 *
3370 * used to perform work on a recv operation.  this is called by the poll
3371 * function.
3372 * NOTE: this function assumes the method header has already been read.
3373 *
3374 * returns 0 on success, -errno on failure.
3375 */
3376static int work_on_recv_op(method_op_p my_method_op, int* stall_flag)
3377{
3378
3379    int ret = -1;
3380    struct tcp_addr *tcp_addr_data = my_method_op->addr->method_data;
3381    struct tcp_op *tcp_op_data = my_method_op->method_data;
3382
3383    *stall_flag = 1;
3384
3385    if (my_method_op->actual_size != 0)
3386    {
3387        /* now let's try to recv some actual data */
3388        ret = payload_progress(tcp_addr_data->socket,
3389            my_method_op->buffer_list,
3390            my_method_op->size_list,
3391            my_method_op->list_count,
3392            my_method_op->actual_size,
3393            &(my_method_op->list_index),
3394            &(my_method_op->cur_index_complete),
3395            BMI_RECV,
3396            NULL,
3397            0);
3398        if (ret < 0)
3399        {
3400            PVFS_perror_gossip("Error: payload_progress", ret);
3401            /* payload_progress() returns BMI error codes */
3402            tcp_forget_addr(my_method_op->addr, 0, ret);
3403            return (0);
3404        }
3405    }
3406    else
3407    {
3408        ret = 0;
3409    }
3410
3411    if(ret > 0)
3412        *stall_flag = 0;
3413
3414    my_method_op->amt_complete += ret;
3415    assert(my_method_op->amt_complete <= my_method_op->actual_size);
3416
3417    if (my_method_op->amt_complete == my_method_op->actual_size)
3418    {
3419        /* we are done */
3420        op_list_remove(my_method_op);
3421        if (tcp_op_data->tcp_op_state == BMI_TCP_BUFFERING)
3422        {
3423            /* queue up to wait on matching post recv */
3424            op_list_add(op_list_array[IND_RECV_EAGER_DONE_BUFFERING],
3425                        my_method_op);
3426        }
3427        else
3428        {
3429            my_method_op->error_code = 0;
3430            if (my_method_op->mode == TCP_MODE_UNEXP)
3431            {
3432                op_list_add(op_list_array[IND_COMPLETE_RECV_UNEXP],
3433                            my_method_op);
3434            }
3435            else
3436            {
3437                ((struct tcp_op*)(my_method_op->method_data))->tcp_op_state =
3438                    BMI_TCP_COMPLETE;
3439                op_list_add(completion_array[my_method_op->context_id], my_method_op);
3440            }
3441        }
3442    }
3443
3444    return (0);
3445}
3446
3447
3448/* tcp_do_work_error()
3449 *
3450 * handles a tcp address that has indicated an error during polling.
3451 *
3452 * returns 0 on success, -errno on failure
3453 */
3454static int tcp_do_work_error(bmi_method_addr_p map)
3455{
3456    struct tcp_addr *tcp_addr_data = NULL;
3457    int buf;
3458    int ret;
3459    int tmp_errno;
3460
3461    tcp_addr_data = map->method_data;
3462
3463    /* perform a read on the socket so that we can get a real errno */
3464    ret = read(tcp_addr_data->socket, &buf, sizeof(int));
3465    if (ret == 0)
3466        tmp_errno = EPIPE;  /* report other side closed socket with this */
3467    else
3468        tmp_errno = errno;
3469
3470    gossip_debug(GOSSIP_BMI_DEBUG_TCP, "Error: bmi_tcp: %s\n",
3471      strerror(tmp_errno));
3472
3473    if (tcp_addr_data->server_port)
3474    {
3475        /* Ignore this and hope it goes away... we don't want to lose
3476         * our local socket */
3477        dealloc_tcp_method_addr(map);
3478        gossip_lerr("Warning: error polling on server socket, continuing.\n");
3479        return (0);
3480    }
3481
3482    if(tmp_errno == 0)
3483        tmp_errno = EPROTO;
3484
3485    tcp_forget_addr(map, 0, bmi_tcp_errno_to_pvfs(-tmp_errno));
3486
3487    return (0);
3488}
3489
3490#if defined(USE_TRUSTED) && defined(__PVFS2_CLIENT__)
3491/*
3492 * tcp_enable_trusted()
3493 * Ideally, this function should look up the security configuration of
3494 * the server and determines
3495 * if it needs to bind to any specific port locally or not..
3496 * For now look at the FIXME below.
3497 */
3498static int tcp_enable_trusted(struct tcp_addr *tcp_addr_data)
3499{
3500    /*
3501     * FIXME:
3502     * For now, there is no way for us to check if a given
3503     * server is actually using port protection or not.
3504     * For now we unconditionally use a trusted port range
3505     * as long as USE_TRUSTED is #defined.
3506     *
3507     * Although most of the time we expect users
3508     * to be using a range of 0-1024, it is hard to keep probing
3509     * until one gets a port in the range specified.
3510     * Hence this is a temporary fix. we will see if this
3511     * requirement even needs to be met at all.
3512     */
3513    static unsigned short my_requested_port = 1023;
3514    unsigned short my_local_port = 0;
3515    struct sockaddr_in my_local_sockaddr;
3516    socklen_t len = sizeof(struct sockaddr_in);
3517    memset(&my_local_sockaddr, 0, sizeof(struct sockaddr_in));
3518
3519    /* setup for a fast restart to avoid bind addr in use errors */
3520    if (BMI_sockio_set_sockopt(tcp_addr_data->socket, SO_REUSEADDR, 1) < 0)
3521    {
3522        gossip_lerr("Could not set SO_REUSEADDR on local socket (port %hd)\n", my_local_port);
3523    }
3524    if (BMI_sockio_bind_sock(tcp_addr_data->socket, my_requested_port) < 0)
3525    {
3526        gossip_lerr("Could not bind to local port %hd: %s\n",
3527                my_requested_port, strerror(errno));
3528    }
3529    else {
3530        my_requested_port--;
3531    }
3532    my_local_sockaddr.sin_family = AF_INET;
3533    if (getsockname(tcp_addr_data->socket,
3534                (struct sockaddr *)&my_local_sockaddr, &len) == 0)
3535    {
3536        my_local_port = ntohs(my_local_sockaddr.sin_port);
3537    }
3538    gossip_debug(GOSSIP_BMI_DEBUG_TCP, "Bound locally to port: %hd\n", my_local_port);
3539    return 0;
3540}
3541
3542#endif
3543
3544#if defined(USE_TRUSTED) && defined(__PVFS2_SERVER__)
3545
3546static char *bad_errors[] = {
3547    "invalid network address",
3548    "invalid port",
3549    "invalid network address and port"
3550};
3551
3552/*
3553 * tcp_allow_trusted()
3554 * if trusted ports was enabled make sure
3555 * that we can accept a particular connection from a given
3556 * client
3557 */
3558static int tcp_allow_trusted(struct sockaddr_in *peer_sockaddr)
3559{
3560    char *peer_hostname = inet_ntoa(peer_sockaddr->sin_addr);
3561    unsigned short peer_port = ntohs(peer_sockaddr->sin_port);
3562    int   i, what_failed   = -1;
3563
3564    /* Don't refuse connects if there were any
3565     * parse errors or if it is not enabled in the config file
3566     */
3567    if (gtcp_allowed_connection->port_enforce == 0
3568            && gtcp_allowed_connection->network_enforce == 0)
3569    {
3570        return 0;
3571    }
3572    /* make sure that the client is within the allowed network */
3573    if (gtcp_allowed_connection->network_enforce == 1)
3574    {
3575        /* Always allow localhost to connect */
3576        if (ntohl(peer_sockaddr->sin_addr.s_addr) == INADDR_LOOPBACK)
3577        {
3578            goto port_check;
3579        }
3580        for (i = 0; i < gtcp_allowed_connection->network_count; i++)
3581        {
3582            /* check with all the masks */
3583            if ((peer_sockaddr->sin_addr.s_addr & gtcp_allowed_connection->netmask[i].s_addr)
3584                    != (gtcp_allowed_connection->network[i].s_addr & gtcp_allowed_connection->netmask[i].s_addr ))
3585            {
3586                continue;
3587            }
3588            else {
3589                goto port_check;
3590            }
3591        }
3592        /* not from a trusted network */
3593        what_failed = 0;
3594    }
3595port_check:
3596    /* make sure that the client port numbers are within specified limits */
3597    if (gtcp_allowed_connection->port_enforce == 1)
3598    {
3599        if (peer_port < gtcp_allowed_connection->ports[0]
3600                || peer_port > gtcp_allowed_connection->ports[1])
3601        {
3602            what_failed = (what_failed < 0) ? 1 : 2;
3603        }
3604    }
3605    /* okay, we are good to go */
3606    if (what_failed < 0)
3607    {
3608        return 0;
3609    }
3610    /* no good */
3611    gossip_err("Rejecting client %s on port %d: %s\n",
3612           peer_hostname, peer_port, bad_errors[what_failed]);
3613    return -1;
3614}
3615
3616#endif
3617
3618/*
3619 * tcp_accept_init()
3620 *
3621 * used to establish a connection from the server side.  Attempts an
3622 * accept call and provides the socket if it succeeds.
3623 *
3624 * returns 0 on success, -errno on failure.
3625 */
3626static int tcp_accept_init(int *socket, char** peer)
3627{
3628
3629    int ret = -1;
3630    int tmp_errno = 0;
3631    struct tcp_addr *tcp_addr_data = tcp_method_params.listen_addr->method_data;
3632    int oldfl = 0;
3633    struct sockaddr_in peer_sockaddr;
3634    int peer_sockaddr_size = sizeof(struct sockaddr_in);
3635    char* tmp_peer;
3636
3637    /* do we have a socket on this end yet? */
3638    if (tcp_addr_data->socket < 0)
3639    {
3640        ret = tcp_server_init();
3641        if (ret < 0)
3642        {
3643            return (ret);
3644        }
3645    }
3646
3647    *socket = accept(tcp_addr_data->socket, (struct sockaddr*)&peer_sockaddr,
3648              (socklen_t *)&peer_sockaddr_size);
3649
3650    if (*socket < 0)
3651    {
3652        if ((errno == EAGAIN) ||
3653            (errno == EWOULDBLOCK) ||
3654            (errno == ENETDOWN) ||
3655            (errno == EPROTO) ||
3656            (errno == ENOPROTOOPT) ||
3657            (errno == EHOSTDOWN) ||
3658            (errno == ENONET) ||
3659            (errno == EHOSTUNREACH) ||
3660            (errno == EOPNOTSUPP) ||
3661            (errno == ENETUNREACH) ||
3662            (errno == ENFILE) ||
3663            (errno == EMFILE))
3664        {
3665            /* try again later */
3666            if ((errno == ENFILE) || (errno == EMFILE))
3667            {
3668                gossip_err("Error: accept: %s (continuing)\n",strerror(errno));
3669                bmi_method_addr_drop_callback(BMI_tcp_method_name);
3670            }
3671            return (0);
3672        }
3673        else
3674        {
3675            gossip_err("Error: accept: %s\n", strerror(errno));
3676            return (bmi_tcp_errno_to_pvfs(-errno));
3677        }
3678    }
3679
3680#if defined(USE_TRUSTED) && defined(__PVFS2_SERVER__)
3681
3682    /* make sure that we are allowed to accept this connection */
3683    if (tcp_allow_trusted(&peer_sockaddr) < 0)
3684    {
3685        /* Force closure of the connection */
3686        close(*socket);
3687        return (bmi_tcp_errno_to_pvfs(-EACCES));
3688    }
3689
3690#endif
3691
3692    /* we accepted a new connection.  turn off Nagle's algorithm. */
3693    if (BMI_sockio_set_tcpopt(*socket, TCP_NODELAY, 1) < 0)
3694    {
3695        tmp_errno = errno;
3696        gossip_lerr("Error: failed to set TCP_NODELAY option.\n");
3697        close(*socket);
3698        return (bmi_tcp_errno_to_pvfs(-tmp_errno));
3699    }
3700
3701    /* set it to non-blocking operation */
3702    oldfl = fcntl(*socket, F_GETFL, 0);
3703    if (!(oldfl & O_NONBLOCK))
3704    {
3705        fcntl(*socket, F_SETFL, oldfl | O_NONBLOCK);
3706    }
3707
3708    /* allocate ip address string */
3709    tmp_peer = inet_ntoa(peer_sockaddr.sin_addr);
3710    *peer = (char*)malloc(strlen(tmp_peer)+1);
3711    if(!(*peer))
3712    {
3713        close(*socket);
3714        return(bmi_tcp_errno_to_pvfs(-BMI_ENOMEM));
3715    }
3716    strcpy(*peer, tmp_peer);
3717
3718    return (0);
3719}
3720
3721
3722/* alloc_tcp_method_op()
3723 *
3724 * creates a new method op with defaults filled in for tcp.
3725 *
3726 * returns pointer to structure on success, NULL on failure
3727 */
3728static method_op_p alloc_tcp_method_op(void)
3729{
3730    method_op_p my_method_op = NULL;
3731
3732    my_method_op = bmi_alloc_method_op(sizeof(struct tcp_op));
3733
3734    /* we trust alloc_method_op to zero it out */
3735
3736    return (my_method_op);
3737}
3738
3739
3740/* dealloc_tcp_method_op()
3741 *
3742 * destroys an existing tcp method op, freeing segment lists if
3743 * needed
3744 *
3745 * no return value
3746 */
3747static void dealloc_tcp_method_op(method_op_p old_op)
3748{
3749    bmi_dealloc_method_op(old_op);
3750    return;
3751}
3752
3753/* tcp_post_send_generic()
3754 *
3755 * Submits send operations (low level).
3756 *
3757 * returns 0 on success that requires later poll, returns 1 on instant
3758 * completion, -errno on failure
3759 */
3760static int tcp_post_send_generic(bmi_op_id_t * id,
3761                                 bmi_method_addr_p dest,
3762                                 const void *const *buffer_list,
3763                                 const bmi_size_t *size_list,
3764                                 int list_count,
3765                                 enum bmi_buffer_type buffer_type,
3766                                 struct tcp_msg_header my_header,
3767                                 void *user_ptr,
3768                                 bmi_context_id context_id,
3769                                 PVFS_hint hints)
3770{
3771    struct tcp_addr *tcp_addr_data = dest->method_data;
3772    method_op_p query_op = NULL;
3773    int ret = -1;
3774    bmi_size_t total_size = 0;
3775    bmi_size_t amt_complete = 0;
3776    bmi_size_t env_amt_complete = 0;
3777    struct op_list_search_key key;
3778    int list_index = 0;
3779    bmi_size_t cur_index_complete = 0;
3780    PINT_event_id eid = 0;
3781
3782    if(PINT_EVENT_ENABLED)
3783    {
3784        int i = 0;
3785        for(; i < list_count; ++i)
3786        {
3787            total_size += size_list[i];
3788        }
3789    }
3790
3791    PINT_EVENT_START(
3792        bmi_tcp_send_event_id, bmi_tcp_pid, NULL, &eid,
3793        PINT_HINT_GET_CLIENT_ID(hints),
3794        PINT_HINT_GET_REQUEST_ID(hints),
3795        PINT_HINT_GET_RANK(hints),
3796        PINT_HINT_GET_HANDLE(hints),
3797        PINT_HINT_GET_OP_ID(hints),
3798        total_size);
3799
3800    /* Three things can happen here:
3801     * a) another op is already in queue for the address, so we just
3802     * queue up
3803     * b) we can send the whole message and return
3804     * c) we send part of the message and queue the rest
3805     */
3806
3807    /* NOTE: on the post_send side of an operation, it doesn't really
3808     * matter whether the op is going to be eager or rendezvous.  It is
3809     * handled the same way (except for how the header is filled in).
3810     * The difference is in the recv processing for TCP.
3811     */
3812
3813    /* NOTE: we also don't care what the buffer_type says, TCP could care
3814     * less what buffers it is using.
3815     */
3816
3817    /* encode the message header */
3818    BMI_TCP_ENC_HDR(my_header);
3819
3820    /* the first thing we must do is find out if another send is queued
3821     * up for this address so that we don't mess up our ordering.    */
3822    memset(&key, 0, sizeof(struct op_list_search_key));
3823    key.method_addr = dest;
3824    key.method_addr_yes = 1;
3825    query_op = op_list_search(op_list_array[IND_SEND], &key);
3826    if (query_op)
3827    {
3828        /* queue up operation */
3829        ret = enqueue_operation(op_list_array[IND_SEND], BMI_SEND,
3830                                dest, (void **) buffer_list,
3831                                size_list, list_count, 0, 0,
3832                                id, BMI_TCP_INPROGRESS, my_header, user_ptr,
3833                                my_header.size, 0,
3834                                context_id,
3835                                eid);
3836
3837        /* TODO: is this causing deadlocks?  See similar call in recv
3838         * path for another example.  This particular one seems to be an
3839         * issue under a heavy bonnie++ load that Neill has been
3840         * debugging.  Comment out for now to see if the problem goes
3841         * away.
3842         */
3843#if 0
3844        if (ret >= 0)
3845        {
3846            /* go ahead and try to do some work while we are in this
3847             * function since we appear to be backlogged.  Make sure that
3848             * we do not wait in the poll, however.
3849             */
3850            ret = tcp_do_work(0);
3851        }
3852#endif
3853        if (ret < 0)
3854        {
3855            gossip_err("Error: enqueue_operation() or tcp_do_work() returned: %d\n", ret);
3856        }
3857        return (ret);
3858    }
3859
3860    /* make sure the connection is established */
3861    ret = tcp_sock_init(dest);
3862    if (ret < 0)
3863    {
3864        gossip_debug(GOSSIP_BMI_DEBUG_TCP, "tcp_sock_init() failure.\n");
3865        /* tcp_sock_init() returns BMI error code */
3866        tcp_forget_addr(dest, 0, ret);
3867        PINT_EVENT_END(bmi_tcp_send_event_id, bmi_tcp_pid, NULL, 0, ret);
3868        return (ret);
3869    }
3870
3871    tcp_addr_data = dest->method_data;
3872
3873#if 0
3874    /* TODO: this is a hack for testing! */
3875    /* disables immediate send completion... */
3876    ret = enqueue_operation(op_list_array[IND_SEND], BMI_SEND,
3877                            dest, buffer_list, size_list, list_count, 0, 0,
3878                            id, BMI_TCP_INPROGRESS, my_header, user_ptr,
3879                            my_header.size, 0,
3880                            context_id);
3881    return(ret);
3882#endif
3883
3884    if (tcp_addr_data->not_connected)
3885    {
3886        /* if the connection is not completed, queue up for later work */
3887        ret = enqueue_operation(op_list_array[IND_SEND], BMI_SEND,
3888                                dest, (void **) buffer_list, size_list,
3889                                list_count, 0, 0,
3890                                id, BMI_TCP_INPROGRESS, my_header, user_ptr,
3891                                my_header.size, 0,
3892                                context_id,
3893                                eid);
3894        if(ret < 0)
3895        {
3896            gossip_err("Error: enqueue_operation() returned: %d\n", ret);
3897        }
3898        return (ret);
3899    }
3900
3901    /* try to send some data */
3902    env_amt_complete = 0;
3903    ret = payload_progress(tcp_addr_data->socket,
3904        (void **) buffer_list,
3905        size_list, list_count, my_header.size, &list_index,
3906        &cur_index_complete, BMI_SEND, my_header.enc_hdr, &env_amt_complete);
3907    if (ret < 0)
3908    {
3909        PVFS_perror_gossip("Error: payload_progress", ret);
3910        /* payload_progress() returns BMI error codes */
3911        tcp_forget_addr(dest, 0, ret);
3912        PINT_EVENT_END(bmi_tcp_send_event_id, bmi_tcp_pid, NULL, eid, 0, ret);
3913        return (ret);
3914    }
3915
3916    gossip_ldebug(GOSSIP_BMI_DEBUG_TCP, "Sent: %d bytes of data.\n", ret);
3917    amt_complete = ret;
3918    assert(amt_complete <= my_header.size);
3919    if (amt_complete == my_header.size && env_amt_complete == TCP_ENC_HDR_SIZE)
3920    {
3921        /* we are already done */
3922        PINT_EVENT_END(bmi_tcp_send_event_id, bmi_tcp_pid,
3923                       NULL, eid, 0, amt_complete);
3924        return (1);
3925    }
3926
3927    /* queue up the remainder */
3928    ret = enqueue_operation(op_list_array[IND_SEND], BMI_SEND,
3929                            dest, (void **) buffer_list,
3930                            size_list, list_count,
3931                            amt_complete, env_amt_complete, id,
3932                            BMI_TCP_INPROGRESS, my_header, user_ptr,
3933                            my_header.size, 0, context_id, eid);
3934
3935    if(ret < 0)
3936    {
3937        gossip_err("Error: enqueue_operation() returned: %d\n", ret);
3938    }
3939    return (ret);
3940}
3941
3942
3943/* payload_progress()
3944 *
3945 * makes progress on sending/recving data payload portion of a message
3946 *
3947 * returns amount completed on success, -errno on failure
3948 */
3949static int payload_progress(int s, void *const *buffer_list, const bmi_size_t*
3950    size_list, int list_count, bmi_size_t total_size, int* list_index,
3951    bmi_size_t* current_index_complete, enum bmi_op_type send_recv,
3952    char* enc_hdr, bmi_size_t* env_amt_complete)
3953{
3954    int i;
3955    int count = 0;
3956    int ret;
3957    int completed;
3958    /* used for finding the stopping point on short receives */
3959    int final_index = list_count-1;
3960    bmi_size_t final_size = size_list[list_count-1];
3961    bmi_size_t sum = 0;
3962    int vector_index = 0;
3963    int header_flag = 0;
3964    int tmp_env_done = 0;
3965
3966    if(send_recv == BMI_RECV)
3967    {
3968        /* find out if we should stop short in list processing */
3969        for(i=0; i<list_count; i++)
3970        {
3971            sum += size_list[i];
3972            if(sum >= total_size)
3973            {
3974                final_index = i;
3975                final_size = size_list[i] - (sum-total_size);
3976                break;
3977            }
3978        }
3979    }
3980
3981    assert(list_count > *list_index);
3982
3983    /* make sure we don't overrun our preallocated iovec array */
3984    if((list_count - (*list_index)) > BMI_TCP_IOV_COUNT)
3985    {
3986        list_count = (*list_index) + BMI_TCP_IOV_COUNT;
3987    }
3988
3989    /* do we need to send any of the header? */
3990    if(send_recv == BMI_SEND && *env_amt_complete < TCP_ENC_HDR_SIZE)
3991    {
3992        stat_io_vector[vector_index].iov_base = &enc_hdr[*env_amt_complete];
3993        stat_io_vector[vector_index].iov_len = TCP_ENC_HDR_SIZE - *env_amt_complete;
3994        count++;
3995        vector_index++;
3996        header_flag = 1;
3997    }
3998
3999    /* setup vector */
4000    stat_io_vector[vector_index].iov_base =
4001        (char*)buffer_list[*list_index] + *current_index_complete;
4002    count++;
4003    if(final_index == 0)
4004    {
4005        stat_io_vector[vector_index].iov_len = final_size - *current_index_complete;
4006    }
4007    else
4008    {
4009        stat_io_vector[vector_index].iov_len =
4010            size_list[*list_index] - *current_index_complete;
4011        for(i = (*list_index + 1); i < list_count; i++)
4012        {
4013            vector_index++;
4014            count++;
4015            stat_io_vector[vector_index].iov_base = buffer_list[i];
4016            if(i == final_index)
4017            {
4018                stat_io_vector[vector_index].iov_len = final_size;
4019                break;
4020            }
4021            else
4022            {
4023                stat_io_vector[vector_index].iov_len = size_list[i];
4024            }
4025        }
4026    }
4027
4028    assert(count > 0);
4029
4030    if(send_recv == BMI_RECV)
4031    {
4032        ret = BMI_sockio_nbvector(s, stat_io_vector, count, 1);
4033    }
4034    else
4035    {
4036        ret = BMI_sockio_nbvector(s, stat_io_vector, count, 0);
4037    }
4038
4039    /* if error or nothing done, return now */
4040    if(ret == 0)
4041        return(0);
4042    if(ret <= 0)
4043        return(bmi_tcp_errno_to_pvfs(-errno));
4044
4045    completed = ret;
4046    if(header_flag && (completed >= 0))
4047    {
4048        /* take care of completed header status */
4049        tmp_env_done = TCP_ENC_HDR_SIZE - *env_amt_complete;
4050        if(tmp_env_done > completed)
4051            tmp_env_done = completed;
4052        completed -= tmp_env_done;
4053        ret -= tmp_env_done;
4054        (*env_amt_complete) += tmp_env_done;
4055    }
4056
4057    i=header_flag;
4058    while(completed > 0)
4059    {
4060        /* take care of completed data payload */
4061        if(completed >= stat_io_vector[i].iov_len)
4062        {
4063            completed -= stat_io_vector[i].iov_len;
4064            *current_index_complete = 0;
4065            (*list_index)++;
4066            i++;
4067        }
4068        else
4069        {
4070            *current_index_complete += completed;
4071            completed = 0;
4072        }
4073    }
4074
4075    return(ret);
4076}
4077
4078static void bmi_set_sock_buffers(int socket){
4079        //Set socket buffer sizes:
4080        gossip_debug(GOSSIP_BMI_DEBUG_TCP, "Default socket buffers send:%d receive:%d\n",
4081                GET_SENDBUFSIZE(socket), GET_RECVBUFSIZE(socket));
4082        gossip_debug(GOSSIP_BMI_DEBUG_TCP, "Setting socket buffer size for send:%d receive:%d \n",
4083                tcp_buffer_size_send, tcp_buffer_size_receive);
4084    if( tcp_buffer_size_receive != 0)
4085         SET_RECVBUFSIZE(socket,tcp_buffer_size_receive);
4086    if( tcp_buffer_size_send != 0)
4087         SET_SENDBUFSIZE(socket,tcp_buffer_size_send);
4088        gossip_debug(GOSSIP_BMI_DEBUG_TCP, "Reread socket buffers send:%d receive:%d\n",
4089                GET_SENDBUFSIZE(socket), GET_RECVBUFSIZE(socket));
4090}
4091
4092/*
4093 * Local variables:
4094 *  c-indent-level: 4
4095 *  c-basic-offset: 4
4096 * End:
4097 *
4098 * vim: ts=8 sts=4 sw=4 expandtab
4099 */
Note: See TracBrowser for help on using the browser.