From 92973ab6160a12b029eb9cd66f8fa0a4b48723d9 Mon Sep 17 00:00:00 2001 From: gumartinm Date: Sun, 29 Jan 2012 02:26:47 +0100 Subject: [PATCH] I keep woking on Java TCP fork. It is being harder than I thought. --- JavaFork/Daemon/javafork.c | 690 ++++++++++++--------- JavaFork/Daemon/javafork.h | 20 +- .../src/main/java/de/fork/java/RemoteForkMain.java | 4 +- .../src/main/java/de/fork/java/TCPForkDaemon.java | 56 +- .../src/main/java/de/fork/java/XmlForkParser.java | 2 +- 5 files changed, 440 insertions(+), 332 deletions(-) mode change 100644 => 100755 JavaFork/Daemon/javafork.c mode change 100644 => 100755 JavaFork/Daemon/javafork.h diff --git a/JavaFork/Daemon/javafork.c b/JavaFork/Daemon/javafork.c old mode 100644 new mode 100755 index 366a72b..fd1d03a --- a/JavaFork/Daemon/javafork.c +++ b/JavaFork/Daemon/javafork.c @@ -24,283 +24,265 @@ #include "javafork.h" -pid_t daemonPID; -int sockfd; +pid_t daemonPID; /*Stores the daemon server PID*/ +int sockfd = -1; /*Stores the daemon server TCP socket.*/ -#define RESTARTABLE(_cmd, _result) do { \ - _result = _cmd; \ -} while((_result == -1) && (errno == EINTR)) -static int restartableClose(int fd) { - int err; - RESTARTABLE(close(fd), err); - return err; + +static int restartableClose(int fd) +{ + return TEMP_FAILURE_RETRY(close(fd)); } -static int closeSafely(int fd) { + + +static int closeSafely(int fd) +{ + /*If we always initialize file descriptor variables with value -1*/ + /*this method should work like a charm*/ return (fd == -1) ? 0 : restartableClose(fd); } -int main (int argc, char *argv[]) { - int c; /*Getopt parameter*/ + +int main (int argc, char *argv[]) +{ + int c; /*Getopt parameter*/ /*Default values*/ - char *avalue = IPADDRESS; /*Address: numeric value or hostname*/ - int pvalue = PORT; /*TCP port*/ - int qvalue = QUEUE; /*TCP listen queue*/ + char *avalue = IPADDRESS; /*Address: numeric value or hostname*/ + int pvalue = PORT; /*TCP port*/ + int qvalue = QUEUE; /*TCP listen queue*/ /*This process is intended to be used as a daemon, it sould be launched by the INIT process, because of that*/ /*we are not forking it (INIT needs it)*/ if (daemonize(argv[0], LOG_SYSLOG, LOG_PID) < 0) return 1; - + + /*Changing session.*/ setsid(); if (signal(SIGPIPE,SIG_IGN) == SIG_ERR) { - syslog (LOG_ERR, "Error with SIGPIPE: %m"); + syslog (LOG_ERR, "signal SIGPIPE desactivation failed: %m"); return 1; } - opterr = 0; while ((c = getopt (argc, argv, "a:p:q:")) != -1) { switch (c) { - case 'a': - avalue = optarg; - break; - case 'p': - pvalue = atoi(optarg); - if ((pvalue > 65535) || (pvalue <= 0)) { - syslog (LOG_ERR, "Port value %d out of range", pvalue); - return 1; - } - break; - case 'q': - qvalue = atoi(optarg); - break; - case '?': - if ((optopt == 'a') || (optopt == 'p') || (optopt == 'q')) - syslog (LOG_ERR, "Option -%c requires an argument.", optopt); - else if (isprint (optopt)) - syslog (LOG_ERR, "Invalid option '-%c'.", optopt); - else - syslog (LOG_ERR, "Unknown option character '\\x%x'.", optopt); + case 'a': + avalue = optarg; + break; + case 'p': + pvalue = atoi(optarg); + if ((pvalue > 65535) || (pvalue <= 0)) { + syslog (LOG_ERR, "Port value %d out of range", pvalue); return 1; - default: - abort (); + } + break; + case 'q': + qvalue = atoi(optarg); + break; + case '?': + if ((optopt == 'a') || (optopt == 'p') || (optopt == 'q')) + syslog (LOG_ERR, "Option -%c requires an argument.", optopt); + else if (isprint (optopt)) + syslog (LOG_ERR, "Invalid option '-%c'.", optopt); + else + syslog (LOG_ERR, "Unknown option character '\\x%x'.", optopt); + return 1; + default: + abort (); } } + /*This program does not admit options*/ if (optind < argc) { syslog (LOG_ERR,"This program does not admit options just argument elements with their values."); return 1; } - - /*TODO: INIT process sending SIGTERM? Should I catch that signal?*/ + /*INIT process sending SIGINT? Should I catch that signal?*/ daemonPID = getpid(); - if (signal(SIGTERM, sigterm_handler) == SIG_ERR) { + if (signal(SIGINT, sigint_handler) == SIG_ERR) { syslog (LOG_ERR, "SIGTERM signal handler failed: %m"); return 1; } - - if (main_child(avalue, pvalue, qvalue) < 0 ) + if (main_child(avalue, pvalue, qvalue) < 0) return 1; return 0; } -int main_child (char *address, int port, int queue) { + +int main_child (char *address, int port, int queue) +{ struct protoent *protocol; /*Network protocol*/ - struct sockaddr_in addr_server; /*struct with the server socket address*/ - struct sockaddr_in addr_client;/*struct with the client socket address*/ - int sockclient; /*File descriptor for the accepted socket*/ - pthread_t idThread; /*Thread identifier number*/ + struct sockaddr_in addr_server; /*struct with the server socket address*/ + struct sockaddr_in addr_client; /*struct with the client socket address*/ + int sockclient = -1; /*File descriptor for the accepted socket*/ + pthread_t idThread; /*Thread identifier number*/ socklen_t clilen; int optval; - int returnValue = 0; /*OK by default*/ + int returnValue = 0; /*The return value from this function, OK by default*/ /*Retrieve protocol number from /etc/protocols file */ protocol=getprotobyname("tcp"); if (protocol == NULL) { syslog(LOG_ERR, "cannot map \"tcp\" to protocol number: %m"); - goto EndwithError; + goto err; } bzero((char*) &addr_server, sizeof(addr_server)); addr_server.sin_family = AF_INET; if (inet_pton(AF_INET, address, &addr_server.sin_addr.s_addr) <= 0) { syslog (LOG_ERR, "error inet_pton: %m"); - goto EndwithError; + goto err; } addr_server.sin_port = htons(port); if ((sockfd = socket(AF_INET, SOCK_STREAM, protocol->p_proto)) < 0) { syslog (LOG_ERR, "socket creation failed: %m"); - goto EndandClosewithError; + goto err; } - /*We want to avoid issues while trying to bind a socket in TIME_WAIT state.*/ - /*In this application from the client there should not be any trouble if it gets*/ - /*TIME_WAIT states, because it can connect from any source port.*/ + /*We want to avoid issues while trying to bind a socket in TIME_WAIT state*/ optval = 1; if(setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof(optval)) < 0) { syslog (LOG_ERR, "setsockopt failed: %m"); - goto EndandClosewithError; + goto err; } if (bind(sockfd, (struct sockaddr *) &addr_server, sizeof(addr_server)) < 0) { syslog (LOG_ERR, "socket bind failed: %m"); - goto EndandClosewithError; + goto err; } if (listen (sockfd, queue) < 0 ) { syslog (LOG_ERR, "socket listen failed: %m"); - goto EndandClosewithError; + goto err; } while(1) { clilen = sizeof(addr_client); - if ((sockclient = accept (sockfd, (struct sockaddr *) &addr_client, &clilen)) < 0) { + if ((sockclient = TEMP_FAILURE_RETRY(accept (sockfd, (struct sockaddr *) &addr_client, &clilen))) < 0) { syslog (LOG_ERR, "socket accept failed: %m"); - goto EndandClosewithError; + goto err; } + if (pthread_create (&idThread, NULL, serverThread, (void *)sockclient) != 0 ) { syslog (LOG_ERR, "thread creation failed: %m"); - } + } } - - - EndasUsual: - return returnValue; - EndandClosewithError: - close (sockfd); - EndwithError: - /*When there is an error.*/ - returnValue = -1; - goto EndasUsual; + +end: + closeSafely (sockfd); + return returnValue; +err: + /*When there is an error.*/ + returnValue = -1; + goto end; } -int daemonize(const char *pname, int facility, int option) { - int fd; + +int daemonize(const char *pname, int facility, int option) +{ + int fd = -1; /*Temporaly store for the /dev/tty and /dev/null file descriptors*/ - if ( (fd = open( "/dev/tty", O_RDWR, 0) ) == -1) { + if ((fd = TEMP_FAILURE_RETRY(open( "/dev/tty", O_RDWR, 0))) == -1) { /*We already have no tty control*/ - close(fd); + closeSafely(fd); return 0; } /*Sending messages to log*/ openlog(pname, option, facility); + /*To get a controlling tty*/ if (ioctl(fd, TIOCNOTTY, (caddr_t)0) <0 ) { syslog (LOG_ERR, "Getting tty failed: %m"); return -1; } - if (close(fd) < 0) { + + if (closeSafely(fd) < 0) { syslog (LOG_ERR, "Closing tty failed: %m"); return -1; } - if((fd = open( "/dev/null", O_RDWR, 0) ) == -1) { - close(fd); + if((fd = TEMP_FAILURE_RETRY(open( "/dev/null", O_RDWR, 0))) == -1) { + closeSafely(fd); return -1; } - dup2(fd,0); - dup2(fd,1); - dup2(fd,2); - close(fd); - - return 0; -} - -void *serverThread (void * arg) { - int sock; - /*Variables used for the select function*/ - struct timeval ptime; - fd_set fd_read; - int ret; - - /*Control parameters used while receiving data from the client*/ - int nLeft, nData, iPos; - /*This buffer is intended to store the data received from the client.*/ - char buffer[1025]; + if (TEMP_FAILURE_RETRY(dup2(fd,0)) < 0 || + TEMP_FAILURE_RETRY(dup2(fd,1)) < 0 || + TEMP_FAILURE_RETRY(dup2(fd,2)) < 0) { + closeSafely(fd); + return -1; + } - /*The command sent by the client, to be executed by this process*/ - char *command; - /*Store the command length*/ - uint32_t *commandLength; - /*Store the ack*/ - uint32_t *ack; + closeSafely(fd); + return 0; +} - sock = (int) arg; +void *serverThread (void * arg) +{ + int socket = -1; /*Open socket by the Java client*/ + struct timeval ptime; /*Variable used by the select function, timeout control*/ + int nLeft; /*Control parameter used while receiving data from the client*/ + char buffer[1025]; /*This buffer is intended to store the data received from the client*/ + char *command = NULL; /*The command sent by the client, to be executed by this process*/ + uint32_t *commandLength = NULL; /*Store the command length*/ + + socket = (int) arg; + pthread_detach(pthread_self()); - - /******************************************************************************************/ - /* Just over 1 TCP connection */ - /* COMMAND_LENGTH: Java integer 4 bytes, BIG-ENDIAN (the same as network order) */ - /* ACK: integer 4 bytes big-endian (for Java) with the received comand length */ - /* COMMAND: TPV locale character set encoding */ - /* RESULTS: TPV locale character set encoding */ - /* */ - /* JAVA CLIENT: ------------ COMMAND_LENGTH -------> :SERVER */ - /* JAVA CLIENT: <---------------- ACK -------------- :SERVER */ - /* JAVA CLIENT: -------------- COMMAND ------------> :SERVER */ - /* JAVA CLIENT: <-------------- RESULTS ------------ :SERVER */ - /* JAVA CLIENT: <---------- CLOSE CONNECTION ------- :SERVER */ - /* */ - /******************************************************************************************/ - - - /*Wait 2 seconds for data coming from client.*/ - ptime.tv_sec=2; - ptime.tv_usec=0; + if (required_sock_options (socket) < 0) + goto err; + + + /************************************************************************************************************/ + /* Just over 1 TCP connection */ + /* COMMAND_LENGTH: Java integer 4 bytes, BIG-ENDIAN (the same as network order) */ + /* COMMAND: locale character set encoding */ + /* RESULTS: locale character set encoding */ + /* */ + /* JAVA CLIENT: ------------ COMMAND_LENGTH -------> :SERVER */ + /* JAVA CLIENT: -------------- COMMAND ------------> :SERVER */ + /* JAVA CLIENT: <-------------- RESULTS ------------ :SERVER */ + /* JAVA CLIENT: <---------- CLOSE CONNECTION ------- :SERVER */ + /* */ + /************************************************************************************************************/ /*COMMAND LENGTH*/ /*First of all we receive the command size as a Java integer (4 bytes primitive type)*/ if ((commandLength = (uint32_t *) malloc(sizeof(uint32_t))) == NULL) { syslog (LOG_ERR, "commandLength malloc failed: %m"); - goto Error; + goto err; } + bzero(buffer, sizeof(buffer)); nLeft = sizeof(uint32_t); - nData = iPos = 0; - do { - FD_ZERO(&fd_read); - FD_SET(sock, &fd_read); - ret = select(sock+1, &fd_read,NULL,NULL, &ptime); - if(ret == 0) { - syslog(LOG_INFO, "receiving command length timeout error"); - goto Error; - } - if(ret == -1) { - syslog(LOG_ERR, "receiving command length select error: %m"); - goto Error; - } - if((nData = recv(sock, &buffer[iPos], nLeft, 0)) <= 0) { - syslog (LOG_ERR, "command length reception failed: %m"); - goto Error; - } - nLeft -= nData; - iPos += nData; - } while (nLeft > 0); - if (nLeft < 0) { + /*Wait max 2 seconds for data coming from client, otherwise exits with error.*/ + ptime.tv_sec=2; + ptime.tv_usec=0; + if (receive_from_socket (socket, buffer, &nLeft, &ptime) < 0) + goto err; + + if (nLeft < 0) syslog (LOG_INFO, "command length excessive data received, expected: 4, received: %d", (-nLeft + sizeof(uint32_t)) ); - } /*Retrieve integer (4 bytes) from buffer*/ memcpy (commandLength, buffer, sizeof(uint32_t)); @@ -308,199 +290,314 @@ void *serverThread (void * arg) { *commandLength = be32toh (*commandLength); - - /*ACK*/ - /*Sending back the command length as the ACK message*/ - if ((ack = (uint32_t *) malloc(sizeof(uint32_t))) == NULL) { - syslog (LOG_ERR, "commandLength malloc failed: %m"); - goto ErrorAck; - } - *ack = htobe32(*commandLength); - if (send(sock, ack, sizeof(uint32_t), 0) < 0) { - syslog (LOG_ERR, "send ACK failed: %m"); - goto ErrorAck; - } - - /*COMMAND*/ /*Reserving commandLength + 1 because of the string end character*/ if ((command = (char *) malloc(*commandLength + 1)) == NULL) { syslog (LOG_ERR, "command malloc failed: %m"); - goto ErrorCommand; + goto err; } + bzero(command, ((*commandLength) + 1)); - nLeft = *commandLength; - nData = iPos = 0; - do { - FD_ZERO(&fd_read); - FD_SET(sock, &fd_read); - ret = select(sock+1, &fd_read,NULL,NULL, &ptime); - if(ret == 0) { - syslog(LOG_INFO, "receiving command timeout error"); - goto ErrorCommand; - } - if(ret == -1) { - syslog(LOG_ERR, "receiving command select error: %m"); - goto ErrorCommand; - } - if((nData = recv(sock, &command[iPos], nLeft, 0)) <= 0) { - syslog (LOG_ERR, "command reception failed: %m"); - goto ErrorCommand; - } - nLeft -= nData; - iPos += nData; - } while (nLeft > 0); - if (nLeft < 0) { - syslog (LOG_INFO, "execessive command length, expected: %d, received: %d", *commandLength, (-nLeft + *commandLength)); - } + nLeft = *commandLength; + /*Wait max 2 seconds for data coming from client, otherwise exits with error.*/ + ptime.tv_sec=2; + ptime.tv_usec=0; + if (receive_from_socket (socket, command, &nLeft, &ptime) < 0) + goto err; + + if (nLeft < 0) + syslog (LOG_INFO, "execessive command length, expected: %d, received: %d", *commandLength, (-nLeft + *commandLength)); /*RESULTS*/ - fork_system(sock, command); + pre_fork_system(socket, command); /*CLOSE CONNECTION AND FINISH*/ - -ErrorCommand: +err: free(command); -ErrorAck: - free(ack); -Error: - close(sock); + closeSafely(socket); free(commandLength); pthread_exit(0); } +int required_sock_options (int socket) +{ + int optval, flags; + + /*We want non blocking sockets.*/ + /*See the discussion of spurious readiness notifications under the BUGS section of select(2) */ + if ((flags = TEMP_FAILURE_RETRY(fcntl(socket,F_GETFL,0))) < 0) { + syslog (LOG_ERR, "read socket status flags failed: %m"); + return -1; + } + + if(TEMP_FAILURE_RETRY(fcntl(socket, F_SETFL, O_NONBLOCK|flags)) < 0){ + syslog (LOG_ERR, "set socket status flags failed: %m"); + return -1; + } + + /*Portable programs should not rely on inheritance or noninheritance of file status flags and */ + /*always explicitly set all required flags*/ + optval = 1; + if(setsockopt(socket, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof(optval)) < 0) { + syslog (LOG_ERR, "setsockopt SO_REUSEADDR failed: %m"); + return -1; + } + + /*Enable keepalive for this socket*/ + optval = 1; + if(setsockopt(socket, SOL_SOCKET, SO_KEEPALIVE, &optval, sizeof(optval)) < 0) { + syslog (LOG_ERR, "setsockopt SO_KEEPALIVE failed: %m"); + return -1; + } + + /*TODO: keepalive is not enough to find out if the connection is broken */ + /* apparently it just works while the handshake phase. See: */ + /* http://stackoverflow.com/questions/4345415/socket-detect-connection-is-lost */ + /* I have to implement an echo/ping messages protocol (application layer) */ + /* In the meanwhile this application should just be used in the localhost interface :( */ + + return 0; +} -int fork_system(int socket, char *command) { - int pid; - int out[2], err[2]; - char buf[2000]; - char string[100]; - struct pollfd polls[2]; - int returnstatus; - int n; - int childreturnstatus; - int returnValue = -1; /*NOK by default*/ +int timeout (int fd, long timeout) +{ + struct timeval ptime; + fd_set fd_read; + + ptime.tv_sec = timeout; + ptime.tv_usec = 0; + for (;;) { + + } +} + +int receive_from_socket (int socket, char *store, int *nLeft, struct timeval *ptime) +{ + int nData, iPos; /*Control variables.*/ + int ret; /*Store return value from select.*/ + fd_set fd_read; /*Values for select function.*/ + + nData = iPos = 0; + do { + FD_ZERO(&fd_read); + FD_SET(socket, &fd_read); + ret = select(socket+1, &fd_read, NULL, NULL, ptime); + if(ret == 0) { + syslog(LOG_INFO, "receiving timeout error"); + return -1; + } + if(ret == -1) { + syslog(LOG_ERR, "receiving select error: %m"); + return -1; + } + if((nData = recv(socket, &store[iPos], *nLeft, 0)) <= 0) { + syslog (LOG_ERR, "read TCP socket failed: %m"); + return -1; + } + *nLeft -= nData; + iPos += nData; + } while (*nLeft > 0); + + return 0; +} + + +int pre_fork_system(int socket, char *command) +{ /*Required variables in order to share memory between processes*/ key_t keyvalue; - int idreturnst = -1; + int idreturnstatus = -1; /*Store the return status from the process launched using system or execve*/ /*Using shared memory between the child and parent process*/ - int *returnst = NULL; + int *returnstatus = NULL; + /*Required variables in order to share the semaphore between processes*/ key_t keysemaphore; int idsemaphore = -1; - /*Used like a barrier: the child process just can start after sending the XML init code*/ - sem_t *semaphore = NULL; + sem_t *semaphore = NULL; /*Used as a barrier: the child process just can start after sending the XML init code*/ + + int returnValue = -1; /*Return value from this function can be caught by upper layers, NOK by default*/ + - out[0] = out[1] = err[0] = err[1] = -1; + /*Allocate shared memory because we can not use named semaphores*/ + /*We are using this semaphore as a barrier, because we just want to start the child process when the parent process has sent*/ + /*the XML header (see: for_system function)*/ + + keysemaphore=ftok("/bin/ls", SHAREMEMSEM); /*the /bin/ls must exist otherwise this does not work... */ + if (keysemaphore == -1) { + syslog (LOG_ERR, "ftok failed: %m"); + goto End; + } + /*Attach shared memory*/ + if ((idsemaphore = shmget(keysemaphore,sizeof(sem_t), 0660 | IPC_CREAT)) < 0) { + syslog (LOG_ERR, "semaphore initialization failed: %m"); + goto EndandReleaseSem; + } - /*Creating the pipes, they will be attached to the stderr and stdout streams*/ - if (pipe(out) < 0 || pipe(err) < 0) { - syslog (LOG_ERR, "pipe failed: %m"); - goto EndandClosePipes; - } + if ((semaphore = (sem_t *)shmat(idsemaphore, (void *)0, 0)) < 0) { + goto EndandReleaseSem; + } + if (sem_init(semaphore, 1, 1) < 0) { + syslog (LOG_ERR, "semaphore initialization failed: %m"); + goto EndandDestroySem; + } + if (sem_wait(semaphore) < 0) { + syslog (LOG_ERR, "semaphore wait failed: %m"); + goto EndandDestroySem; + } - /*Allocate shared memory because we can not use named semaphores*/ - keysemaphore=ftok("/bin/ls", SHAREMEMSEM); /*the /bin/ls must exist otherwise this does not work... */ - if (keysemaphore == -1) { - syslog (LOG_ERR, "ftok failed: %m"); - goto EndandClosePipes; - } - /*Attach shared memory*/ - if ((idsemaphore = shmget(keysemaphore,sizeof(sem_t), 0660 | IPC_CREAT)) < 0) { - syslog (LOG_ERR, "semaphore initialization failed: %m"); - goto EndandReleaseSem; - } - if ((semaphore = (sem_t *)shmat(idsemaphore, (void *)0, 0)) < 0) { - goto EndandReleaseSem; - } + + + /*Allocate shared memory for the return status code from the process which is going to be launched by the system function.*/ + /*We want to share the returnstatus variable between this process and the child that is going to be created in the fork_system method.*/ + /*The goal is to store in this variable the return status code received from the process launched with the system method by the child process,*/ + /*then the parent process can retrieve that return status code and send it by TCP to the Java client.*/ + /*There are not concurrency issues because the parent process will just try to read this variable when the child process is dead, taking*/ + /*in that moment its last value and sending it to the Java client.*/ - if (sem_init(semaphore, 1, 1) < 0) { - syslog (LOG_ERR, "semaphore initialization failed: %m"); - goto EndandDestroySem; - } + + keyvalue=ftok("/bin/ls", SHAREMEMKEY); /*the /bin/ls must exist otherwise this does not work... */ + if (keyvalue == -1) { + syslog (LOG_ERR, "ftok failed: %m"); + goto EndandDestroySem; + } + + /*Attach shared memory*/ + if ((idreturnstatus=shmget(keyvalue,sizeof(int), 0660 | IPC_CREAT)) < 0) { + syslog (LOG_ERR, "shmget failed: %m"); + goto EndandReleaseMem; + } + + returnstatus = (int *)shmat(idreturnstatus, (void *)0, 0); + if ((*returnstatus)== -1) { + syslog (LOG_ERR, "shmat failed: %m"); + goto EndandReleaseMem; + } + + + /*After allocating and attaching shared memory we reach this code if everything went OK.*/ - if (sem_wait(semaphore) < 0) { - syslog (LOG_ERR, "semaphore wait failed: %m"); - goto EndandDestroySem; - } + returnValue = fork_system(socket, command, semaphore, returnstatus); +EndandReleaseMem: + if (returnstatus != NULL) { + /*detach memory*/ + if (shmdt ((int *)returnstatus) < 0) + syslog (LOG_ERR, "returnstatus shared variable shmdt failed: %m"); + } + + /*Mark the segment to be destroyed.*/ + if (shmctl (idreturnstatus, IPC_RMID, (struct shmid_ds *)NULL) < 0 ) + syslog (LOG_ERR, "returnstatus shared variable shmctl failed: %m"); +EndandDestroySem: + if (sem_destroy(semaphore) <0) + syslog (LOG_ERR, "semaphore destroy failed: %m"); +EndandReleaseSem: + /*after sem_destroy-> input/output parameter NULL?*/ + if (semaphore != NULL) { + /*detach memory*/ + if (shmdt ((sem_t *)semaphore) < 0) + syslog (LOG_ERR, "semaphore shmdt failed: %m"); + } + + /*Mark the segment to be destroyed.*/ + if (shmctl (idsemaphore, IPC_RMID, (struct shmid_ds *)NULL) < 0 ) + syslog (LOG_ERR, "semaphore shmctl failed: %m"); +End: + return returnValue; +} + + + +int fork_system(int socket, char *command, sem_t *semaphore, int *returnstatus) { + int pid; + int out[2], err[2]; /*Store pipes file descriptors. Write ends attached to the stdout and stderr streams.*/ + char buf[2000]; /*Read data buffer.*/ + char string[100]; + /*We are going to use a poll in order to find out if there are data coming from the*/ + /*pipes attached to the stdout and stderr streams.*/ + struct pollfd polls[2]; + int n; + int childreturnstatus; + int returnValue = -1; /*eturn value from this function can be caught by upper layers, NOK by default*/ - /*Allocate shared memory for the return status code */ - keyvalue=ftok("/bin/ls", SHAREMEMKEY); /*the /bin/ls must exist otherwise this does not work... */ - if (keyvalue == -1) { - syslog (LOG_ERR, "ftok failed: %m"); - goto EndandDestroySem; - } - /*Attach shared memory*/ - if ((idreturnst=shmget(keyvalue,sizeof(int), 0660 | IPC_CREAT)) < 0) { - syslog (LOG_ERR, "shmget failed: %m"); - goto EndandReleaseMem; - } - returnst = (int *)shmat(idreturnst, (void *)0, 0); - if ((*returnst)== -1) { - syslog (LOG_ERR, "shmat failed: %m"); - goto EndandReleaseMem; - } - /*By default*/ - (*returnst) = 0; + /*Value by default*/ + (*returnstatus) = 0; + + + out[0] = out[1] = err[0] = err[1] = -1; + + /*Creating the pipes, they will be attached to the stderr and stdout streams*/ + if (pipe(out) < 0 || pipe(err) < 0) { + syslog (LOG_ERR, "pipe failed: %m"); + goto EndandClosePipes; + } + if ((pid=fork()) == -1) { syslog (LOG_ERR, "fork failed: %m"); - goto EndandReleaseMem; + goto EndandClosePipes; } if (pid == 0) { /*Child process*/ + /*It has to launch another one using system or execve*/ if ((dup2(out[1],1) < 0) || (dup2(err[1],2) < 0)) { - syslog (LOG_ERR, "child dup2 failed: %m"); + syslog (LOG_ERR, "child dup2 failed: %m"); + /*Going to zombie state, hopefully waitpid will catch it*/ exit(-1); } if (sem_wait(semaphore) < 0) { syslog (LOG_ERR, "child semaphore wait failed: %m"); + /*Going to zombie state, hopefully waitpid will catch it*/ exit(-1); } - returnstatus=system(command); + *returnstatus=system(command); if (WIFEXITED(returnstatus) == 1) - (*returnst) = WEXITSTATUS(returnstatus); + (*returnstatus) = WEXITSTATUS(*returnstatus); else - (*returnst) = -1; + (*returnstatus) = -1; exit(0); } else { /*Parent process*/ + /*It sends data to the Java client using a TCP connection.*/ polls[0].fd=out[0]; polls[1].fd=err[0]; polls[0].events=POLLIN; polls[1].events=POLLIN; sprintf(string,""); send(socket,string,strlen(string),0); - sprintf(string,""); + sprintf(string,""); send(socket,string,strlen(string),0); /*Releasing barrier, the child process can keep running*/ if (sem_post(semaphore) < 0 ) { syslog (LOG_ERR, "parent error releasing barrier: %m"); - /*Beaware, sig_handler with the child process :]*/ + /*TODO: May I kill a child process if it is already dead? I mean,*/ + /* what could happen if the child process is dead?*/ + /* Should I implement a SIGCHILD handler?*/ + /*TODO: Should I have a SIGTERM handler in the child process?*/ kill(pid, SIGTERM); - goto EndandReleaseMem; + goto EndandClosePipes; } + while(1) { if(poll(polls,2,100)) { if(polls[0].revents&&POLLIN) { @@ -523,10 +620,15 @@ int fork_system(int socket, char *command) { } if(!polls[0].revents&&POLLIN && !polls[1].revents&&POLLIN) { syslog (LOG_ERR, "parent error polling pipes: %m"); - /*Beaware, sig_handler with the child process :]*/ + /*TODO: May I kill a child process if it is already dead? I mean,*/ + /* what could happen if the child process is dead?*/ + /* Should I implement a SIGCHILD handler?*/ + /*TODO: Should I have a SIGTERM handler in the child process?*/ kill(pid, SIGTERM); /*I want to send an error status to the remote calling process*/ - (*returnst) = -1; + /*TODO: Before giving this value I should make sure the child process is dead*/ + /* otherwise I could finish having in *returnstatus the value from the child process*/ + (*returnstatus) = -1; break; } } @@ -537,68 +639,46 @@ int fork_system(int socket, char *command) { /*First of all, we check the exit status of our child process*/ /*In case of error send an error status to the remote calling process*/ if (WIFEXITED(childreturnstatus) != 1) - (*returnst) = -1; + (*returnstatus) = -1; break; } /*The child process is not dead, keep polling more data from stdout or stderr streams*/ } } + } /*Reaching this code when child finished or if error while polling pipes*/ - sprintf(string,"", (*returnst)); + sprintf(string,"", (*returnstatus)); send(socket,string,strlen(string),0); - sprintf(string,""); + sprintf(string,""); send(socket,string,strlen(string),0); /*Stuff just done by the Parent process. The child process ends with exit*/ returnValue = 0; /*if everything went OK*/ -EndandReleaseMem: - if (returnst != NULL) { - /*detach memory*/ - if (shmdt ((int *)returnst) < 0) - syslog (LOG_ERR, "returnstatus shared variable shmdt failed: %m"); - } - /*Mark the segment to be destroyed.*/ - if (shmctl (idreturnst, IPC_RMID, (struct shmid_ds *)NULL) < 0 ) - syslog (LOG_ERR, "returnstatus shared variable shmctl failed: %m"); -EndandDestroySem: - if (sem_destroy(semaphore) <0) - syslog (LOG_ERR, "semaphore destroy failed: %m"); -EndandReleaseSem: - /*after sem_destroy-> input/output parameter NULL?*/ - if (semaphore != NULL) { - /*detach memory*/ - if (shmdt ((sem_t *)semaphore) < 0) - syslog (LOG_ERR, "semaphore shmdt failed: %m"); - } - /*Mark the segment to be destroyed.*/ - if (shmctl (idsemaphore, IPC_RMID, (struct shmid_ds *)NULL) < 0 ) - syslog (LOG_ERR, "semaphore shmctl failed: %m"); + EndandClosePipes: - closeSafely (out[0]); - closeSafely (out[1]); - closeSafely (err[0]); - closeSafely (err[1]); + closeSafely (out[0]); + closeSafely (out[1]); + closeSafely (err[0]); + closeSafely (err[1]); return returnValue; } -void sigterm_handler(int sig) +void sigint_handler(int sig) { - if (daemonPID != getpid()) { - //Do nothing - return; - } - if (signal (SIGTERM, SIG_IGN) == SIG_ERR) - { - syslog (LOG_ERR, "signal desactivation failed"); + if (daemonPID != getpid()) { + //Do nothing + return; } + if (signal (SIGINT, SIG_IGN) == SIG_ERR) + syslog (LOG_ERR, "signal SIGINT desactivation failed: %m"); - close (sockfd); - /*TODO: kill child processes and release allocate memory*/ - exit (0); + closeSafely (sockfd); + /*TODO: kill child processes and release allocate memory*/ + exit (0); } diff --git a/JavaFork/Daemon/javafork.h b/JavaFork/Daemon/javafork.h old mode 100644 new mode 100755 index aacdc18..d8fbf14 --- a/JavaFork/Daemon/javafork.h +++ b/JavaFork/Daemon/javafork.h @@ -9,6 +9,7 @@ + /**************************************************************************************** * This method is used by pthread_create * * * @@ -47,7 +48,17 @@ int main_child (char *address, int port, int queue); * INPUT PARAMETER: socket file descriptor * * RETURNS: void * ****************************************************************************************/ -int fork_system(int socket, char *command); +int fork_system(int socket, char *command, sem_t *semaphore, int *returnst); + + +/**************************************************************************************** +* This method is used by pthread_create * +* * +* INPUT PARAMETER: socket file descriptor * +* RETURNS: void * +****************************************************************************************/ +int pre_fork_system(int socket, char *command); + @@ -58,4 +69,9 @@ int fork_system(int socket, char *command); * INPUT PARAMETER: socket file descriptor * * RETURNS: void * ****************************************************************************************/ -void sigterm_handler(); +void sigint_handler(); + + + +int required_sock_options (int socket); +int receive_from_socket (int socket, char *store, int *Left, struct timeval *ptime); diff --git a/JavaFork/JavaExample/javafork-example/src/main/java/de/fork/java/RemoteForkMain.java b/JavaFork/JavaExample/javafork-example/src/main/java/de/fork/java/RemoteForkMain.java index 9b4180b..ac0a312 100644 --- a/JavaFork/JavaExample/javafork-example/src/main/java/de/fork/java/RemoteForkMain.java +++ b/JavaFork/JavaExample/javafork-example/src/main/java/de/fork/java/RemoteForkMain.java @@ -22,12 +22,12 @@ public class RemoteForkMain { final ByteArrayOutputStream baos = new ByteArrayOutputStream(); final PrintStream stdout = new PrintStream(baos); - final String command = "ls -lah"; + final String command = "ls -lah ~/.ssh; ls -lah * bbbb aaa"; final ByteArrayOutputStream baos2 = new ByteArrayOutputStream(); final PrintStream stderr = new PrintStream(baos2); int result; - result = LauncherProcessesDiaFork.exec(command,stdout, stderr, "127.0.0.1", 5193); + result = LauncherProcesses.exec(command,stdout, stderr, "gumartinm.name", 5193); System.out.println(result); System.out.println("Stdout: " + baos.toString()); System.out.println("Stderr: " + baos2.toString()); diff --git a/JavaFork/JavaExample/javafork-example/src/main/java/de/fork/java/TCPForkDaemon.java b/JavaFork/JavaExample/javafork-example/src/main/java/de/fork/java/TCPForkDaemon.java index 0ecb257..89141c5 100644 --- a/JavaFork/JavaExample/javafork-example/src/main/java/de/fork/java/TCPForkDaemon.java +++ b/JavaFork/JavaExample/javafork-example/src/main/java/de/fork/java/TCPForkDaemon.java @@ -7,6 +7,7 @@ import java.io.PrintWriter; import java.net.InetAddress; import java.net.Socket; import java.net.UnknownHostException; +import org.xml.sax.InputSource; import org.xml.sax.SAXException; /** @@ -94,12 +95,10 @@ public class TCPForkDaemon { /******************************************************************************************/ /* Just over 1 TCP connection */ /* COMMAND_LENGTH: Java integer 4 bytes, BIG-ENDIAN (the same as network order) */ - /* ACK: integer 4 bytes big-endian (for Java) with the sent comand length */ - /* COMMAND: TPV locale character set encoding */ - /* RESULTS: TPV locale character set encoding */ + /* COMMAND: remote locale character set encoding */ + /* RESULTS: remote locale character set encoding */ /* */ /* JAVA CLIENT: ------------ COMMAND_LENGTH -------> :SERVER */ - /* JAVA CLIENT: <---------------- ACK -------------- :SERVER */ /* JAVA CLIENT: -------------- COMMAND ------------> :SERVER */ /* JAVA CLIENT: <-------------- RESULTS ------------ :SERVER */ /* JAVA CLIENT: <---------- CLOSE CONNECTION ------- :SERVER */ @@ -110,35 +109,48 @@ public class TCPForkDaemon { socket = new Socket(InetAddress.getByName(host), port); try { - /*Must be used the remote charset :S*/ + //By default in UNIX systems the keepalive message is sent after 20hours + //with Java we can not use the TCP_KEEPCNT, TCP_KEEPIDLE and TCP_KEEPINTVL options by session. + //It is up to the server administrator and client user to configure them. + //I guess it is because Solaris does not implement those options... + //see: Net.c openjdk 6 and net_util_md.c openjdk 7 + //So in Java applications the only way to find out if the connection is broken (one router down) + //is sending ping messages or something similar from the application layer. Java is a toy language... + //Anyway I think the keepalive messages just work during the handshake phase, just after sending some + //data over the link the keepalive does not work. + // See: http://stackoverflow.com/questions/4345415/socket-detect-connection-is-lost + socket.setKeepAlive(true); + + //It must be used the remote locale character set encoding byte [] commandEncoded = command.getBytes("UTF-8"); DataOutputStream sendData = new DataOutputStream(socket.getOutputStream()); - DataInputStream receiveData = new DataInputStream(socket.getInputStream()); // 1. COMMAND_LENGTH sendData.writeInt(commandEncoded.length); - - // 2. ACK - // TODO: if the server close the connection we could stay here probably - // until TCP keepalive is sent (20 hours by default in Linux) - int ack = receiveData.readInt(); - if (ack != commandEncoded.length) - throw new IOException("invalid ACK, something went wrong " + - "with the TCPForkDaemon. Check the /var/log/messages file in the TPV"); - - - // 3. COMMAND + + // 2. COMMAND sendData.write(commandEncoded); - - // 4. RESULTS - // TODO: if the server closes the connection we could stay here probably - // until TCP keepalive is sent (20 hours by default in Linux) + // 3. RESULTS + // TODO: When the network infrastructure (between client and server) fails in this point + // (broken router for example) Could we stay here until TCP keepalive is sent? + // (20 hours by default in Linux) + // Impossible to use a timeout, because we do not know how much time is going to long the command :/ + // the only way to fix this issue in Java is sending ping messages (Could we fix it using custom settings in the OS + // of the client and server machines? for example in Linux see /proc/sys/net/ipv4/) + InputSource inputSource = new InputSource(socket.getInputStream()); + //Must be used the remote locale character set encoding? + inputSource.setEncoding("UTF-8"); parser.setStream(socket.getInputStream()); + DataInputStream receiveData = new DataInputStream(socket.getInputStream()); + + + // 2. ACK + int ack = receiveData.readInt(); - // 5. SERVER CLOSES CONNECTION + // 4. SERVER CLOSED CONNECTION } finally { if (out != null) { diff --git a/JavaFork/JavaExample/javafork-example/src/main/java/de/fork/java/XmlForkParser.java b/JavaFork/JavaExample/javafork-example/src/main/java/de/fork/java/XmlForkParser.java index 51fe7f0..255cb0f 100644 --- a/JavaFork/JavaExample/javafork-example/src/main/java/de/fork/java/XmlForkParser.java +++ b/JavaFork/JavaExample/javafork-example/src/main/java/de/fork/java/XmlForkParser.java @@ -15,7 +15,7 @@ import org.xml.sax.ext.DefaultHandler2; *

* Class intended to parse the XML stream received from the daemon which is * waiting to run commands. These commands are sent by the method - * {@link es.dia.pos.n2a.util.os.unix.TCPForkDaemon#exec(String, String, int)} + * {@link de.fork.java.TCPForkDaemon#exec(String, String, int)} *

*

* After processing one command the daemon sends a XML stream with the stderr, -- 2.1.4