I keep woking on Java TCP fork.
authorgumartinm <gu.martinm@gmail.com>
Sun, 29 Jan 2012 01:26:47 +0000 (02:26 +0100)
committergumartinm <gu.martinm@gmail.com>
Sun, 29 Jan 2012 01:26:47 +0000 (02:26 +0100)
It is being harder than I thought.

JavaFork/Daemon/javafork.c [changed mode: 0644->0755]
JavaFork/Daemon/javafork.h [changed mode: 0644->0755]
JavaFork/JavaExample/javafork-example/src/main/java/de/fork/java/RemoteForkMain.java
JavaFork/JavaExample/javafork-example/src/main/java/de/fork/java/TCPForkDaemon.java
JavaFork/JavaExample/javafork-example/src/main/java/de/fork/java/XmlForkParser.java

old mode 100644 (file)
new mode 100755 (executable)
index 366a72b..fd1d03a
 #include "javafork.h"
 
 
-pid_t daemonPID;
-int sockfd;
+pid_t daemonPID;    /*Stores the daemon server PID*/
+int sockfd = -1;    /*Stores the daemon server TCP socket.*/
 
-#define RESTARTABLE(_cmd, _result) do { \
-    _result = _cmd; \
-} while((_result == -1) && (errno == EINTR))
 
-static int restartableClose(int fd) {
-    int err;
-    RESTARTABLE(close(fd), err);
-    return err;
+
+static int restartableClose(int fd) 
+{
+    return TEMP_FAILURE_RETRY(close(fd));
 }
 
-static int closeSafely(int fd) {
+
+
+static int closeSafely(int fd) 
+{
+    /*If we always initialize file descriptor variables with value -1*/
+    /*this method should work like a charm*/
     return (fd == -1) ? 0 : restartableClose(fd);
 }
 
 
-int main (int argc, char *argv[]) {
-       int c; /*Getopt parameter*/
+
+int main (int argc, char *argv[]) 
+{
+       int c;                                          /*Getopt parameter*/
        /*Default values*/
-       char *avalue = IPADDRESS; /*Address: numeric value or hostname*/
-       int pvalue = PORT;        /*TCP port*/
-       int qvalue = QUEUE;       /*TCP listen queue*/
+       char *avalue = IPADDRESS;       /*Address: numeric value or hostname*/
+       int pvalue = PORT;              /*TCP port*/
+       int qvalue = QUEUE;                 /*TCP listen queue*/
        
        
        /*This process is intended to be used as a daemon, it sould be launched by the INIT process, because of that*/
        /*we are not forking it (INIT needs it)*/
        if (daemonize(argv[0], LOG_SYSLOG, LOG_PID) < 0)
                return 1;
-       
+
+       /*Changing session.*/   
        setsid();
        
        if (signal(SIGPIPE,SIG_IGN) == SIG_ERR) {
-               syslog (LOG_ERR, "Error with SIGPIPE: %m");
+               syslog (LOG_ERR, "signal SIGPIPE desactivation failed: %m");
                return 1;
        }
        
-       
        opterr = 0;
        while ((c = getopt (argc, argv, "a:p:q:")) != -1) {
                switch (c) {
-                       case 'a':
-                               avalue = optarg;
-                               break;
-                       case 'p':
-                               pvalue = atoi(optarg);
-                               if ((pvalue > 65535) || (pvalue <= 0)) {
-                                       syslog (LOG_ERR, "Port value %d out of range", pvalue);
-                                       return 1;
-                               }
-                               break;
-                       case 'q':
-                               qvalue = atoi(optarg);
-                               break;
-                       case '?':
-                               if ((optopt == 'a') || (optopt == 'p') || (optopt == 'q'))
-                                       syslog (LOG_ERR, "Option -%c requires an argument.", optopt);
-                               else if (isprint (optopt))
-                                       syslog (LOG_ERR, "Invalid option '-%c'.", optopt);
-                               else
-                                       syslog (LOG_ERR, "Unknown option character '\\x%x'.", optopt);
+               case 'a':
+                       avalue = optarg;
+                       break;
+               case 'p':
+                       pvalue = atoi(optarg);
+                       if ((pvalue > 65535) || (pvalue <= 0)) {
+                               syslog (LOG_ERR, "Port value %d out of range", pvalue);
                                return 1;
-                       default:
-                               abort ();
+                       }
+                       break;
+               case 'q':
+                       qvalue = atoi(optarg);
+                       break;
+               case '?':
+                       if ((optopt == 'a') || (optopt == 'p') || (optopt == 'q'))
+                               syslog (LOG_ERR, "Option -%c requires an argument.", optopt);
+                       else if (isprint (optopt))
+                               syslog (LOG_ERR, "Invalid option '-%c'.", optopt);
+                       else
+                               syslog (LOG_ERR, "Unknown option character '\\x%x'.", optopt);
+                       return 1;
+               default:
+                       abort ();
                }
        }
+
        /*This program does not admit options*/
        if (optind < argc) {
                syslog (LOG_ERR,"This program does not admit options just argument elements with their values.");
                return 1;
        }
        
-       
-       /*TODO: INIT process sending SIGTERM? Should I catch that signal?*/
+       /*INIT process sending SIGINT? Should I catch that signal?*/
        daemonPID = getpid();
-       if (signal(SIGTERM, sigterm_handler) == SIG_ERR) {
+       if (signal(SIGINT, sigint_handler) == SIG_ERR) {
                syslog (LOG_ERR, "SIGTERM signal handler failed: %m");
                return 1;
        }
        
-       
-       if (main_child(avalue, pvalue, qvalue) < 0 )
+       if (main_child(avalue, pvalue, qvalue) < 0)
                return 1;
        
        return 0;
 }
 
 
-int main_child (char *address, int port, int queue) {
+
+int main_child (char *address, int port, int queue)
+{
        struct protoent *protocol;                      /*Network protocol*/
-       struct sockaddr_in addr_server; /*struct with the server socket address*/
-       struct sockaddr_in  addr_client;/*struct with the client socket address*/
-       int sockclient;                                                                 /*File descriptor for the accepted socket*/
-       pthread_t idThread;                                                     /*Thread identifier number*/
+       struct sockaddr_in addr_server;     /*struct with the server socket address*/
+       struct sockaddr_in  addr_client;    /*struct with the client socket address*/
+       int sockclient = -1;                            /*File descriptor for the accepted socket*/
+       pthread_t idThread;                                     /*Thread identifier number*/
        socklen_t clilen;
        int optval;
-       int returnValue = 0;                                            /*OK by default*/
+       int returnValue = 0;                            /*The return value from this function, OK by default*/
        
        
        /*Retrieve protocol number from /etc/protocols file */
        protocol=getprotobyname("tcp");
        if (protocol == NULL) {
                syslog(LOG_ERR, "cannot map \"tcp\" to protocol number: %m");
-               goto EndwithError;
+               goto err;
        }
        
        bzero((char*) &addr_server, sizeof(addr_server));
        addr_server.sin_family = AF_INET;
        if (inet_pton(AF_INET, address, &addr_server.sin_addr.s_addr) <= 0) {
                syslog (LOG_ERR, "error inet_pton: %m");
-               goto EndwithError;
+               goto err;
        }
        
        addr_server.sin_port = htons(port);
        
        if ((sockfd = socket(AF_INET, SOCK_STREAM, protocol->p_proto)) < 0) {
                syslog (LOG_ERR, "socket creation failed: %m");
-               goto EndandClosewithError; 
+               goto err; 
        }
 
 
-       /*We want to avoid issues while trying to bind a socket in TIME_WAIT state.*/
-       /*In this application from the client there should not be any trouble if it gets*/
-       /*TIME_WAIT states, because it can connect from any source port.*/
+       /*We want to avoid issues while trying to bind a socket in TIME_WAIT state*/
        optval = 1;
        if(setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof(optval)) < 0) {
                syslog (LOG_ERR, "setsockopt failed: %m");
-               goto EndandClosewithError;
+               goto err;
        }
        
        if (bind(sockfd, (struct sockaddr *) &addr_server, sizeof(addr_server)) < 0) {
                syslog (LOG_ERR, "socket bind failed: %m");
-               goto EndandClosewithError;
+               goto err;
        }
        
        if (listen (sockfd, queue) < 0 ) {
                syslog (LOG_ERR, "socket listen failed: %m");
-               goto EndandClosewithError;
+               goto err;
        }       
        
        while(1) {
                clilen =  sizeof(addr_client);
-               if ((sockclient = accept (sockfd, (struct sockaddr *) &addr_client, &clilen)) < 0) {
+               if ((sockclient = TEMP_FAILURE_RETRY(accept (sockfd, (struct sockaddr *) &addr_client, &clilen))) < 0) {
                        syslog (LOG_ERR, "socket accept failed: %m");
-                       goto EndandClosewithError;
+                       goto err;
                }
+
                if (pthread_create (&idThread, NULL, serverThread, (void *)sockclient) != 0 ) {
                        syslog (LOG_ERR, "thread creation failed: %m");
-               }       
+               }
        }
-       
-       
-       EndasUsual:
-               return returnValue;
-       EndandClosewithError:
-               close (sockfd);
-       EndwithError:
-               /*When there is an error.*/
-               returnValue = -1;       
-               goto EndasUsual;
+
+end:
+    closeSafely (sockfd);
+    return returnValue;
+err:
+       /*When there is an error.*/
+       returnValue = -1;       
+       goto end;
 }
 
 
-int daemonize(const char *pname, int facility, int option) {
-       int fd;
+
+int daemonize(const char *pname, int facility, int option)
+{
+       int fd = -1;    /*Temporaly store for the /dev/tty and /dev/null file descriptors*/
        
-       if ( (fd = open( "/dev/tty", O_RDWR, 0) ) == -1) {
+       if ((fd = TEMP_FAILURE_RETRY(open( "/dev/tty", O_RDWR, 0))) == -1) {
                /*We already have no tty control*/
-               close(fd);
+               closeSafely(fd);
                return 0;
        }
        
        /*Sending messages to log*/
        openlog(pname, option, facility);
+
        /*To get a controlling tty*/
        if (ioctl(fd, TIOCNOTTY, (caddr_t)0) <0 ) {
                syslog (LOG_ERR, "Getting tty failed: %m");
                return -1;
        }
-       if (close(fd) < 0) {
+
+       if (closeSafely(fd) < 0) {
                syslog (LOG_ERR, "Closing tty failed: %m");
                return -1;
        }
        
-       if((fd = open( "/dev/null", O_RDWR, 0) ) == -1) {
-               close(fd);
+       if((fd = TEMP_FAILURE_RETRY(open( "/dev/null", O_RDWR, 0))) == -1) {
+               closeSafely(fd);
                return -1;
        }
-       dup2(fd,0);
-       dup2(fd,1);
-       dup2(fd,2);
-       close(fd);
-       
-       return 0;
-}
-
-void *serverThread (void * arg) {
-       int sock;
 
-       /*Variables used for the select function*/
-       struct timeval ptime;
-       fd_set fd_read;
-       int ret;
-
-       /*Control parameters used while receiving data from the client*/
-       int nLeft, nData, iPos;
-       /*This buffer is intended to store the data received from the client.*/
-       char buffer[1025];
+       if (TEMP_FAILURE_RETRY(dup2(fd,0)) < 0 || 
+        TEMP_FAILURE_RETRY(dup2(fd,1)) < 0 ||
+        TEMP_FAILURE_RETRY(dup2(fd,2)) < 0) {
+           closeSafely(fd);
+        return -1;
+    }
 
-       /*The command sent by the client, to be executed by this process*/      
-       char *command;
-       /*Store the command length*/
-       uint32_t *commandLength;
-       /*Store the ack*/
-       uint32_t *ack;
+    closeSafely(fd);   
 
+       return 0;
+}
 
 
-       sock = (int) arg;
 
+void *serverThread (void * arg)
+{
+       int socket = -1;                    /*Open socket by the Java client*/
+       struct timeval ptime;               /*Variable used by the select function, timeout control*/
+       int nLeft;                                          /*Control parameter used while receiving data from the client*/
+       char buffer[1025];                          /*This buffer is intended to store the data received from the client*/
+       char *command = NULL;                   /*The command sent by the client, to be executed by this process*/      
+       uint32_t *commandLength = NULL;     /*Store the command length*/
+       
+       socket = (int) arg;
+       
        pthread_detach(pthread_self());
 
 
-
-       /******************************************************************************************/
-       /*                                      Just over 1 TCP connection                                                                                                                                                                                                              */
-       /*                                      COMMAND_LENGTH: Java integer 4 bytes, BIG-ENDIAN (the same as network order)    */
-       /*                                      ACK: integer 4 bytes big-endian (for Java) with the received comand length              */
-       /*                                      COMMAND: TPV locale character set encoding                                                                                                                                              */
-       /*                                      RESULTS: TPV locale character set encoding                                                                                                                                              */
-       /*                                                                                                                                                                                                                                                                                                                                                              */
-       /*                                                      JAVA CLIENT: ------------ COMMAND_LENGTH -------> :SERVER                                                                       */
-       /*                                                      JAVA CLIENT: <---------------- ACK -------------- :SERVER                                                                       */ 
-       /*                                                      JAVA CLIENT: -------------- COMMAND ------------> :SERVER                                                                       */
-       /*                                                      JAVA CLIENT: <-------------- RESULTS ------------ :SERVER                                                                       */
-       /*                                                      JAVA CLIENT: <---------- CLOSE CONNECTION ------- :SERVER                                                                       */
-       /*                                                                                                                                                                                                                                                                                                                                                              */
-       /******************************************************************************************/
-
-
-       /*Wait 2 seconds for data coming from client.*/
-       ptime.tv_sec=2;
-  ptime.tv_usec=0;
+    if (required_sock_options (socket) < 0)
+        goto err;
+       
+       
+       /************************************************************************************************************/
+       /*                                      Just over 1 TCP connection                                                                                                                      */
+       /*                                      COMMAND_LENGTH: Java integer 4 bytes, BIG-ENDIAN (the same as network order)            */
+       /*                                      COMMAND: locale character set encoding                                                                                  */
+       /*                                      RESULTS: locale character set encoding                                                                                  */
+       /*                                                                                                                                                                                                              */
+       /*                                                      JAVA CLIENT: ------------ COMMAND_LENGTH -------> :SERVER                                       */
+       /*                                                      JAVA CLIENT: -------------- COMMAND ------------> :SERVER                                       */
+       /*                                                      JAVA CLIENT: <-------------- RESULTS ------------ :SERVER                                       */
+       /*                                                      JAVA CLIENT: <---------- CLOSE CONNECTION ------- :SERVER                                       */
+       /*                                                                                                                                                                                                              */
+       /************************************************************************************************************/
 
 
        /*COMMAND LENGTH*/
        /*First of all we receive the command size as a Java integer (4 bytes primitive type)*/ 
        if ((commandLength = (uint32_t *) malloc(sizeof(uint32_t))) == NULL) {
                syslog (LOG_ERR, "commandLength malloc failed: %m");
-               goto Error;
+               goto err;
        }
+
        bzero(buffer, sizeof(buffer));
        nLeft = sizeof(uint32_t);
-       nData = iPos = 0;
-       do {
-               FD_ZERO(&fd_read);
-               FD_SET(sock, &fd_read);
-               ret = select(sock+1, &fd_read,NULL,NULL, &ptime);
-               if(ret == 0) {
-                       syslog(LOG_INFO, "receiving command length timeout error");
-                       goto Error;
-               }
-               if(ret == -1) {
-        syslog(LOG_ERR, "receiving command length select error: %m");
-                               goto Error;
-    }
-               if((nData = recv(sock, &buffer[iPos], nLeft, 0)) <= 0) {
-                       syslog (LOG_ERR, "command length reception failed: %m");
-                       goto Error;
-               }
-               nLeft -= nData;
-               iPos += nData;
-       } while (nLeft > 0);
-       if (nLeft < 0) {
+       /*Wait max 2 seconds for data coming from client, otherwise exits with error.*/
+       ptime.tv_sec=2;
+       ptime.tv_usec=0;
+    if (receive_from_socket (socket, buffer, &nLeft, &ptime) < 0)
+        goto err;
+
+    if (nLeft < 0)
                syslog (LOG_INFO, "command length excessive data received, expected: 4, received: %d", (-nLeft + sizeof(uint32_t)) );
-       }
 
        /*Retrieve integer (4 bytes) from buffer*/
        memcpy (commandLength, buffer, sizeof(uint32_t));
@@ -308,199 +290,314 @@ void *serverThread (void * arg) {
        *commandLength = be32toh (*commandLength);
 
 
-       
-       /*ACK*/
-       /*Sending back the command length as the ACK message*/
-       if ((ack = (uint32_t *) malloc(sizeof(uint32_t))) == NULL) {
-               syslog (LOG_ERR, "commandLength malloc failed: %m");
-               goto ErrorAck;
-  }
-       *ack = htobe32(*commandLength);
-       if (send(sock, ack, sizeof(uint32_t), 0) < 0) {
-               syslog (LOG_ERR, "send ACK failed: %m");
-               goto ErrorAck;
-       }
-
-
                
        /*COMMAND*/
        /*Reserving commandLength + 1 because of the string end character*/
        if ((command = (char *) malloc(*commandLength + 1)) == NULL) {
                syslog (LOG_ERR, "command malloc failed: %m");
-               goto ErrorCommand;
+               goto err;
        } 
+
        bzero(command, ((*commandLength) + 1));
-  nLeft = *commandLength;
-  nData = iPos = 0;
-  do {
-    FD_ZERO(&fd_read);
-    FD_SET(sock, &fd_read);
-    ret = select(sock+1, &fd_read,NULL,NULL, &ptime);
-    if(ret == 0) {
-      syslog(LOG_INFO, "receiving command timeout error");
-      goto ErrorCommand;
-    }
-    if(ret == -1) {
-        syslog(LOG_ERR, "receiving command select error: %m");
-        goto ErrorCommand;
-    }
-    if((nData = recv(sock, &command[iPos], nLeft, 0)) <= 0) {
-       syslog (LOG_ERR, "command reception failed: %m");
-       goto ErrorCommand;
-    }
-    nLeft -= nData;
-    iPos += nData;
-  } while (nLeft > 0);
-  if (nLeft < 0) {
-    syslog (LOG_INFO, "execessive command length, expected: %d, received: %d", *commandLength, (-nLeft + *commandLength));
-  }
+    nLeft = *commandLength;
+       /*Wait max 2 seconds for data coming from client, otherwise exits with error.*/
+       ptime.tv_sec=2;
+       ptime.tv_usec=0;
+       if (receive_from_socket (socket, command, &nLeft, &ptime) < 0)
+        goto err;
+
+       if (nLeft < 0)
+        syslog (LOG_INFO, "execessive command length, expected: %d, received: %d", *commandLength, (-nLeft + *commandLength));
 
 
        /*RESULTS*/     
-       fork_system(sock, command);     
+       pre_fork_system(socket, command);       
 
 
        /*CLOSE CONNECTION AND FINISH*/
 
-
-ErrorCommand:
+err:
        free(command);
-ErrorAck:
-       free(ack);
-Error: 
-  close(sock);
+    closeSafely(socket);
        free(commandLength);
 
        pthread_exit(0);
 }
 
+int required_sock_options (int socket)
+{
+    int optval, flags;
+
+    /*We want non blocking sockets.*/
+    /*See the discussion of spurious readiness notifications under the BUGS section of select(2) */
+    if ((flags = TEMP_FAILURE_RETRY(fcntl(socket,F_GETFL,0))) < 0) {
+        syslog (LOG_ERR, "read socket status flags failed: %m");
+        return -1;
+    }
+
+    if(TEMP_FAILURE_RETRY(fcntl(socket, F_SETFL, O_NONBLOCK|flags)) < 0){
+        syslog (LOG_ERR, "set socket status flags failed: %m");
+        return -1;
+    }
+
+    /*Portable programs should not rely on inheritance or noninheritance of file status flags and */
+    /*always explicitly set all required flags*/
+    optval = 1;
+    if(setsockopt(socket, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof(optval)) < 0) {
+        syslog (LOG_ERR, "setsockopt SO_REUSEADDR failed: %m");
+        return -1;
+    }
+
+    /*Enable keepalive for this socket*/
+    optval = 1;
+    if(setsockopt(socket, SOL_SOCKET, SO_KEEPALIVE, &optval, sizeof(optval)) < 0) {
+        syslog (LOG_ERR, "setsockopt SO_KEEPALIVE failed: %m");
+        return -1;
+    }
+
+    /*TODO: keepalive is not enough to find out if the connection is broken                     */
+    /*      apparently it just works while the handshake phase. See:                            */
+    /*      http://stackoverflow.com/questions/4345415/socket-detect-connection-is-lost         */
+    /*      I have to implement an echo/ping messages protocol (application layer)              */
+    /*      In the meanwhile this application should just be used in the localhost interface :( */
+
+    return 0;
+}
 
-int fork_system(int socket, char *command) {
-       int pid;
-       int out[2], err[2];
-       char buf[2000];
-       char string[100];
-       struct pollfd polls[2];
-       int returnstatus;
-       int n;
-       int childreturnstatus;
 
-       int returnValue = -1;   /*NOK by default*/
+int timeout (int fd, long timeout)
+{
+    struct timeval ptime;
+    fd_set fd_read;
+
+    ptime.tv_sec = timeout;
+    ptime.tv_usec = 0;
+    for (;;) {
+        
+    }    
+}
+
+int receive_from_socket (int socket, char *store, int *nLeft, struct timeval *ptime)
+{
+    int nData, iPos;   /*Control variables.*/
+       int ret;                        /*Store return value from select.*/
+       fd_set fd_read;         /*Values for select function.*/
+
+       nData = iPos = 0;
 
+       do {
+           FD_ZERO(&fd_read);
+               FD_SET(socket, &fd_read);
+               ret = select(socket+1, &fd_read, NULL, NULL, ptime);
+               if(ret == 0) {
+                       syslog(LOG_INFO, "receiving timeout error");
+                       return -1;
+               }
+               if(ret == -1) {
+                       syslog(LOG_ERR, "receiving select error: %m");
+                       return -1;
+               }
+               if((nData = recv(socket, &store[iPos], *nLeft, 0)) <= 0) {
+                       syslog (LOG_ERR, "read TCP socket failed: %m");
+                       return -1;
+               }
+               *nLeft -= nData;
+               iPos += nData;
+    } while (*nLeft > 0);
+
+    return 0;
+}
+
+
+int pre_fork_system(int socket, char *command)
+{
        /*Required variables in order to share memory between processes*/
        key_t keyvalue;
-       int idreturnst = -1;
+       int idreturnstatus = -1;
        /*Store the return status from the process launched using system or execve*/
        /*Using shared memory between the child and parent process*/
-       int *returnst = NULL;
+       int *returnstatus = NULL;
+       
        /*Required variables in order to share the semaphore between processes*/
        key_t keysemaphore;
        int idsemaphore = -1;
-       /*Used like a barrier: the child process just can start after sending the XML init code*/
-       sem_t *semaphore = NULL;
+       sem_t *semaphore = NULL;        /*Used as a barrier: the child process just can start after sending the XML init code*/
+
+       int returnValue = -1;       /*Return value from this function can be caught by upper layers, NOK by default*/
+               
        
        
-       out[0] = out[1] = err[0] = err[1]  = -1;
+       /*Allocate shared memory because we can not use named semaphores*/
+       /*We are using this semaphore as a barrier, because we just want to start the child process when the parent process has sent*/
+       /*the XML header (see: for_system function)*/
+       
+    keysemaphore=ftok("/bin/ls", SHAREMEMSEM);  /*the /bin/ls must exist otherwise this does not work... */
+    if (keysemaphore == -1) {
+        syslog (LOG_ERR, "ftok failed: %m");
+        goto End;
+    }
 
+    /*Attach shared memory*/
+    if ((idsemaphore = shmget(keysemaphore,sizeof(sem_t), 0660 | IPC_CREAT)) < 0) {
+        syslog (LOG_ERR, "semaphore initialization failed: %m");
+        goto EndandReleaseSem;
+    }
 
-       /*Creating the pipes, they will be attached to the stderr and stdout streams*/  
-       if (pipe(out) < 0 || pipe(err) < 0) {
-               syslog (LOG_ERR, "pipe failed: %m");
-               goto EndandClosePipes;
-  }
+    if ((semaphore = (sem_t *)shmat(idsemaphore, (void *)0, 0)) < 0) {
+        goto EndandReleaseSem;
+    }
 
+    if (sem_init(semaphore, 1, 1) < 0) {
+        syslog (LOG_ERR, "semaphore initialization failed: %m");
+        goto EndandDestroySem;
+    }
 
+    if (sem_wait(semaphore) < 0) {
+        syslog (LOG_ERR, "semaphore wait failed: %m");
+        goto EndandDestroySem;
+    }
        
-       /*Allocate shared memory because we can not use named semaphores*/
-       keysemaphore=ftok("/bin/ls", SHAREMEMSEM);  /*the /bin/ls must exist otherwise this does not work... */
-       if (keysemaphore == -1) {
-               syslog (LOG_ERR, "ftok failed: %m");
-               goto EndandClosePipes;
-  }
-       /*Attach shared memory*/
-       if ((idsemaphore = shmget(keysemaphore,sizeof(sem_t), 0660 | IPC_CREAT)) < 0) {
-               syslog (LOG_ERR, "semaphore initialization failed: %m");
-               goto EndandReleaseSem;
-       }
-       if ((semaphore = (sem_t *)shmat(idsemaphore, (void *)0, 0)) < 0) {
-               goto EndandReleaseSem;
-       }
+       
+       
+       /*Allocate shared memory for the return status code from the process which is going to be launched by the system function.*/
+       /*We want to share the returnstatus variable between this process and the child that is going to be created in the fork_system method.*/
+       /*The goal is to store in this variable the return status code received from the process launched with the system method by the child process,*/
+       /*then the parent process can retrieve that return status code and send it by TCP to the Java client.*/
+       /*There are not concurrency issues because the parent process will just try to read this variable when the child process is dead, taking*/
+       /*in that moment its last value and sending it to the Java client.*/
 
-       if (sem_init(semaphore, 1, 1) < 0) {
-               syslog (LOG_ERR, "semaphore initialization failed: %m");
-               goto EndandDestroySem;
-       }
+
+    keyvalue=ftok("/bin/ls", SHAREMEMKEY);  /*the /bin/ls must exist otherwise this does not work... */
+    if (keyvalue == -1) {
+        syslog (LOG_ERR, "ftok failed: %m");
+        goto EndandDestroySem;
+    }
+
+    /*Attach shared memory*/
+    if ((idreturnstatus=shmget(keyvalue,sizeof(int), 0660 | IPC_CREAT)) < 0) {
+        syslog (LOG_ERR, "shmget failed: %m");
+        goto EndandReleaseMem;
+    }
+
+    returnstatus = (int *)shmat(idreturnstatus, (void *)0, 0);
+    if ((*returnstatus)== -1) {
+        syslog (LOG_ERR, "shmat failed: %m");
+        goto EndandReleaseMem;
+    } 
+
+
+       /*After allocating and attaching shared memory we reach this code if everything went OK.*/      
        
-       if (sem_wait(semaphore) < 0) {
-               syslog (LOG_ERR, "semaphore wait failed: %m");
-               goto EndandDestroySem;
-       }
+       returnValue = fork_system(socket, command, semaphore, returnstatus);
 
 
+EndandReleaseMem:
+    if (returnstatus != NULL) {
+        /*detach memory*/
+        if (shmdt ((int *)returnstatus) < 0)
+            syslog (LOG_ERR, "returnstatus shared variable shmdt failed: %m");
+    }
+
+    /*Mark the segment to be destroyed.*/
+    if (shmctl (idreturnstatus, IPC_RMID, (struct shmid_ds *)NULL) < 0 )
+        syslog (LOG_ERR, "returnstatus shared variable shmctl failed: %m");
+EndandDestroySem:
+    if (sem_destroy(semaphore) <0)
+         syslog (LOG_ERR, "semaphore destroy failed: %m");
+EndandReleaseSem:
+    /*after sem_destroy-> input/output parameter NULL?*/
+    if (semaphore != NULL) {
+        /*detach memory*/
+        if (shmdt ((sem_t *)semaphore) < 0)
+            syslog (LOG_ERR, "semaphore shmdt failed: %m");
+    }
+
+    /*Mark the segment to be destroyed.*/
+    if (shmctl (idsemaphore, IPC_RMID, (struct shmid_ds *)NULL) < 0 )
+        syslog (LOG_ERR, "semaphore shmctl failed: %m");
+End:
+    return returnValue;
+}
+
+
+
+int fork_system(int socket, char *command, sem_t *semaphore, int *returnstatus) {
+       int pid;
+       int out[2], err[2];         /*Store pipes file descriptors. Write ends attached to the stdout and stderr streams.*/
+       char buf[2000];                 /*Read data buffer.*/
+       char string[100];
+       /*We are going to use a poll in order to find out if there are data coming from the*/
+       /*pipes attached to the stdout and stderr streams.*/
+       struct pollfd polls[2];
+       int n;
+       int childreturnstatus;
+       int returnValue = -1;   /*eturn value from this function can be caught by upper layers, NOK by default*/
 
-       /*Allocate shared memory for the return status code */
-       keyvalue=ftok("/bin/ls", SHAREMEMKEY);  /*the /bin/ls must exist otherwise this does not work... */
-       if (keyvalue == -1) {
-               syslog (LOG_ERR, "ftok failed: %m");
-               goto EndandDestroySem;
-       }
-       /*Attach shared memory*/
-       if ((idreturnst=shmget(keyvalue,sizeof(int), 0660 | IPC_CREAT)) < 0) {
-               syslog (LOG_ERR, "shmget failed: %m");
-               goto EndandReleaseMem;
-       }
-       returnst = (int *)shmat(idreturnst, (void *)0, 0);
-       if ((*returnst)== -1) {
-               syslog (LOG_ERR, "shmat failed: %m");
-               goto EndandReleaseMem;
-       }
-       /*By default*/
-       (*returnst) = 0;
 
+       /*Value by default*/
+    (*returnstatus) = 0;
+
+       
+       out[0] = out[1] = err[0] = err[1]  = -1;
 
+
+       /*Creating the pipes, they will be attached to the stderr and stdout streams*/  
+       if (pipe(out) < 0 || pipe(err) < 0) {
+           syslog (LOG_ERR, "pipe failed: %m");
+               goto EndandClosePipes;
+    }
+       
        
        if ((pid=fork()) == -1) {
                syslog (LOG_ERR, "fork failed: %m");
-               goto EndandReleaseMem;
+               goto EndandClosePipes;
        }
        
        if (pid == 0) {
                /*Child process*/
+               /*It has to launch another one using system or execve*/
                if ((dup2(out[1],1) < 0) || (dup2(err[1],2) < 0)) {     
-                       syslog (LOG_ERR, "child dup2 failed: %m");      
+                       syslog (LOG_ERR, "child dup2 failed: %m");
+            /*Going to zombie state, hopefully waitpid will catch it*/ 
                        exit(-1);
                }
                if (sem_wait(semaphore) < 0) {
                        syslog (LOG_ERR, "child semaphore wait failed: %m");
+            /*Going to zombie state, hopefully waitpid will catch it*/
                        exit(-1);
                }
                
-               returnstatus=system(command);
+               *returnstatus=system(command);
                
                if (WIFEXITED(returnstatus) == 1)
-                       (*returnst) = WEXITSTATUS(returnstatus);
+                       (*returnstatus) = WEXITSTATUS(*returnstatus);
                else
-                       (*returnst) = -1;
+                       (*returnstatus) = -1;
                exit(0);
        }
        else {
                /*Parent process*/
+               /*It sends data to the Java client using a TCP connection.*/
                polls[0].fd=out[0];
                polls[1].fd=err[0];
                polls[0].events=POLLIN;
                polls[1].events=POLLIN;
                sprintf(string,"<?xml version=\"1.0\"?>");
                send(socket,string,strlen(string),0);
-               sprintf(string,"<salida>");
+               sprintf(string,"<streams>");
                send(socket,string,strlen(string),0);
                
                /*Releasing barrier, the child process can keep running*/
                if (sem_post(semaphore) < 0 ) {
                        syslog (LOG_ERR, "parent error releasing barrier: %m");
-                       /*Beaware, sig_handler with the child process :]*/
+            /*TODO: May I kill a child process if it is already dead? I mean,*/
+            /*      what could happen if the child process is dead?*/
+            /*      Should I implement a SIGCHILD handler?*/
+            /*TODO: Should I have a SIGTERM handler in the child process?*/
                        kill(pid, SIGTERM);
-                       goto EndandReleaseMem;
+                       goto EndandClosePipes;
                }
+
                while(1) {
                        if(poll(polls,2,100)) {
                                if(polls[0].revents&&POLLIN) {
@@ -523,10 +620,15 @@ int fork_system(int socket, char *command) {
                                }
                                if(!polls[0].revents&&POLLIN && !polls[1].revents&&POLLIN) {
                                        syslog (LOG_ERR, "parent error polling pipes: %m");
-                                       /*Beaware, sig_handler with the child process :]*/
+                                       /*TODO: May I kill a child process if it is already dead? I mean,*/
+                    /*      what could happen if the child process is dead?*/
+                    /*      Should I implement a SIGCHILD handler?*/
+                    /*TODO: Should I have a SIGTERM handler in the child process?*/
                                        kill(pid, SIGTERM);
                                        /*I want to send an error status to the remote calling process*/
-                                       (*returnst) = -1;
+                    /*TODO: Before giving this value I should make sure the child process is dead*/
+                    /*      otherwise I could finish having in *returnstatus the value from the child process*/
+                                       (*returnstatus) = -1;
                                        break;
                                }
                        }
@@ -537,68 +639,46 @@ int fork_system(int socket, char *command) {
                                        /*First of all, we check the exit status of our child process*/
                                        /*In case of error send an error status to the remote calling process*/
                                        if (WIFEXITED(childreturnstatus) != 1)
-                                               (*returnst) = -1;
+                                               (*returnstatus) = -1;
                                        break;
                                }
                                /*The child process is not dead, keep polling more data from stdout or stderr streams*/
                        }
                }
+
        }
        /*Reaching this code when child finished or if error while polling pipes*/
-       sprintf(string,"<ret><![CDATA[%d]]></ret>", (*returnst));
+       sprintf(string,"<ret><![CDATA[%d]]></ret>", (*returnstatus));
        send(socket,string,strlen(string),0);
-       sprintf(string,"</salida>");
+       sprintf(string,"</streams>");
        send(socket,string,strlen(string),0);
 
        /*Stuff just done by the Parent process. The child process ends with exit*/
        
        returnValue = 0; /*if everything went OK*/
 
-EndandReleaseMem:
-               if (returnst != NULL) {
-                       /*detach memory*/
-                       if (shmdt ((int *)returnst) < 0)
-                               syslog (LOG_ERR, "returnstatus shared variable shmdt failed: %m");
-               }
-               /*Mark the segment to be destroyed.*/
-               if (shmctl (idreturnst, IPC_RMID, (struct shmid_ds *)NULL) < 0 )
-                       syslog (LOG_ERR, "returnstatus shared variable shmctl failed: %m");
-EndandDestroySem:
-               if (sem_destroy(semaphore) <0)
-                       syslog (LOG_ERR, "semaphore destroy failed: %m");
-EndandReleaseSem:
-               /*after sem_destroy-> input/output parameter NULL?*/
-               if (semaphore != NULL) {
-                       /*detach memory*/
-                       if (shmdt ((sem_t *)semaphore) < 0)
-                               syslog (LOG_ERR, "semaphore shmdt failed: %m");
-               }
-               /*Mark the segment to be destroyed.*/
-               if (shmctl (idsemaphore, IPC_RMID, (struct shmid_ds *)NULL) < 0 )
-                       syslog (LOG_ERR, "semaphore shmctl failed: %m");
+
 EndandClosePipes:
-               closeSafely (out[0]);
-               closeSafely (out[1]);
-               closeSafely (err[0]);
-               closeSafely (err[1]);
+    closeSafely (out[0]);
+    closeSafely (out[1]);
+    closeSafely (err[0]);
+    closeSafely (err[1]);
 
        return returnValue;
 }
 
 
-void sigterm_handler(int sig)
+void sigint_handler(int sig)
 {
-  if (daemonPID != getpid()) {
-    //Do nothing
-    return;
-  }
-  if (signal (SIGTERM, SIG_IGN) == SIG_ERR)
-    {
-      syslog (LOG_ERR, "signal desactivation failed");
+    if (daemonPID != getpid()) {
+        //Do nothing
+        return;
     }
 
+    if (signal (SIGINT, SIG_IGN) == SIG_ERR)
+        syslog (LOG_ERR, "signal SIGINT desactivation failed: %m");
 
-  close (sockfd);
-  /*TODO: kill child processes and release allocate memory*/
-  exit (0);
+    closeSafely (sockfd);
+    /*TODO: kill child processes and release allocate memory*/
+    exit (0);
 }
old mode 100644 (file)
new mode 100755 (executable)
index aacdc18..d8fbf14
@@ -9,6 +9,7 @@
 
 
 
+
 /****************************************************************************************
 * This method is used by pthread_create                                                 *
 *                                                                                                                                                                          *
@@ -47,7 +48,17 @@ int main_child (char *address, int port, int queue);
 * INPUT PARAMETER: socket file descriptor                                               *
 * RETURNS: void                                                                          *
 ****************************************************************************************/
-int fork_system(int socket, char *command);
+int fork_system(int socket, char *command, sem_t *semaphore, int *returnst);
+
+
+/****************************************************************************************
+* This method is used by pthread_create                                                 *
+*                                                                                       *
+* INPUT PARAMETER: socket file descriptor                                               *
+* RETURNS: void                                                                          *
+****************************************************************************************/
+int pre_fork_system(int socket, char *command);
+
 
 
 
@@ -58,4 +69,9 @@ int fork_system(int socket, char *command);
 * INPUT PARAMETER: socket file descriptor                                               *
 * RETURNS: void                                                                          *
 ****************************************************************************************/
-void sigterm_handler();
+void sigint_handler();
+
+
+
+int required_sock_options (int socket);
+int receive_from_socket (int socket, char *store, int *Left, struct timeval *ptime);
index 9b4180b..ac0a312 100644 (file)
@@ -22,12 +22,12 @@ public class RemoteForkMain  {
                
                final ByteArrayOutputStream baos = new ByteArrayOutputStream();
                final PrintStream stdout = new PrintStream(baos);
-               final String command = "ls -lah";
+               final String command = "ls -lah ~/.ssh; ls -lah * bbbb aaa";
                final ByteArrayOutputStream baos2 = new ByteArrayOutputStream();
                final PrintStream stderr = new PrintStream(baos2);
                int result;
                
-               result = LauncherProcessesDiaFork.exec(command,stdout, stderr, "127.0.0.1", 5193);
+               result = LauncherProcesses.exec(command,stdout, stderr, "gumartinm.name", 5193);
                System.out.println(result);
                System.out.println("Stdout: " +  baos.toString());
                System.out.println("Stderr: " +  baos2.toString());
index 0ecb257..89141c5 100644 (file)
@@ -7,6 +7,7 @@ import java.io.PrintWriter;
 import java.net.InetAddress;
 import java.net.Socket;
 import java.net.UnknownHostException;
+import org.xml.sax.InputSource;
 import org.xml.sax.SAXException;
 
 /**
@@ -94,12 +95,10 @@ public class TCPForkDaemon {
        /******************************************************************************************/
        /*          Just over 1 TCP connection                                                    */
        /*          COMMAND_LENGTH: Java integer 4 bytes, BIG-ENDIAN (the same as network order)  */
-       /*          ACK: integer 4 bytes big-endian (for Java) with the sent comand length                */
-       /*          COMMAND: TPV locale character set encoding                                    */
-       /*          RESULTS: TPV locale character set encoding                                    */
+       /*          COMMAND: remote locale character set encoding                                 */
+       /*          RESULTS: remote locale character set encoding                                 */
        /*                                                                                        */
        /*              JAVA CLIENT: ------------ COMMAND_LENGTH -------> :SERVER                 */
-       /*              JAVA CLIENT: <---------------- ACK -------------- :SERVER                 */
        /*              JAVA CLIENT: -------------- COMMAND ------------> :SERVER                 */
        /*              JAVA CLIENT: <-------------- RESULTS ------------ :SERVER                 */
        /*              JAVA CLIENT: <---------- CLOSE CONNECTION ------- :SERVER                 */
@@ -110,35 +109,48 @@ public class TCPForkDaemon {
                
                socket = new Socket(InetAddress.getByName(host), port);
                try {
-                       /*Must be used the remote charset :S*/
+                       //By default in UNIX systems the keepalive message is sent after 20hours 
+                       //with Java we can not use the TCP_KEEPCNT, TCP_KEEPIDLE and TCP_KEEPINTVL options by session.
+                       //It is up to the server administrator and client user to configure them.
+                       //I guess it is because Solaris does not implement those options...
+                       //see: Net.c openjdk 6 and net_util_md.c openjdk 7
+                       //So in Java applications the only way to find out if the connection is broken (one router down)
+                       //is sending ping messages or something similar from the application layer. Java is a toy language...
+                       //Anyway I think the keepalive messages just work during the handshake phase, just after sending some
+                       //data over the link the keepalive does not work.
+                       // See: http://stackoverflow.com/questions/4345415/socket-detect-connection-is-lost
+                       socket.setKeepAlive(true);
+                       
+                       //It must be used the remote locale character set encoding
                        byte [] commandEncoded = command.getBytes("UTF-8"); 
                        
                        DataOutputStream sendData = new DataOutputStream(socket.getOutputStream());
-                       DataInputStream receiveData = new DataInputStream(socket.getInputStream());
                                                
                        // 1. COMMAND_LENGTH
                        sendData.writeInt(commandEncoded.length);
-
-                       // 2. ACK
-                       // TODO: if the server close the connection we could stay here probably
-                       // until TCP keepalive is sent (20 hours by default in Linux)
-                       int ack = receiveData.readInt();
-                       if (ack != commandEncoded.length) 
-                               throw new IOException("invalid ACK, something went wrong " +
-                                       "with the TCPForkDaemon. Check the /var/log/messages file in the TPV");
-                       
-                       
-                       // 3. COMMAND
+                               
+                       // 2. COMMAND
                        sendData.write(commandEncoded);
                        
-                       
-                       // 4. RESULTS
-                       // TODO: if the server closes the connection we could stay here probably
-                       // until TCP keepalive is sent (20 hours by default in Linux)
+                       // 3. RESULTS
+                       // TODO: When the network infrastructure (between client and server) fails in this point 
+                       // (broken router for example) Could we stay here until TCP keepalive is sent?
+                       // (20 hours by default in Linux)
+                       // Impossible to use a timeout, because we do not know how much time is going to long the command :/
+                       // the only way to fix this issue in Java is sending ping messages (Could we fix it using custom settings in the OS
+                       // of the client and server machines? for example in Linux see /proc/sys/net/ipv4/)
+                       InputSource inputSource = new InputSource(socket.getInputStream());
+                       //Must be used the remote locale character set encoding?
+                       inputSource.setEncoding("UTF-8");
                        parser.setStream(socket.getInputStream());
                        
+                       DataInputStream receiveData = new DataInputStream(socket.getInputStream());
+
+                       
+                       // 2. ACK
+                       int ack = receiveData.readInt();
                        
-                       // 5. SERVER CLOSES CONNECTION
+                       // 4. SERVER CLOSED CONNECTION
                }
                finally {
                        if (out != null) {
index 51fe7f0..255cb0f 100644 (file)
@@ -15,7 +15,7 @@ import org.xml.sax.ext.DefaultHandler2;
  * <p>
  * Class intended to parse the XML stream received from the daemon which is
  * waiting to run commands. These commands are sent by the method 
- * {@link es.dia.pos.n2a.util.os.unix.TCPForkDaemon#exec(String, String, int)} 
+ * {@link de.fork.java.TCPForkDaemon#exec(String, String, int)} 
  * </p>
  * <p>
  * After processing one command the daemon sends a XML stream with the stderr,