Many improvements in my remote TCP fork for Java
authorgumartinm <gu.martinm@gmail.com>
Thu, 26 Jan 2012 23:49:59 +0000 (00:49 +0100)
committergumartinm <gu.martinm@gmail.com>
Thu, 26 Jan 2012 23:49:59 +0000 (00:49 +0100)
I keep working on it
Next steps cleaning up the code and maybe using execve instead of system.

JavaFork/Daemon/Makefile [new file with mode: 0644]
JavaFork/Daemon/javafork.c
JavaFork/Daemon/javafork.h
JavaFork/JavaExample/javafork-example/src/main/java/de/fork/java/LauncherProcessesDiaFork.java [new file with mode: 0644]
JavaFork/JavaExample/javafork-example/src/main/java/de/fork/java/RemoteForkMain.java [new file with mode: 0644]
JavaFork/JavaExample/javafork-example/src/main/java/de/fork/java/TCPForkDaemon.java [new file with mode: 0644]
JavaFork/JavaExample/javafork-example/src/main/java/de/fork/java/XmlForkParser.java [new file with mode: 0644]

diff --git a/JavaFork/Daemon/Makefile b/JavaFork/Daemon/Makefile
new file mode 100644 (file)
index 0000000..7c28c45
--- /dev/null
@@ -0,0 +1,8 @@
+all: javafork
+
+javafork: javafork.c javafork.h
+        gcc -Wall -g -o javafork javafork.c -lpthread
+
+clean:
+       rm -f  javafork
+
index 3ad2346..366a72b 100644 (file)
 #include <sys/poll.h>
 #include <string.h>
 #include <signal.h>
+#include <ctype.h>
+#include <errno.h>
+#include <endian.h>
 #include "javafork.h"
 
-pid_t currentPID;
+
+pid_t daemonPID;
 int sockfd;
 
+#define RESTARTABLE(_cmd, _result) do { \
+    _result = _cmd; \
+} while((_result == -1) && (errno == EINTR))
 
-int main (int argc, char *argv[]) {
-    pid_t pid;
+static int restartableClose(int fd) {
+    int err;
+    RESTARTABLE(close(fd), err);
+    return err;
+}
 
-       if (signal(SIGTERM, sigterm_handler) == SIG_ERR) 
-       {
-               perror ("\nerror signal handler");
-               return -1;
-       }
+static int closeSafely(int fd) {
+    return (fd == -1) ? 0 : restartableClose(fd);
+}
 
-       if ((pid = fork()) == -1) {
-               perror("fork: ");
-       return -1;
+
+int main (int argc, char *argv[]) {
+       int c; /*Getopt parameter*/
+       /*Default values*/
+       char *avalue = IPADDRESS; /*Address: numeric value or hostname*/
+       int pvalue = PORT;        /*TCP port*/
+       int qvalue = QUEUE;       /*TCP listen queue*/
+       
+       
+       /*This process is intended to be used as a daemon, it sould be launched by the INIT process, because of that*/
+       /*we are not forking it (INIT needs it)*/
+       if (daemonize(argv[0], LOG_SYSLOG, LOG_PID) < 0)
+               return 1;
+       
+       setsid();
+       
+       if (signal(SIGPIPE,SIG_IGN) == SIG_ERR) {
+               syslog (LOG_ERR, "Error with SIGPIPE: %m");
+               return 1;
        }
-       else {
-               if (pid == 0) {
-                       /*child process*/
-                       currentPID = getpid();
-                       main_child(argc, argv);
+       
+       
+       opterr = 0;
+       while ((c = getopt (argc, argv, "a:p:q:")) != -1) {
+               switch (c) {
+                       case 'a':
+                               avalue = optarg;
+                               break;
+                       case 'p':
+                               pvalue = atoi(optarg);
+                               if ((pvalue > 65535) || (pvalue <= 0)) {
+                                       syslog (LOG_ERR, "Port value %d out of range", pvalue);
+                                       return 1;
+                               }
+                               break;
+                       case 'q':
+                               qvalue = atoi(optarg);
+                               break;
+                       case '?':
+                               if ((optopt == 'a') || (optopt == 'p') || (optopt == 'q'))
+                                       syslog (LOG_ERR, "Option -%c requires an argument.", optopt);
+                               else if (isprint (optopt))
+                                       syslog (LOG_ERR, "Invalid option '-%c'.", optopt);
+                               else
+                                       syslog (LOG_ERR, "Unknown option character '\\x%x'.", optopt);
+                               return 1;
+                       default:
+                               abort ();
                }
        }
-
+       /*This program does not admit options*/
+       if (optind < argc) {
+               syslog (LOG_ERR,"This program does not admit options just argument elements with their values.");
+               return 1;
+       }
+       
+       
+       /*TODO: INIT process sending SIGTERM? Should I catch that signal?*/
+       daemonPID = getpid();
+       if (signal(SIGTERM, sigterm_handler) == SIG_ERR) {
+               syslog (LOG_ERR, "SIGTERM signal handler failed: %m");
+               return 1;
+       }
+       
+       
+       if (main_child(avalue, pvalue, qvalue) < 0 )
+               return 1;
+       
        return 0;
 }
 
 
-int main_child (int argc, char *argv[]) {
-       struct protoent *protocol;
-       struct sockaddr_in addr_server;
-       struct sockaddr_in  addr_client;
-    int sockclient;
-    pthread_t idThread;
-       int clilen;
-
+int main_child (char *address, int port, int queue) {
+       struct protoent *protocol;                      /*Network protocol*/
+       struct sockaddr_in addr_server; /*struct with the server socket address*/
+       struct sockaddr_in  addr_client;/*struct with the client socket address*/
+       int sockclient;                                                                 /*File descriptor for the accepted socket*/
+       pthread_t idThread;                                                     /*Thread identifier number*/
+       socklen_t clilen;
+       int optval;
+       int returnValue = 0;                                            /*OK by default*/
+       
        
-       daemonize(argv[0], LOG_SYSLOG, LOG_PID);
-    setsid();
-
-       signal(SIGPIPE,SIG_IGN);
-
        /*Retrieve protocol number from /etc/protocols file */
-    protocol=getprotobyname("tcp");
+       protocol=getprotobyname("tcp");
        if (protocol == NULL) {
-               syslog(LOG_ERR, "cannot map \"tcp\" to protocol number");
-       exit (1);
+               syslog(LOG_ERR, "cannot map \"tcp\" to protocol number: %m");
+               goto EndwithError;
        }
-
+       
        bzero((char*) &addr_server, sizeof(addr_server));
        addr_server.sin_family = AF_INET;
-    if (inet_pton(AF_INET, argv[1], &addr_server.sin_addr.s_addr) <= 0)
-        syslog (LOG_ERR, "error inet_pton");
+       if (inet_pton(AF_INET, address, &addr_server.sin_addr.s_addr) <= 0) {
+               syslog (LOG_ERR, "error inet_pton: %m");
+               goto EndwithError;
+       }
+       
+       addr_server.sin_port = htons(port);
+       
+       if ((sockfd = socket(AF_INET, SOCK_STREAM, protocol->p_proto)) < 0) {
+               syslog (LOG_ERR, "socket creation failed: %m");
+               goto EndandClosewithError; 
+       }
 
-       addr_server.sin_port = htons(atol(argv[2]));
 
-       if ((sockfd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
-        syslog (LOG_ERR, "socket creation failed");
-               exit(1);
+       /*We want to avoid issues while trying to bind a socket in TIME_WAIT state.*/
+       /*In this application from the client there should not be any trouble if it gets*/
+       /*TIME_WAIT states, because it can connect from any source port.*/
+       optval = 1;
+       if(setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof(optval)) < 0) {
+               syslog (LOG_ERR, "setsockopt failed: %m");
+               goto EndandClosewithError;
        }
-
+       
        if (bind(sockfd, (struct sockaddr *) &addr_server, sizeof(addr_server)) < 0) {
-               syslog (LOG_ERR, "socket bind failed");
-        exit(1);
+               syslog (LOG_ERR, "socket bind failed: %m");
+               goto EndandClosewithError;
        }
-
-       if (listen (sockfd, atoi(argv[3])) < 0 ) {
-               syslog (LOG_ERR, "socket listen failed");
-        exit(1);
+       
+       if (listen (sockfd, queue) < 0 ) {
+               syslog (LOG_ERR, "socket listen failed: %m");
+               goto EndandClosewithError;
        }       
-
+       
        while(1) {
                clilen =  sizeof(addr_client);
                if ((sockclient = accept (sockfd, (struct sockaddr *) &addr_client, &clilen)) < 0) {
-                       syslog (LOG_ERR, "socket accept failed");
-               exit(1);
+                       syslog (LOG_ERR, "socket accept failed: %m");
+                       goto EndandClosewithError;
                }
                if (pthread_create (&idThread, NULL, serverThread, (void *)sockclient) != 0 ) {
-                       syslog (LOG_ERR, "thread creation failed");
+                       syslog (LOG_ERR, "thread creation failed: %m");
                }       
        }
+       
+       
+       EndasUsual:
+               return returnValue;
+       EndandClosewithError:
+               close (sockfd);
+       EndwithError:
+               /*When there is an error.*/
+               returnValue = -1;       
+               goto EndasUsual;
 }
 
 
 int daemonize(const char *pname, int facility, int option) {
        int fd;
+       
+       if ( (fd = open( "/dev/tty", O_RDWR, 0) ) == -1) {
+               /*We already have no tty control*/
+               close(fd);
+               return 0;
+       }
+       
+       /*Sending messages to log*/
+       openlog(pname, option, facility);
+       /*To get a controlling tty*/
+       if (ioctl(fd, TIOCNOTTY, (caddr_t)0) <0 ) {
+               syslog (LOG_ERR, "Getting tty failed: %m");
+               return -1;
+       }
+       if (close(fd) < 0) {
+               syslog (LOG_ERR, "Closing tty failed: %m");
+               return -1;
+       }
+       
+       if((fd = open( "/dev/null", O_RDWR, 0) ) == -1) {
+               close(fd);
+               return -1;
+       }
+       dup2(fd,0);
+       dup2(fd,1);
+       dup2(fd,2);
+       close(fd);
+       
+       return 0;
+}
 
-       if ( (fd = open( "/dev/tty", O_RDWR, 0) ) == -1) {
-       /*We already have no tty control*/
-       close(fd);
-       return 0;
-       }
+void *serverThread (void * arg) {
+       int sock;
 
-    /*Sending messages to log*/
-    openlog(pname, option, facility);
+       /*Variables used for the select function*/
+       struct timeval ptime;
+       fd_set fd_read;
+       int ret;
 
-       ioctl(fd, TIOCNOTTY, (caddr_t)0);
-       close(fd);
+       /*Control parameters used while receiving data from the client*/
+       int nLeft, nData, iPos;
+       /*This buffer is intended to store the data received from the client.*/
+       char buffer[1025];
 
-       if((fd = open( "/dev/null", O_RDWR, 0) ) == -1) {
-       close(fd);
-       return 0;
-       }
-       dup2(fd,0);
-       dup2(fd,1);
-       dup2(fd,2);
-       close(fd);
+       /*The command sent by the client, to be executed by this process*/      
+       char *command;
+       /*Store the command length*/
+       uint32_t *commandLength;
+       /*Store the ack*/
+       uint32_t *ack;
 
-       return 0;
-}
 
-void *serverThread (void * arg) {
-       int sockclient;
-    int n;
-    char buf[512];
 
-       sockclient = (int) arg;
+       sock = (int) arg;
+
        pthread_detach(pthread_self());
 
-       n = recv(sockclient, buf, sizeof(buf), 0);
-       if(n > 0){
-       buf[n] = '\0';
-       fork_system(sockclient, buf);
-       }
 
-       close(sockclient);
-       /*FIXME: dunno why, but after finishing connection from client we have TIME_WAIT in this socket*/
+
+       /******************************************************************************************/
+       /*                                      Just over 1 TCP connection                                                                                                                                                                                                              */
+       /*                                      COMMAND_LENGTH: Java integer 4 bytes, BIG-ENDIAN (the same as network order)    */
+       /*                                      ACK: integer 4 bytes big-endian (for Java) with the received comand length              */
+       /*                                      COMMAND: TPV locale character set encoding                                                                                                                                              */
+       /*                                      RESULTS: TPV locale character set encoding                                                                                                                                              */
+       /*                                                                                                                                                                                                                                                                                                                                                              */
+       /*                                                      JAVA CLIENT: ------------ COMMAND_LENGTH -------> :SERVER                                                                       */
+       /*                                                      JAVA CLIENT: <---------------- ACK -------------- :SERVER                                                                       */ 
+       /*                                                      JAVA CLIENT: -------------- COMMAND ------------> :SERVER                                                                       */
+       /*                                                      JAVA CLIENT: <-------------- RESULTS ------------ :SERVER                                                                       */
+       /*                                                      JAVA CLIENT: <---------- CLOSE CONNECTION ------- :SERVER                                                                       */
+       /*                                                                                                                                                                                                                                                                                                                                                              */
+       /******************************************************************************************/
+
+
+       /*Wait 2 seconds for data coming from client.*/
+       ptime.tv_sec=2;
+  ptime.tv_usec=0;
+
+
+       /*COMMAND LENGTH*/
+       /*First of all we receive the command size as a Java integer (4 bytes primitive type)*/ 
+       if ((commandLength = (uint32_t *) malloc(sizeof(uint32_t))) == NULL) {
+               syslog (LOG_ERR, "commandLength malloc failed: %m");
+               goto Error;
+       }
+       bzero(buffer, sizeof(buffer));
+       nLeft = sizeof(uint32_t);
+       nData = iPos = 0;
+       do {
+               FD_ZERO(&fd_read);
+               FD_SET(sock, &fd_read);
+               ret = select(sock+1, &fd_read,NULL,NULL, &ptime);
+               if(ret == 0) {
+                       syslog(LOG_INFO, "receiving command length timeout error");
+                       goto Error;
+               }
+               if(ret == -1) {
+        syslog(LOG_ERR, "receiving command length select error: %m");
+                               goto Error;
+    }
+               if((nData = recv(sock, &buffer[iPos], nLeft, 0)) <= 0) {
+                       syslog (LOG_ERR, "command length reception failed: %m");
+                       goto Error;
+               }
+               nLeft -= nData;
+               iPos += nData;
+       } while (nLeft > 0);
+       if (nLeft < 0) {
+               syslog (LOG_INFO, "command length excessive data received, expected: 4, received: %d", (-nLeft + sizeof(uint32_t)) );
+       }
+
+       /*Retrieve integer (4 bytes) from buffer*/
+       memcpy (commandLength, buffer, sizeof(uint32_t));
+       /*Java sends the primitive integer using big-endian order (it is the same as network order)*/
+       *commandLength = be32toh (*commandLength);
+
+
        
+       /*ACK*/
+       /*Sending back the command length as the ACK message*/
+       if ((ack = (uint32_t *) malloc(sizeof(uint32_t))) == NULL) {
+               syslog (LOG_ERR, "commandLength malloc failed: %m");
+               goto ErrorAck;
+  }
+       *ack = htobe32(*commandLength);
+       if (send(sock, ack, sizeof(uint32_t), 0) < 0) {
+               syslog (LOG_ERR, "send ACK failed: %m");
+               goto ErrorAck;
+       }
+
+
+               
+       /*COMMAND*/
+       /*Reserving commandLength + 1 because of the string end character*/
+       if ((command = (char *) malloc(*commandLength + 1)) == NULL) {
+               syslog (LOG_ERR, "command malloc failed: %m");
+               goto ErrorCommand;
+       } 
+       bzero(command, ((*commandLength) + 1));
+  nLeft = *commandLength;
+  nData = iPos = 0;
+  do {
+    FD_ZERO(&fd_read);
+    FD_SET(sock, &fd_read);
+    ret = select(sock+1, &fd_read,NULL,NULL, &ptime);
+    if(ret == 0) {
+      syslog(LOG_INFO, "receiving command timeout error");
+      goto ErrorCommand;
+    }
+    if(ret == -1) {
+        syslog(LOG_ERR, "receiving command select error: %m");
+        goto ErrorCommand;
+    }
+    if((nData = recv(sock, &command[iPos], nLeft, 0)) <= 0) {
+       syslog (LOG_ERR, "command reception failed: %m");
+       goto ErrorCommand;
+    }
+    nLeft -= nData;
+    iPos += nData;
+  } while (nLeft > 0);
+  if (nLeft < 0) {
+    syslog (LOG_INFO, "execessive command length, expected: %d, received: %d", *commandLength, (-nLeft + *commandLength));
+  }
+
+
+       /*RESULTS*/     
+       fork_system(sock, command);     
+
+
+       /*CLOSE CONNECTION AND FINISH*/
+
+
+ErrorCommand:
+       free(command);
+ErrorAck:
+       free(ack);
+Error: 
+  close(sock);
+       free(commandLength);
+
        pthread_exit(0);
 }
 
 
 int fork_system(int socket, char *command) {
        int pid;
-    int pfdOUT[2];
-    int pfdERR[2];
+       int out[2], err[2];
        char buf[2000];
-    char string[100];
+       char string[100];
        struct pollfd polls[2];
        int returnstatus;
        int n;
+       int childreturnstatus;
+
+       int returnValue = -1;   /*NOK by default*/
 
-       sem_t * semaphore;
        /*Required variables in order to share memory between processes*/
        key_t keyvalue;
-       int idreturnst;
-       int *returnst;
-
-       if (pipe(pfdOUT) == -1) {
-               syslog (LOG_ERR, "pipe for stdout failed");
-               close(pfdOUT[0]);
-        close(pfdOUT[1]);
-       return -1;
-       }
-       if (pipe(pfdERR) == -1) {
-               syslog (LOG_ERR, "pipe for stderr failed");
-       close(pfdOUT[0]);
-       close(pfdOUT[1]);
-               close(pfdERR[0]);
-        close(pfdERR[1]);
-       return -1;
-       }
-
-       if ((semaphore = sem_open("javaforksem", O_CREAT, 0644, 1)) == SEM_FAILED) {
-               syslog (LOG_ERR, "semaphore open failed");
-        close(pfdOUT[0]);
-        close(pfdOUT[1]);
-        close(pfdERR[0]);
-        close(pfdERR[1]);
-               if (sem_close(semaphore) <0 ) {
-                       syslog (LOG_ERR, "semaphore open failed and close failed");
-               }
-               if (sem_unlink("javaforksem") < 0) {
-                       syslog (LOG_ERR, "semaphore open failed and unlink failed");
-               }
-               return -1;
+       int idreturnst = -1;
+       /*Store the return status from the process launched using system or execve*/
+       /*Using shared memory between the child and parent process*/
+       int *returnst = NULL;
+       /*Required variables in order to share the semaphore between processes*/
+       key_t keysemaphore;
+       int idsemaphore = -1;
+       /*Used like a barrier: the child process just can start after sending the XML init code*/
+       sem_t *semaphore = NULL;
+       
+       
+       out[0] = out[1] = err[0] = err[1]  = -1;
+
+
+       /*Creating the pipes, they will be attached to the stderr and stdout streams*/  
+       if (pipe(out) < 0 || pipe(err) < 0) {
+               syslog (LOG_ERR, "pipe failed: %m");
+               goto EndandClosePipes;
+  }
+
+
+       
+       /*Allocate shared memory because we can not use named semaphores*/
+       keysemaphore=ftok("/bin/ls", SHAREMEMSEM);  /*the /bin/ls must exist otherwise this does not work... */
+       if (keysemaphore == -1) {
+               syslog (LOG_ERR, "ftok failed: %m");
+               goto EndandClosePipes;
+  }
+       /*Attach shared memory*/
+       if ((idsemaphore = shmget(keysemaphore,sizeof(sem_t), 0660 | IPC_CREAT)) < 0) {
+               syslog (LOG_ERR, "semaphore initialization failed: %m");
+               goto EndandReleaseSem;
+       }
+       if ((semaphore = (sem_t *)shmat(idsemaphore, (void *)0, 0)) < 0) {
+               goto EndandReleaseSem;
        }
 
        if (sem_init(semaphore, 1, 1) < 0) {
-               syslog (LOG_ERR, "semaphore initialization failed");
-               close(pfdOUT[0]);
-        close(pfdOUT[1]);
-        close(pfdERR[0]);
-        close(pfdERR[1]);
-        if (sem_close(semaphore) <0 ) {
-            syslog (LOG_ERR, "semaphore open failed and close failed");
-        }
-        if (sem_unlink("javaforksem") < 0) {
-            syslog (LOG_ERR, "semaphore open failed and unlink failed");
-        }
-       return -1;
-       }
+               syslog (LOG_ERR, "semaphore initialization failed: %m");
+               goto EndandDestroySem;
+       }
        
        if (sem_wait(semaphore) < 0) {
-               syslog (LOG_ERR, "semaphore wait failed");
+               syslog (LOG_ERR, "semaphore wait failed: %m");
+               goto EndandDestroySem;
        }
 
-       /*Allocate shared memory*/
-    keyvalue=ftok("/bin/ls", SHAREMEMKEY);  /*the /bin/ls must exist otherwise this does not work... */
+
+
+       /*Allocate shared memory for the return status code */
+       keyvalue=ftok("/bin/ls", SHAREMEMKEY);  /*the /bin/ls must exist otherwise this does not work... */
        if (keyvalue == -1) {
-               syslog (LOG_ERR, "ftok failed");
-               /*TODO: Close again pipes and semaphore.*/
-               return -1;
+               syslog (LOG_ERR, "ftok failed: %m");
+               goto EndandDestroySem;
        }
        /*Attach shared memory*/
-       idreturnst=shmget(keyvalue,sizeof(int), 0660 | IPC_CREAT);
-    if (idreturnst == -1) {
-               syslog (LOG_ERR, "shmget failed");
-               /*TODO: Close again pipes and semaphore.*/
-               return -1;
+       if ((idreturnst=shmget(keyvalue,sizeof(int), 0660 | IPC_CREAT)) < 0) {
+               syslog (LOG_ERR, "shmget failed: %m");
+               goto EndandReleaseMem;
        }
        returnst = (int *)shmat(idreturnst, (void *)0, 0);
        if ((*returnst)== -1) {
-               syslog (LOG_ERR, "shmat failed");
-               /*TODO: Close again pipes and semaphore.*/
-               return -1;
+               syslog (LOG_ERR, "shmat failed: %m");
+               goto EndandReleaseMem;
        }
-
        /*By default*/
        (*returnst) = 0;
 
+
+       
        if ((pid=fork()) == -1) {
-       perror("fork: ");
-               close(pfdOUT[0]);
-        close(pfdOUT[1]);
-        close(pfdERR[0]);
-        close(pfdERR[1]);
-               if (sem_close(semaphore) <0 ) {
-            syslog (LOG_ERR, "semaphore open failed and close failed");
-        }
-        if (sem_unlink("javaforksem") < 0) {
-            syslog (LOG_ERR, "semaphore open failed and unlink failed");
-        }
-       return -1;
-       }
-       else {
+               syslog (LOG_ERR, "fork failed: %m");
+               goto EndandReleaseMem;
+       }
+       
+       if (pid == 0) {
                /*Child process*/
-               if (pid == 0) {
-                       if (dup2(pfdOUT[1],1) < 0) {    
-                               syslog (LOG_ERR, "child: dup2 pfdOUT failed");  
-                               exit(-1);
-                       }
-                       if (dup2(pfdERR[1],2) < 0) {    
-                syslog (LOG_ERR, "child: dup2 pfdERR failed");
-                               exit(-1);
-            }
-                       if (sem_wait(semaphore) < 0) {
-                       syslog (LOG_ERR, "child: semaphore wait failed");
-                   }
-                       returnstatus=system(command);
-                       if (WIFEXITED(returnstatus) == 1) {
-                               (*returnst) = WEXITSTATUS(returnstatus);
-                       }
-                       else {
-                               (*returnst) = -1;
-                       }
-                       exit(0);
+               if ((dup2(out[1],1) < 0) || (dup2(err[1],2) < 0)) {     
+                       syslog (LOG_ERR, "child dup2 failed: %m");      
+                       exit(-1);
                }
-               else {
-                       /*Parent process*/
-                       polls[0].fd=pfdOUT[0];
-               polls[1].fd=pfdERR[0];
-               polls[0].events=POLLIN;
-               polls[1].events=POLLIN;
-               sprintf(string,"<?xml version=\"1.0\"?>");
-               send(socket,string,strlen(string),0);
-               sprintf(string,"<salida>");
-               send(socket,string,strlen(string),0);
-                       /*The child will be woken up*/
-                       if (sem_post(semaphore) < 0 ) {
-                               syslog (LOG_ERR, "error waiking up child process");
-                               /*TODO: exit closing the descriptors*/
-                       }
-                       while(1){
-                               if(poll(polls,2,100)){
-                               if(polls[0].revents&&POLLIN) {
-                               bzero(buf,2000);
-                                               n=read(pfdOUT[0],buf,1990);
-                                               sprintf(string,"<out><![CDATA[");
-                               send(socket,string,strlen(string),0);
-                                       send(socket,buf,n,0);
-                               sprintf(string,"]]></out>");
-                                       send(socket,string,strlen(string),0);
-                               } 
-                                       if(polls[1].revents&&POLLIN) {
-                                               bzero(buf,2000);
-                               n=read(pfdERR[0],buf,1990);
-                                               sprintf(string,"<error><![CDATA[");
-                                   send(socket,string,strlen(string),0);
-                               send(socket,buf,n,0);
-                               sprintf(string,"]]></error>");
-                               send(socket,string,strlen(string),0);
-                               }
-                                       if(!polls[0].revents&&POLLIN && !polls[1].revents&&POLLIN) {
-                                               syslog (LOG_ERR, "Error polling pipes");
-                                       break;
-                               }
+               if (sem_wait(semaphore) < 0) {
+                       syslog (LOG_ERR, "child semaphore wait failed: %m");
+                       exit(-1);
+               }
+               
+               returnstatus=system(command);
+               
+               if (WIFEXITED(returnstatus) == 1)
+                       (*returnst) = WEXITSTATUS(returnstatus);
+               else
+                       (*returnst) = -1;
+               exit(0);
+       }
+       else {
+               /*Parent process*/
+               polls[0].fd=out[0];
+               polls[1].fd=err[0];
+               polls[0].events=POLLIN;
+               polls[1].events=POLLIN;
+               sprintf(string,"<?xml version=\"1.0\"?>");
+               send(socket,string,strlen(string),0);
+               sprintf(string,"<salida>");
+               send(socket,string,strlen(string),0);
+               
+               /*Releasing barrier, the child process can keep running*/
+               if (sem_post(semaphore) < 0 ) {
+                       syslog (LOG_ERR, "parent error releasing barrier: %m");
+                       /*Beaware, sig_handler with the child process :]*/
+                       kill(pid, SIGTERM);
+                       goto EndandReleaseMem;
+               }
+               while(1) {
+                       if(poll(polls,2,100)) {
+                               if(polls[0].revents&&POLLIN) {
+                                       bzero(buf,2000);
+                                       n=read(out[0],buf,1990);
+                                       sprintf(string,"<out><![CDATA[");
+                                       send(socket,string,strlen(string),0);
+                                       send(socket,buf,n,0);
+                                       sprintf(string,"]]></out>");
+                                       send(socket,string,strlen(string),0);
+                               } 
+                               if(polls[1].revents&&POLLIN) {
+                                       bzero(buf,2000);
+                                       n=read(err[0],buf,1990);
+                                       sprintf(string,"<error><![CDATA[");
+                                       send(socket,string,strlen(string),0);
+                                       send(socket,buf,n,0);
+                                       sprintf(string,"]]></error>");
+                                       send(socket,string,strlen(string),0);
                                }
-                               else {
-                                       /*When timeout*/
-                                       if(waitpid(pid,NULL,WNOHANG)) {
-                                               /*Child is dead, we can finish the connection*/
-                                               break;
-                                       }
-                                       /*The child process is not dead, keep polling more data from stdout or stderr streams*/
+                               if(!polls[0].revents&&POLLIN && !polls[1].revents&&POLLIN) {
+                                       syslog (LOG_ERR, "parent error polling pipes: %m");
+                                       /*Beaware, sig_handler with the child process :]*/
+                                       kill(pid, SIGTERM);
+                                       /*I want to send an error status to the remote calling process*/
+                                       (*returnst) = -1;
+                                       break;
                                }
                        }
-                       
-                       /*We reach this code when the child process is dead or because of an error polling pipes*/
-                       /*In the second case the result stored in *returnst could be wrong, anyway there was*/
-                       /*an error so, the result is unpredictable.*/
-                       /*TODO: if error while polling pipes do not reach this code an exit with -1*/
-                       sprintf(string,"<ret><![CDATA[%d]]></ret>", (*returnst));
-               send(socket,string,strlen(string),0);
-               sprintf(string,"</salida>");
-               send(socket,string,strlen(string),0);
-
-                       close(pfdOUT[0]);
-                       close(pfdOUT[1]);
-                       close(pfdERR[0]);
-                       close(pfdERR[1]);
-                       if (sem_close(semaphore) <0 ) {
-               syslog (LOG_ERR, "semaphore close failed");
-               }
-               if (sem_unlink("javaforksem") < 0) {
-               syslog (LOG_ERR, "semaphore unlink failed");
-               }
-       
-                       shmdt ((int *)returnst);
-               shmctl (idreturnst, IPC_RMID, (struct shmid_ds *)NULL);
+                       else {
+                               /*When timeout*/
+                               if(waitpid(pid, &childreturnstatus, WNOHANG)) {
+                                       /*Child is dead, we can finish the connection*/
+                                       /*First of all, we check the exit status of our child process*/
+                                       /*In case of error send an error status to the remote calling process*/
+                                       if (WIFEXITED(childreturnstatus) != 1)
+                                               (*returnst) = -1;
+                                       break;
+                               }
+                               /*The child process is not dead, keep polling more data from stdout or stderr streams*/
+                       }
                }
        }
+       /*Reaching this code when child finished or if error while polling pipes*/
+       sprintf(string,"<ret><![CDATA[%d]]></ret>", (*returnst));
+       send(socket,string,strlen(string),0);
+       sprintf(string,"</salida>");
+       send(socket,string,strlen(string),0);
 
-       return 0;
+       /*Stuff just done by the Parent process. The child process ends with exit*/
+       
+       returnValue = 0; /*if everything went OK*/
+
+EndandReleaseMem:
+               if (returnst != NULL) {
+                       /*detach memory*/
+                       if (shmdt ((int *)returnst) < 0)
+                               syslog (LOG_ERR, "returnstatus shared variable shmdt failed: %m");
+               }
+               /*Mark the segment to be destroyed.*/
+               if (shmctl (idreturnst, IPC_RMID, (struct shmid_ds *)NULL) < 0 )
+                       syslog (LOG_ERR, "returnstatus shared variable shmctl failed: %m");
+EndandDestroySem:
+               if (sem_destroy(semaphore) <0)
+                       syslog (LOG_ERR, "semaphore destroy failed: %m");
+EndandReleaseSem:
+               /*after sem_destroy-> input/output parameter NULL?*/
+               if (semaphore != NULL) {
+                       /*detach memory*/
+                       if (shmdt ((sem_t *)semaphore) < 0)
+                               syslog (LOG_ERR, "semaphore shmdt failed: %m");
+               }
+               /*Mark the segment to be destroyed.*/
+               if (shmctl (idsemaphore, IPC_RMID, (struct shmid_ds *)NULL) < 0 )
+                       syslog (LOG_ERR, "semaphore shmctl failed: %m");
+EndandClosePipes:
+               closeSafely (out[0]);
+               closeSafely (out[1]);
+               closeSafely (err[0]);
+               closeSafely (err[1]);
+
+       return returnValue;
 }
 
 
 void sigterm_handler(int sig)
 {
-       if (currentPID != getpid()) {
-               //Do nothing
-               return;
-       }
-       if (signal (SIGTERM, SIG_IGN) == SIG_ERR)
-       {
-               syslog (LOG_ERR, "signal desactivation failed");
-       }
-
-       close (sockfd);
-       /*TODO: kill child processes and release allocate memory*/
-       exit (0);
+  if (daemonPID != getpid()) {
+    //Do nothing
+    return;
+  }
+  if (signal (SIGTERM, SIG_IGN) == SIG_ERR)
+    {
+      syslog (LOG_ERR, "signal desactivation failed");
+    }
+
+
+  close (sockfd);
+  /*TODO: kill child processes and release allocate memory*/
+  exit (0);
 }
index 7f03332..aacdc18 100644 (file)
@@ -1,8 +1,61 @@
-/*System V IPC key*/
+/*System V IPC keys*/
 #define SHAREMEMKEY 1
+#define SHAREMEMSEM 2
 
+/*Non-argument default values*/
+#define PORT 5193
+#define IPADDRESS "127.0.0.1"
+#define QUEUE 6
+
+
+
+/****************************************************************************************
+* This method is used by pthread_create                                                 *
+*                                                                                                                                                                          *
+* INPUT PARAMETER: socket file descriptor                                                                                              *
+* RETURNS: void                                                                                                                                                        *
+****************************************************************************************/
 void *serverThread (void *arg);
+
+
+
+/****************************************************************************************
+* This method is used by pthread_create                                                 *
+*                                                                                       *
+* INPUT PARAMETER: socket file descriptor                                               *
+* INPUT PARAMETER: 
+* INPUT PARAMETER:
+* RETURNS: void                                                                          *
+****************************************************************************************/
 int daemonize(const char *pname, int facility, int option);
-int main_child (int argc, char *argv[]);
+
+
+
+/****************************************************************************************
+* This method is used by pthread_create                                                 *
+*                                                                                       *
+* INPUT PARAMETER: socket file descriptor                                               *
+* RETURNS: int                                                                         *
+****************************************************************************************/
+int main_child (char *address, int port, int queue);
+
+
+
+/****************************************************************************************
+* This method is used by pthread_create                                                 *
+*                                                                                       *
+* INPUT PARAMETER: socket file descriptor                                               *
+* RETURNS: void                                                                          *
+****************************************************************************************/
 int fork_system(int socket, char *command);
+
+
+
+
+/****************************************************************************************
+* This method is used by pthread_create                                                 *
+*                                                                                       *
+* INPUT PARAMETER: socket file descriptor                                               *
+* RETURNS: void                                                                          *
+****************************************************************************************/
 void sigterm_handler();
diff --git a/JavaFork/JavaExample/javafork-example/src/main/java/de/fork/java/LauncherProcessesDiaFork.java b/JavaFork/JavaExample/javafork-example/src/main/java/de/fork/java/LauncherProcessesDiaFork.java
new file mode 100644 (file)
index 0000000..2b32c3e
--- /dev/null
@@ -0,0 +1,283 @@
+package de.fork.java;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.PrintStream;
+import java.net.UnknownHostException;
+import javax.xml.parsers.ParserConfigurationException;
+import org.apache.log4j.Logger;
+import org.xml.sax.SAXException;
+
+
+public class LauncherProcessesDiaFork {
+       // Exit process status
+       private static final int STATUS_ERR = -1;
+       private static final int DEFAULT_PORT = 5193;
+       private static final String DEFAULT_HOST = "127.0.0.1";
+       
+       /**
+        * Run a process.
+        * 
+        * @param command system command to be executed.
+        * 
+        * @return return code.
+        */
+       public static int exec(final String command) throws IOException, InterruptedException {
+
+               return exec(command, null, null);
+       }
+
+       /**
+        * Run a process.
+        * 
+        * @param command system command to execute.
+        * @param standarOutPut if not null, the standard output is redirected to this parameter.
+        *            
+        * @return return code.            
+        */
+       public static int exec(final String command, final PrintStream standarOutPut) throws IOException, InterruptedException {
+
+               return exec(command, standarOutPut, null);
+       }
+
+       
+       /**
+        * Run a process.
+        * 
+        * @param command system command to be executed.
+        * @param standarOutPut if not null, the standard output is redirected to this parameter.
+        * @param errorOutPut if not null, the error output is redirected to this parameter.
+        * 
+        * @return return code from the executed system command.     
+        */
+       public static int exec(final String command, final PrintStream standarOutPut, final PrintStream errorOutPut) throws IOException, InterruptedException {
+
+               return exec(command, standarOutPut, errorOutPut, DEFAULT_HOST, DEFAULT_PORT);
+       }
+
+       /**
+        * Run a process.
+        * 
+        * @param command system command to be executed.
+        * @param aLogger send the information to log.
+        */
+       public static int exec(final String command, final Logger aLogger) throws IOException, InterruptedException {
+
+               //calling private method to handle logger input/ouput in a common method
+               return execHandlingLogger(command, aLogger, DEFAULT_HOST, DEFAULT_PORT);
+       }
+       
+       
+       /**
+        * Run process.
+        * 
+        * @param commandAndArguments String array containing system command and its 
+        * arguments to be executed.<br>
+        * <b>For example:</b> 
+        * <pre>
+        * commandAndArguments[0]="ls";
+        * commandAndArguments[1]="-lr";
+        * </pre>
+        * @param aLogger
+        * 
+        * @return return code from the executed system command.
+        * 
+        * @throws IOException
+        * @throws InterruptedException
+        */
+       public static int exec(final String[] commandAndArguments, final Logger aLogger) throws IOException, InterruptedException {
+               String wholeCommand="";
+               
+               for(String argument : commandAndArguments) {
+                       wholeCommand = wholeCommand + " " + argument;
+               }
+               
+               //calling private method to handle logger input/ouput in a common method
+               return execHandlingLogger(wholeCommand, aLogger, DEFAULT_HOST, DEFAULT_PORT);
+       }
+       
+       
+       /**
+        * Run process using a remote process runner.
+        * 
+        * @param command system command to be executed.
+        * @param standarOutPut the stdout stream from that command as a <code>PrintStream</code>
+        * @param errorOutPut the stderr stream from that command as a <code>PrintStream</code>
+        * @param host the specified host.
+        * @param port the where the remote process runner accepts connections.
+        * 
+        * <p> The host name can either be a machine name, such as
+     * "<code>java.sun.com</code>", or a textual representation of its
+     * IP address. If a literal IP address is supplied, only the
+     * validity of the address format is checked.
+     * </p>
+     * <p> For <code>host</code> specified in literal IPv6 address,
+     * either the form defined in RFC 2732 or the literal IPv6 address
+     * format defined in RFC 2373 is accepted. IPv6 scoped addresses are also
+     * supported. See <a href="Inet6Address.html#scoped">here</a> for a description of IPv6
+     * scoped addresses.
+        * </p>
+        * 
+        * @return the executed command's return code.
+        * 
+        * @throws UnknownHostException
+        * @throws IOException
+        */
+       public static int exec(final String command, final PrintStream standarOutPut, 
+                       final PrintStream errorOutPut, final String host, final int port) 
+                                                                                       throws IOException, InterruptedException {
+               int exitStatus = LauncherProcessesDiaFork.STATUS_ERR;
+               XmlForkParser forkParser = null;
+               TCPForkDaemon process = null;
+               
+               try {
+                       forkParser = new XmlForkParser();
+                       process = new TCPForkDaemon(forkParser, host, port);
+                       exitStatus = process.exec(command);
+               } catch (ParserConfigurationException e) {
+                       // This is not a crazy thing, we are trying to insert this new method without
+                       // breaking the old methods which did not throw SAXException or ParserConfigurationException
+                       // Do not blame me.
+                       throw new IOException(e);
+               } catch (SAXException e) {
+                       // This is not a crazy thing, we are trying to insert this new method without
+                       // breaking the old methods which did not throw SAXException or ParserConfigurationException
+                       // Do not blame me.
+                       throw new IOException(e);
+               }       
+               
+
+               
+               if ((standarOutPut != null) && (process.getStdout() != null)){
+                       standarOutPut.println(process.getStdout());
+               }
+
+               if ((errorOutPut != null) && (process.getStderr() != null)){
+                       errorOutPut.println(process.getStderr());
+               }
+
+               return exitStatus;
+       }
+       
+       
+       /**
+        * Run process.
+        * 
+        * @param command system command to be executed.
+        * @param aLogger
+        * @param host the specified host.
+        * @param port the TCP port where the daemon accepts connections.
+        * 
+        * @return the executed command's return code.
+        * 
+        * @throws IOException
+        * @throws InterruptedException
+        */
+       private static int execHandlingLogger(final String command, final Logger aLogger, 
+                               final String host, int port) throws IOException, InterruptedException {
+               int exitStatus = LauncherProcessesDiaFork.STATUS_ERR;
+               XmlForkParser forkParser = null;
+               TCPForkDaemon process = null;
+               
+               try {
+                       forkParser = new XmlForkParser();
+                       process = new TCPForkDaemon(forkParser, host, port);
+                       exitStatus = process.exec(command);
+               } catch (ParserConfigurationException e) {
+                       throw new IOException(e);
+               } catch (SAXException e) {
+                       throw new IOException(e);
+               }
+               
+
+               
+               if (process.getStdout() != null) {
+                       aLogger.info(process.getStdout());
+               }
+               if (process.getStderr() != null) {
+                       aLogger.error(process.getStderr());
+               }
+
+               return exitStatus;
+       }
+       
+       
+       /**
+        * Run process
+        * 
+        * @param command command and its arguments to be executed.<br>
+        * <b>For example:</b> 
+        * <pre>
+        * commandAndArguments[0]="ls";
+        * commandAndArguments[1]="-lr";
+        * </pre>
+        * @param aLogger send information to log
+        * 
+        * @return the executed command's return code.
+        * 
+        * @throws IOException
+        * @throws InterruptedException
+        */
+       public static InputStream execStream (final String [] command, final Logger aLogger) 
+                                                                                                       throws IOException, InterruptedException {
+               int exitStatus = LauncherProcessesDiaFork.STATUS_ERR;
+               InputStream stdInput = null;
+               XmlForkParser forkParser = null;
+               TCPForkDaemon process = null;
+               String wholeCommand="";
+               
+               for(String argument : command) {
+                       wholeCommand = wholeCommand + " " + argument;
+               }               
+               
+               try {
+                       forkParser = new XmlForkParser();
+                       process = new TCPForkDaemon(forkParser, DEFAULT_HOST, DEFAULT_PORT);
+                       exitStatus = process.exec(wholeCommand);
+               } catch (ParserConfigurationException e) {
+                       // This is not a crazy thing, we are trying to insert this new method without
+                       // breaking the old methods which did not throw SAXException or ParserConfigurationException
+                       // Do not blame me.
+                       throw new IOException(e);
+               } catch (SAXException e) {
+                       // This is not a crazy thing, we are trying to insert this new method without
+                       // breaking the old methods which did not throw SAXException or ParserConfigurationException
+                       // Do not blame me.
+                       throw new IOException(e);
+               }
+
+               
+               if(exitStatus == 0) {
+                       stdInput = new ByteArrayInputStream(process.getStdout().getBytes("UTF-8"));
+               }
+               else {
+                       aLogger.error(process.getStderr());
+               }
+               
+
+               return stdInput;
+       }
+       
+       /**
+        * <p>The <em>command</em> is lunched from <em>location</em>
+        * <li>#>cd <em>location</em></li>
+        * <li>#location> <em>command</em></li></p>
+        * 
+        * @param command the command to be executed by the daemon.
+        * @param location
+        * 
+        * @return the executed command's return code. <br>
+        * Usually <code>0</code> if execution is OK, otherwise <code>!=0</code> 
+        * 
+        * @throws IOException
+        * @throws InterruptedException
+        */
+       public static int execInLocation (final String command, final String location) throws IOException, InterruptedException {
+               int exitStatus = LauncherProcessesDiaFork.STATUS_ERR;
+               final String wholeCommand = "cd " + location + " && " + command;
+               
+               exitStatus =  exec(wholeCommand, null, null, DEFAULT_HOST, DEFAULT_PORT);
+               return exitStatus;
+       }
+}
diff --git a/JavaFork/JavaExample/javafork-example/src/main/java/de/fork/java/RemoteForkMain.java b/JavaFork/JavaExample/javafork-example/src/main/java/de/fork/java/RemoteForkMain.java
new file mode 100644 (file)
index 0000000..9b4180b
--- /dev/null
@@ -0,0 +1,36 @@
+package de.fork.java;
+
+import java.io.ByteArrayOutputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.PrintStream;
+import javax.xml.parsers.ParserConfigurationException;
+import org.xml.sax.SAXException;
+
+
+public class RemoteForkMain  {
+
+       /**
+        * @param args
+        * @throws InterruptedException 
+        * @throws IOException 
+        * @throws SAXException 
+        * @throws ParserConfigurationException 
+        * @throws FileNotFoundException 
+        */
+       public static void main(String[] args) throws IOException, InterruptedException {
+               
+               final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+               final PrintStream stdout = new PrintStream(baos);
+               final String command = "ls -lah";
+               final ByteArrayOutputStream baos2 = new ByteArrayOutputStream();
+               final PrintStream stderr = new PrintStream(baos2);
+               int result;
+               
+               result = LauncherProcessesDiaFork.exec(command,stdout, stderr, "127.0.0.1", 5193);
+               System.out.println(result);
+               System.out.println("Stdout: " +  baos.toString());
+               System.out.println("Stderr: " +  baos2.toString());
+       }
+
+}
\ No newline at end of file
diff --git a/JavaFork/JavaExample/javafork-example/src/main/java/de/fork/java/TCPForkDaemon.java b/JavaFork/JavaExample/javafork-example/src/main/java/de/fork/java/TCPForkDaemon.java
new file mode 100644 (file)
index 0000000..0ecb257
--- /dev/null
@@ -0,0 +1,178 @@
+package de.fork.java;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.net.InetAddress;
+import java.net.Socket;
+import java.net.UnknownHostException;
+import org.xml.sax.SAXException;
+
+/**
+ * <p>
+ * With this class we can run processes using the intended daemon which is 
+ * waiting for TCP connections in a specified port.
+ * </p>
+ * <p>
+ * Receiving the results from the daemon where we can find three kinds of 
+ * different fields: stderror, stdout and the return value of the command which was
+ * run by the remote daemon. Each field is related to the stderr, stdout and 
+ * return code respectively.
+ * </p>
+ * <p>
+ * This class has to retrieve the results from the remote daemon and it offers two 
+ * methods wich can be used to retrieve the stderr and stdout in a right way 
+ * without having to know about the coding used by the daemon to send us the results. 
+ * The user does not have to know about how the daemon sends the data, he or she 
+ * will work directly with the strings related to each stream using these methods:
+ * {@link TCPForkDaemon#getStdout()} and {@link TCPForkDaemon#getStderr()}.
+ * The return code from the command executed by the daemon can be retrieved as the 
+ * return parameter from the method {@link TCPForkDaemon#exec(String, String, int)}
+ * </p>
+ * <p>
+ * Instances of this class are mutable. To use them concurrently, clients must surround each
+ * method invocation (or invocation sequence) with external synchronization of the clients choosing.
+ * </p>
+ */
+public class TCPForkDaemon {
+       private final XmlForkParser parser;
+       private final String host;
+       private final int port;
+       
+       
+       /**
+        * Default constructor for this {@link TCPForkDaemon} implementation.
+        * 
+     * <p> The host name can either be a machine name, such as
+     * "<code>java.sun.com</code>", or a textual representation of its
+     * IP address. If a literal IP address is supplied, only the
+     * validity of the address format is checked.
+     * </p>
+     * <p> For <code>host</code> specified in literal IPv6 address,
+     * either the form defined in RFC 2732 or the literal IPv6 address
+     * format defined in RFC 2373 is accepted. IPv6 scoped addresses are also
+     * supported. See <a href="Inet6Address.html#scoped">here</a> for a description of IPv6
+     * scoped addresses.
+        * </p>
+        * @param parser instance implemeting {@link XmlForkParser} which knows about what 
+        * codification uses the daemon to send us the results of the command sent to
+        * by the remote daemon by the {@link TCPForkDaemon.#exec(String)} method.
+        * @param host the specified host.
+        * @param port the TCP port where the daemon accepts connections.
+        * 
+        */
+       public TCPForkDaemon (final XmlForkParser parser, final String host, final int port) {
+               this.parser = parser;
+               this.host = host;
+               this.port = port;
+       }
+       
+       
+       /**
+        * <p>
+        * This method sends commands to a remote daemon using a TCP socket.
+        * We create a new TCP socket every time we send commands.
+        * </p>
+        * <p>
+        * It uses a TCP connection in order to send commands and receive
+        * the results related to that command from the remote daemon. The command's 
+        * result code which was run by the remote daemon can be retrieved from the
+        * return parameter of this method.
+        * </p>
+        * @param  command the command to be executed by the daemon.
+        * @return the executed command's return code.
+        * @throws IOException 
+        * @throws UnknownHostException
+        * @throws SAXException 
+        * @throws SecurityException if a security manager exists
+        */
+       public int exec(final String command) throws UnknownHostException, IOException, SAXException {
+               PrintWriter out = null;
+               Socket socket = null;
+               
+       /******************************************************************************************/
+       /*          Just over 1 TCP connection                                                    */
+       /*          COMMAND_LENGTH: Java integer 4 bytes, BIG-ENDIAN (the same as network order)  */
+       /*          ACK: integer 4 bytes big-endian (for Java) with the sent comand length                */
+       /*          COMMAND: TPV locale character set encoding                                    */
+       /*          RESULTS: TPV locale character set encoding                                    */
+       /*                                                                                        */
+       /*              JAVA CLIENT: ------------ COMMAND_LENGTH -------> :SERVER                 */
+       /*              JAVA CLIENT: <---------------- ACK -------------- :SERVER                 */
+       /*              JAVA CLIENT: -------------- COMMAND ------------> :SERVER                 */
+       /*              JAVA CLIENT: <-------------- RESULTS ------------ :SERVER                 */
+       /*              JAVA CLIENT: <---------- CLOSE CONNECTION ------- :SERVER                 */
+       /*                                                                                        */
+       /******************************************************************************************/
+
+
+               
+               socket = new Socket(InetAddress.getByName(host), port);
+               try {
+                       /*Must be used the remote charset :S*/
+                       byte [] commandEncoded = command.getBytes("UTF-8"); 
+                       
+                       DataOutputStream sendData = new DataOutputStream(socket.getOutputStream());
+                       DataInputStream receiveData = new DataInputStream(socket.getInputStream());
+                                               
+                       // 1. COMMAND_LENGTH
+                       sendData.writeInt(commandEncoded.length);
+
+                       // 2. ACK
+                       // TODO: if the server close the connection we could stay here probably
+                       // until TCP keepalive is sent (20 hours by default in Linux)
+                       int ack = receiveData.readInt();
+                       if (ack != commandEncoded.length) 
+                               throw new IOException("invalid ACK, something went wrong " +
+                                       "with the TCPForkDaemon. Check the /var/log/messages file in the TPV");
+                       
+                       
+                       // 3. COMMAND
+                       sendData.write(commandEncoded);
+                       
+                       
+                       // 4. RESULTS
+                       // TODO: if the server closes the connection we could stay here probably
+                       // until TCP keepalive is sent (20 hours by default in Linux)
+                       parser.setStream(socket.getInputStream());
+                       
+                       
+                       // 5. SERVER CLOSES CONNECTION
+               }
+               finally {
+                       if (out != null) {
+                               out.close();
+                       }
+                       socket.close();
+               }
+               
+               //If everything went alright we should be able to retrieve the return 
+               //status of the remotely executed command.
+               return parser.getReturnValue();
+       }
+
+       
+       /**
+        * Retrieve the standard output. <br>
+        * When there is nothing from the standard output this method returns null.
+        * 
+        * @see {@link TCPForkDaemon#getStderr()}
+        * @return the stdout stream
+        */
+       public String getStdout() {
+               return parser.getStdout();
+       }
+       
+       
+       /**
+        * Retrieve the stderr stream as a {@link String} from the command which 
+        * was run by the remote daemon 
+        * 
+        * @see {@link TCPForkDaemon#getStdout()}
+        * @return the stderr stream
+        */
+       public String getStderr() {
+               return parser.getStderr();
+       }
+}
diff --git a/JavaFork/JavaExample/javafork-example/src/main/java/de/fork/java/XmlForkParser.java b/JavaFork/JavaExample/javafork-example/src/main/java/de/fork/java/XmlForkParser.java
new file mode 100644 (file)
index 0000000..51fe7f0
--- /dev/null
@@ -0,0 +1,195 @@
+package de.fork.java;
+
+import java.io.IOException;
+import java.io.InputStream;
+import javax.xml.parsers.ParserConfigurationException;
+import javax.xml.parsers.SAXParser;
+import javax.xml.parsers.SAXParserFactory;
+import org.apache.log4j.Logger;
+import org.xml.sax.Attributes;
+import org.xml.sax.SAXException;
+import org.xml.sax.SAXParseException;
+import org.xml.sax.ext.DefaultHandler2;
+
+/**
+ * <p>
+ * Class intended to parse the XML stream received from the daemon which is
+ * waiting to run commands. These commands are sent by the method 
+ * {@link es.dia.pos.n2a.util.os.unix.TCPForkDaemon#exec(String, String, int)} 
+ * </p>
+ * <p>
+ * After processing one command the daemon sends a XML stream with the stderr,
+ * stdout and return status of that command. With this class we extract those values
+ * and we can retrieve them using the methods {@link #getStderr() }, {@link #getStdout()}
+ * and {@link #getReturnValue()}
+ * </p>
+ * <p>
+ * <pre>
+ * <b>Example, stream received from daemon:</b>
+ * {@code
+ * <?xml version="1.0"?><salida><error><![CDATA[ls: no se puede acceder a bbb: No existe el fichero o el directorio
+ * ls: no se puede acceder a aaa: No existe el fichero o el directorio
+ * ls: no se puede acceder a dddd: No existe el fichero o el directorio
+ * ]]></error><ret><![CDATA[2]]></ret></salida>
+ * }
+ * </pre>
+ * </p>
+ * <p>
+ * Instances of this class are mutable. To use them concurrently, clients must surround each
+ * method invocation (or invocation sequence) with external synchronization of the clients choosing.
+ * </p>
+ */
+public class XmlForkParser extends DefaultHandler2 {
+       private static final Logger logger = Logger.getLogger(XmlForkParser.class);
+    private StringBuffer accumulator = new StringBuffer();
+       private String stderr = new String();
+       private String stdout = new String();
+       private String returnCode = new String();
+       final SAXParserFactory spf = SAXParserFactory.newInstance();
+       private final SAXParser saxParser;
+       
+       public XmlForkParser() throws ParserConfigurationException, SAXException {
+               saxParser = spf.newSAXParser();
+       }
+       
+       public void setStream(InputStream stream) throws SAXException, IOException {
+               saxParser.parse(stream, this);
+       }
+
+       /**
+        * <p>
+        * The daemon sends a XML stream, we parse that stream and the results are
+        * stored in the instace fields {@link #stderr}, {@link #stdout} and {@link returnCode}
+        * </p>
+        * <p>
+        * Later we can retrieve the results with {@link #getStderr()}, {@link #getStdout()} and
+        * {@link #getReturnValue()}
+        * </p>
+        */
+       @Override
+       public void endElement (final String uri, final String localName, final String qName) {
+               if (qName.equals("error")) {
+                       // After </error>, we've got the stderror
+                       stderr = stderr + accumulator.toString();
+               } else if (qName.equals("out")) {
+                       // After </out>, we've got the stdout
+                       stdout = stdout + accumulator.toString();
+               } else if (qName.equals("ret")) {
+                       returnCode = returnCode + accumulator.toString();
+               }
+       }
+       
+       /**
+        * <p>
+        * This method removes the <code>\n<code> characters at the end of the stdout 
+        * or stderr stream.
+        * </p>
+        * 
+     * @throws SAXException If any SAX errors occur during processing.
+        */
+       @Override
+       public void endDocument () throws SAXException
+       {
+               if (stderr.length() != 0) {
+                       String lastStderr = stderr.replaceFirst("\\\n$", "");
+                       stderr = lastStderr;
+               }
+               else {
+                       //if there is nothing from the stderr stream
+                       stderr = null;
+               }
+               if (stdout.length() != 0) {
+                       String lastStdout = stdout.replaceFirst("\\\n$", "");
+                       stdout = lastStdout;
+               }
+               else {
+                       //if there is nothing from the stdout stream
+                       stdout = null;
+               }
+       }
+       
+       /**
+        * Retrieve the standard error.
+        * When there is nothing from the standard error this method returns null.
+        * 
+        * <pre>
+        * <b>Example, stream received from daemon:</b>
+        * {@code
+        * <?xml version="1.0"?><salida><error><![CDATA[ls: no se puede acceder a bbb: No existe el fichero o el directorio
+        * ls: no se puede acceder a aaa: No existe el fichero o el directorio
+        * ls: no se puede acceder a dddd: No existe el fichero o el directorio
+        * ]]></error><ret><![CDATA[2]]></ret></salida>
+        * }
+        * </pre>
+        * </p>
+        * <p>
+        * <pre>
+        * <b>From that example with this method we are going to obtain this return parameter:</b>
+        * {@code
+        * ls: no se puede acceder a bbb: No existe el fichero o el directorio
+        * ls: no se puede acceder a aaa: No existe el fichero o el directorio
+        * ls: no se puede acceder a dddd: No existe el fichero o el directorio
+        * }
+        * </pre>
+        * 
+        * @return stderr
+        */
+       public String getStderr() {
+               return stderr;
+               
+       }
+       
+       
+       /**
+        * Retrieve the standard output.
+        * When there is nothing from the standard output this method returns null.
+        * 
+        * @see {@link XmlForkParser#getStderr()}
+        * @return stdout
+        */
+       public String getStdout() {
+               return stdout;
+       }
+       
+       
+       /**
+        * Retrieve the return code from the executed command.
+        * 
+        * @return return status, usually <code>0<code> means the command went OK.
+        */
+       public int getReturnValue() {
+               return new Integer(returnCode).intValue();
+       }
+       
+       
+       @Override
+       public void startElement (final String uri, final String localName, 
+                                                                       final String qName, final Attributes attributes) {
+               accumulator.setLength(0);
+       }
+       
+       
+       @Override
+       public void characters(final char[] buffer, final int start, final int length) {
+           accumulator.append(buffer, start, length);
+       }
+       
+       
+       @Override
+       public void warning(final SAXParseException exception) {
+               logger.error("WARNING line:" + exception.getLineNumber(), exception);
+       }
+       
+       
+       @Override
+       public void error(final SAXParseException exception) {
+               logger.error("ERROR line:" + exception.getLineNumber(), exception);
+       }
+       
+       
+       @Override
+       public void fatalError(final SAXParseException exception) throws SAXException {
+               logger.error("FATAL ERROR line:" + exception.getLineNumber(), exception);
+               throw (exception);
+       }
+}