From: gumartinm Date: Thu, 26 Jan 2012 23:49:59 +0000 (+0100) Subject: Many improvements in my remote TCP fork for Java X-Git-Url: https://git.gumartinm.name/?a=commitdiff_plain;h=b4a15eb6f69df019ba074f39da3c07b08509c34e;p=JavaForFun Many improvements in my remote TCP fork for Java I keep working on it Next steps cleaning up the code and maybe using execve instead of system. --- diff --git a/JavaFork/Daemon/Makefile b/JavaFork/Daemon/Makefile new file mode 100644 index 0000000..7c28c45 --- /dev/null +++ b/JavaFork/Daemon/Makefile @@ -0,0 +1,8 @@ +all: javafork + +javafork: javafork.c javafork.h + gcc -Wall -g -o javafork javafork.c -lpthread + +clean: + rm -f javafork + diff --git a/JavaFork/Daemon/javafork.c b/JavaFork/Daemon/javafork.c index 3ad2346..366a72b 100644 --- a/JavaFork/Daemon/javafork.c +++ b/JavaFork/Daemon/javafork.c @@ -18,358 +18,587 @@ #include #include #include +#include +#include +#include #include "javafork.h" -pid_t currentPID; + +pid_t daemonPID; int sockfd; +#define RESTARTABLE(_cmd, _result) do { \ + _result = _cmd; \ +} while((_result == -1) && (errno == EINTR)) -int main (int argc, char *argv[]) { - pid_t pid; +static int restartableClose(int fd) { + int err; + RESTARTABLE(close(fd), err); + return err; +} - if (signal(SIGTERM, sigterm_handler) == SIG_ERR) - { - perror ("\nerror signal handler"); - return -1; - } +static int closeSafely(int fd) { + return (fd == -1) ? 0 : restartableClose(fd); +} - if ((pid = fork()) == -1) { - perror("fork: "); - return -1; + +int main (int argc, char *argv[]) { + int c; /*Getopt parameter*/ + /*Default values*/ + char *avalue = IPADDRESS; /*Address: numeric value or hostname*/ + int pvalue = PORT; /*TCP port*/ + int qvalue = QUEUE; /*TCP listen queue*/ + + + /*This process is intended to be used as a daemon, it sould be launched by the INIT process, because of that*/ + /*we are not forking it (INIT needs it)*/ + if (daemonize(argv[0], LOG_SYSLOG, LOG_PID) < 0) + return 1; + + setsid(); + + if (signal(SIGPIPE,SIG_IGN) == SIG_ERR) { + syslog (LOG_ERR, "Error with SIGPIPE: %m"); + return 1; } - else { - if (pid == 0) { - /*child process*/ - currentPID = getpid(); - main_child(argc, argv); + + + opterr = 0; + while ((c = getopt (argc, argv, "a:p:q:")) != -1) { + switch (c) { + case 'a': + avalue = optarg; + break; + case 'p': + pvalue = atoi(optarg); + if ((pvalue > 65535) || (pvalue <= 0)) { + syslog (LOG_ERR, "Port value %d out of range", pvalue); + return 1; + } + break; + case 'q': + qvalue = atoi(optarg); + break; + case '?': + if ((optopt == 'a') || (optopt == 'p') || (optopt == 'q')) + syslog (LOG_ERR, "Option -%c requires an argument.", optopt); + else if (isprint (optopt)) + syslog (LOG_ERR, "Invalid option '-%c'.", optopt); + else + syslog (LOG_ERR, "Unknown option character '\\x%x'.", optopt); + return 1; + default: + abort (); } } - + /*This program does not admit options*/ + if (optind < argc) { + syslog (LOG_ERR,"This program does not admit options just argument elements with their values."); + return 1; + } + + + /*TODO: INIT process sending SIGTERM? Should I catch that signal?*/ + daemonPID = getpid(); + if (signal(SIGTERM, sigterm_handler) == SIG_ERR) { + syslog (LOG_ERR, "SIGTERM signal handler failed: %m"); + return 1; + } + + + if (main_child(avalue, pvalue, qvalue) < 0 ) + return 1; + return 0; } -int main_child (int argc, char *argv[]) { - struct protoent *protocol; - struct sockaddr_in addr_server; - struct sockaddr_in addr_client; - int sockclient; - pthread_t idThread; - int clilen; - +int main_child (char *address, int port, int queue) { + struct protoent *protocol; /*Network protocol*/ + struct sockaddr_in addr_server; /*struct with the server socket address*/ + struct sockaddr_in addr_client;/*struct with the client socket address*/ + int sockclient; /*File descriptor for the accepted socket*/ + pthread_t idThread; /*Thread identifier number*/ + socklen_t clilen; + int optval; + int returnValue = 0; /*OK by default*/ + - daemonize(argv[0], LOG_SYSLOG, LOG_PID); - setsid(); - - signal(SIGPIPE,SIG_IGN); - /*Retrieve protocol number from /etc/protocols file */ - protocol=getprotobyname("tcp"); + protocol=getprotobyname("tcp"); if (protocol == NULL) { - syslog(LOG_ERR, "cannot map \"tcp\" to protocol number"); - exit (1); + syslog(LOG_ERR, "cannot map \"tcp\" to protocol number: %m"); + goto EndwithError; } - + bzero((char*) &addr_server, sizeof(addr_server)); addr_server.sin_family = AF_INET; - if (inet_pton(AF_INET, argv[1], &addr_server.sin_addr.s_addr) <= 0) - syslog (LOG_ERR, "error inet_pton"); + if (inet_pton(AF_INET, address, &addr_server.sin_addr.s_addr) <= 0) { + syslog (LOG_ERR, "error inet_pton: %m"); + goto EndwithError; + } + + addr_server.sin_port = htons(port); + + if ((sockfd = socket(AF_INET, SOCK_STREAM, protocol->p_proto)) < 0) { + syslog (LOG_ERR, "socket creation failed: %m"); + goto EndandClosewithError; + } - addr_server.sin_port = htons(atol(argv[2])); - if ((sockfd = socket(AF_INET, SOCK_STREAM, 0)) < 0) { - syslog (LOG_ERR, "socket creation failed"); - exit(1); + /*We want to avoid issues while trying to bind a socket in TIME_WAIT state.*/ + /*In this application from the client there should not be any trouble if it gets*/ + /*TIME_WAIT states, because it can connect from any source port.*/ + optval = 1; + if(setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof(optval)) < 0) { + syslog (LOG_ERR, "setsockopt failed: %m"); + goto EndandClosewithError; } - + if (bind(sockfd, (struct sockaddr *) &addr_server, sizeof(addr_server)) < 0) { - syslog (LOG_ERR, "socket bind failed"); - exit(1); + syslog (LOG_ERR, "socket bind failed: %m"); + goto EndandClosewithError; } - - if (listen (sockfd, atoi(argv[3])) < 0 ) { - syslog (LOG_ERR, "socket listen failed"); - exit(1); + + if (listen (sockfd, queue) < 0 ) { + syslog (LOG_ERR, "socket listen failed: %m"); + goto EndandClosewithError; } - + while(1) { clilen = sizeof(addr_client); if ((sockclient = accept (sockfd, (struct sockaddr *) &addr_client, &clilen)) < 0) { - syslog (LOG_ERR, "socket accept failed"); - exit(1); + syslog (LOG_ERR, "socket accept failed: %m"); + goto EndandClosewithError; } if (pthread_create (&idThread, NULL, serverThread, (void *)sockclient) != 0 ) { - syslog (LOG_ERR, "thread creation failed"); + syslog (LOG_ERR, "thread creation failed: %m"); } } + + + EndasUsual: + return returnValue; + EndandClosewithError: + close (sockfd); + EndwithError: + /*When there is an error.*/ + returnValue = -1; + goto EndasUsual; } int daemonize(const char *pname, int facility, int option) { int fd; + + if ( (fd = open( "/dev/tty", O_RDWR, 0) ) == -1) { + /*We already have no tty control*/ + close(fd); + return 0; + } + + /*Sending messages to log*/ + openlog(pname, option, facility); + /*To get a controlling tty*/ + if (ioctl(fd, TIOCNOTTY, (caddr_t)0) <0 ) { + syslog (LOG_ERR, "Getting tty failed: %m"); + return -1; + } + if (close(fd) < 0) { + syslog (LOG_ERR, "Closing tty failed: %m"); + return -1; + } + + if((fd = open( "/dev/null", O_RDWR, 0) ) == -1) { + close(fd); + return -1; + } + dup2(fd,0); + dup2(fd,1); + dup2(fd,2); + close(fd); + + return 0; +} - if ( (fd = open( "/dev/tty", O_RDWR, 0) ) == -1) { - /*We already have no tty control*/ - close(fd); - return 0; - } +void *serverThread (void * arg) { + int sock; - /*Sending messages to log*/ - openlog(pname, option, facility); + /*Variables used for the select function*/ + struct timeval ptime; + fd_set fd_read; + int ret; - ioctl(fd, TIOCNOTTY, (caddr_t)0); - close(fd); + /*Control parameters used while receiving data from the client*/ + int nLeft, nData, iPos; + /*This buffer is intended to store the data received from the client.*/ + char buffer[1025]; - if((fd = open( "/dev/null", O_RDWR, 0) ) == -1) { - close(fd); - return 0; - } - dup2(fd,0); - dup2(fd,1); - dup2(fd,2); - close(fd); + /*The command sent by the client, to be executed by this process*/ + char *command; + /*Store the command length*/ + uint32_t *commandLength; + /*Store the ack*/ + uint32_t *ack; - return 0; -} -void *serverThread (void * arg) { - int sockclient; - int n; - char buf[512]; - sockclient = (int) arg; + sock = (int) arg; + pthread_detach(pthread_self()); - n = recv(sockclient, buf, sizeof(buf), 0); - if(n > 0){ - buf[n] = '\0'; - fork_system(sockclient, buf); - } - close(sockclient); - /*FIXME: dunno why, but after finishing connection from client we have TIME_WAIT in this socket*/ + + /******************************************************************************************/ + /* Just over 1 TCP connection */ + /* COMMAND_LENGTH: Java integer 4 bytes, BIG-ENDIAN (the same as network order) */ + /* ACK: integer 4 bytes big-endian (for Java) with the received comand length */ + /* COMMAND: TPV locale character set encoding */ + /* RESULTS: TPV locale character set encoding */ + /* */ + /* JAVA CLIENT: ------------ COMMAND_LENGTH -------> :SERVER */ + /* JAVA CLIENT: <---------------- ACK -------------- :SERVER */ + /* JAVA CLIENT: -------------- COMMAND ------------> :SERVER */ + /* JAVA CLIENT: <-------------- RESULTS ------------ :SERVER */ + /* JAVA CLIENT: <---------- CLOSE CONNECTION ------- :SERVER */ + /* */ + /******************************************************************************************/ + + + /*Wait 2 seconds for data coming from client.*/ + ptime.tv_sec=2; + ptime.tv_usec=0; + + + /*COMMAND LENGTH*/ + /*First of all we receive the command size as a Java integer (4 bytes primitive type)*/ + if ((commandLength = (uint32_t *) malloc(sizeof(uint32_t))) == NULL) { + syslog (LOG_ERR, "commandLength malloc failed: %m"); + goto Error; + } + bzero(buffer, sizeof(buffer)); + nLeft = sizeof(uint32_t); + nData = iPos = 0; + do { + FD_ZERO(&fd_read); + FD_SET(sock, &fd_read); + ret = select(sock+1, &fd_read,NULL,NULL, &ptime); + if(ret == 0) { + syslog(LOG_INFO, "receiving command length timeout error"); + goto Error; + } + if(ret == -1) { + syslog(LOG_ERR, "receiving command length select error: %m"); + goto Error; + } + if((nData = recv(sock, &buffer[iPos], nLeft, 0)) <= 0) { + syslog (LOG_ERR, "command length reception failed: %m"); + goto Error; + } + nLeft -= nData; + iPos += nData; + } while (nLeft > 0); + if (nLeft < 0) { + syslog (LOG_INFO, "command length excessive data received, expected: 4, received: %d", (-nLeft + sizeof(uint32_t)) ); + } + + /*Retrieve integer (4 bytes) from buffer*/ + memcpy (commandLength, buffer, sizeof(uint32_t)); + /*Java sends the primitive integer using big-endian order (it is the same as network order)*/ + *commandLength = be32toh (*commandLength); + + + /*ACK*/ + /*Sending back the command length as the ACK message*/ + if ((ack = (uint32_t *) malloc(sizeof(uint32_t))) == NULL) { + syslog (LOG_ERR, "commandLength malloc failed: %m"); + goto ErrorAck; + } + *ack = htobe32(*commandLength); + if (send(sock, ack, sizeof(uint32_t), 0) < 0) { + syslog (LOG_ERR, "send ACK failed: %m"); + goto ErrorAck; + } + + + + /*COMMAND*/ + /*Reserving commandLength + 1 because of the string end character*/ + if ((command = (char *) malloc(*commandLength + 1)) == NULL) { + syslog (LOG_ERR, "command malloc failed: %m"); + goto ErrorCommand; + } + bzero(command, ((*commandLength) + 1)); + nLeft = *commandLength; + nData = iPos = 0; + do { + FD_ZERO(&fd_read); + FD_SET(sock, &fd_read); + ret = select(sock+1, &fd_read,NULL,NULL, &ptime); + if(ret == 0) { + syslog(LOG_INFO, "receiving command timeout error"); + goto ErrorCommand; + } + if(ret == -1) { + syslog(LOG_ERR, "receiving command select error: %m"); + goto ErrorCommand; + } + if((nData = recv(sock, &command[iPos], nLeft, 0)) <= 0) { + syslog (LOG_ERR, "command reception failed: %m"); + goto ErrorCommand; + } + nLeft -= nData; + iPos += nData; + } while (nLeft > 0); + if (nLeft < 0) { + syslog (LOG_INFO, "execessive command length, expected: %d, received: %d", *commandLength, (-nLeft + *commandLength)); + } + + + /*RESULTS*/ + fork_system(sock, command); + + + /*CLOSE CONNECTION AND FINISH*/ + + +ErrorCommand: + free(command); +ErrorAck: + free(ack); +Error: + close(sock); + free(commandLength); + pthread_exit(0); } int fork_system(int socket, char *command) { int pid; - int pfdOUT[2]; - int pfdERR[2]; + int out[2], err[2]; char buf[2000]; - char string[100]; + char string[100]; struct pollfd polls[2]; int returnstatus; int n; + int childreturnstatus; + + int returnValue = -1; /*NOK by default*/ - sem_t * semaphore; /*Required variables in order to share memory between processes*/ key_t keyvalue; - int idreturnst; - int *returnst; - - if (pipe(pfdOUT) == -1) { - syslog (LOG_ERR, "pipe for stdout failed"); - close(pfdOUT[0]); - close(pfdOUT[1]); - return -1; - } - if (pipe(pfdERR) == -1) { - syslog (LOG_ERR, "pipe for stderr failed"); - close(pfdOUT[0]); - close(pfdOUT[1]); - close(pfdERR[0]); - close(pfdERR[1]); - return -1; - } - - if ((semaphore = sem_open("javaforksem", O_CREAT, 0644, 1)) == SEM_FAILED) { - syslog (LOG_ERR, "semaphore open failed"); - close(pfdOUT[0]); - close(pfdOUT[1]); - close(pfdERR[0]); - close(pfdERR[1]); - if (sem_close(semaphore) <0 ) { - syslog (LOG_ERR, "semaphore open failed and close failed"); - } - if (sem_unlink("javaforksem") < 0) { - syslog (LOG_ERR, "semaphore open failed and unlink failed"); - } - return -1; + int idreturnst = -1; + /*Store the return status from the process launched using system or execve*/ + /*Using shared memory between the child and parent process*/ + int *returnst = NULL; + /*Required variables in order to share the semaphore between processes*/ + key_t keysemaphore; + int idsemaphore = -1; + /*Used like a barrier: the child process just can start after sending the XML init code*/ + sem_t *semaphore = NULL; + + + out[0] = out[1] = err[0] = err[1] = -1; + + + /*Creating the pipes, they will be attached to the stderr and stdout streams*/ + if (pipe(out) < 0 || pipe(err) < 0) { + syslog (LOG_ERR, "pipe failed: %m"); + goto EndandClosePipes; + } + + + + /*Allocate shared memory because we can not use named semaphores*/ + keysemaphore=ftok("/bin/ls", SHAREMEMSEM); /*the /bin/ls must exist otherwise this does not work... */ + if (keysemaphore == -1) { + syslog (LOG_ERR, "ftok failed: %m"); + goto EndandClosePipes; + } + /*Attach shared memory*/ + if ((idsemaphore = shmget(keysemaphore,sizeof(sem_t), 0660 | IPC_CREAT)) < 0) { + syslog (LOG_ERR, "semaphore initialization failed: %m"); + goto EndandReleaseSem; + } + if ((semaphore = (sem_t *)shmat(idsemaphore, (void *)0, 0)) < 0) { + goto EndandReleaseSem; } if (sem_init(semaphore, 1, 1) < 0) { - syslog (LOG_ERR, "semaphore initialization failed"); - close(pfdOUT[0]); - close(pfdOUT[1]); - close(pfdERR[0]); - close(pfdERR[1]); - if (sem_close(semaphore) <0 ) { - syslog (LOG_ERR, "semaphore open failed and close failed"); - } - if (sem_unlink("javaforksem") < 0) { - syslog (LOG_ERR, "semaphore open failed and unlink failed"); - } - return -1; - } + syslog (LOG_ERR, "semaphore initialization failed: %m"); + goto EndandDestroySem; + } if (sem_wait(semaphore) < 0) { - syslog (LOG_ERR, "semaphore wait failed"); + syslog (LOG_ERR, "semaphore wait failed: %m"); + goto EndandDestroySem; } - /*Allocate shared memory*/ - keyvalue=ftok("/bin/ls", SHAREMEMKEY); /*the /bin/ls must exist otherwise this does not work... */ + + + /*Allocate shared memory for the return status code */ + keyvalue=ftok("/bin/ls", SHAREMEMKEY); /*the /bin/ls must exist otherwise this does not work... */ if (keyvalue == -1) { - syslog (LOG_ERR, "ftok failed"); - /*TODO: Close again pipes and semaphore.*/ - return -1; + syslog (LOG_ERR, "ftok failed: %m"); + goto EndandDestroySem; } /*Attach shared memory*/ - idreturnst=shmget(keyvalue,sizeof(int), 0660 | IPC_CREAT); - if (idreturnst == -1) { - syslog (LOG_ERR, "shmget failed"); - /*TODO: Close again pipes and semaphore.*/ - return -1; + if ((idreturnst=shmget(keyvalue,sizeof(int), 0660 | IPC_CREAT)) < 0) { + syslog (LOG_ERR, "shmget failed: %m"); + goto EndandReleaseMem; } returnst = (int *)shmat(idreturnst, (void *)0, 0); if ((*returnst)== -1) { - syslog (LOG_ERR, "shmat failed"); - /*TODO: Close again pipes and semaphore.*/ - return -1; + syslog (LOG_ERR, "shmat failed: %m"); + goto EndandReleaseMem; } - /*By default*/ (*returnst) = 0; + + if ((pid=fork()) == -1) { - perror("fork: "); - close(pfdOUT[0]); - close(pfdOUT[1]); - close(pfdERR[0]); - close(pfdERR[1]); - if (sem_close(semaphore) <0 ) { - syslog (LOG_ERR, "semaphore open failed and close failed"); - } - if (sem_unlink("javaforksem") < 0) { - syslog (LOG_ERR, "semaphore open failed and unlink failed"); - } - return -1; - } - else { + syslog (LOG_ERR, "fork failed: %m"); + goto EndandReleaseMem; + } + + if (pid == 0) { /*Child process*/ - if (pid == 0) { - if (dup2(pfdOUT[1],1) < 0) { - syslog (LOG_ERR, "child: dup2 pfdOUT failed"); - exit(-1); - } - if (dup2(pfdERR[1],2) < 0) { - syslog (LOG_ERR, "child: dup2 pfdERR failed"); - exit(-1); - } - if (sem_wait(semaphore) < 0) { - syslog (LOG_ERR, "child: semaphore wait failed"); - } - returnstatus=system(command); - if (WIFEXITED(returnstatus) == 1) { - (*returnst) = WEXITSTATUS(returnstatus); - } - else { - (*returnst) = -1; - } - exit(0); + if ((dup2(out[1],1) < 0) || (dup2(err[1],2) < 0)) { + syslog (LOG_ERR, "child dup2 failed: %m"); + exit(-1); } - else { - /*Parent process*/ - polls[0].fd=pfdOUT[0]; - polls[1].fd=pfdERR[0]; - polls[0].events=POLLIN; - polls[1].events=POLLIN; - sprintf(string,""); - send(socket,string,strlen(string),0); - sprintf(string,""); - send(socket,string,strlen(string),0); - /*The child will be woken up*/ - if (sem_post(semaphore) < 0 ) { - syslog (LOG_ERR, "error waiking up child process"); - /*TODO: exit closing the descriptors*/ - } - while(1){ - if(poll(polls,2,100)){ - if(polls[0].revents&&POLLIN) { - bzero(buf,2000); - n=read(pfdOUT[0],buf,1990); - sprintf(string,""); - send(socket,string,strlen(string),0); - } - if(polls[1].revents&&POLLIN) { - bzero(buf,2000); - n=read(pfdERR[0],buf,1990); - sprintf(string,""); - send(socket,string,strlen(string),0); - } - if(!polls[0].revents&&POLLIN && !polls[1].revents&&POLLIN) { - syslog (LOG_ERR, "Error polling pipes"); - break; - } + if (sem_wait(semaphore) < 0) { + syslog (LOG_ERR, "child semaphore wait failed: %m"); + exit(-1); + } + + returnstatus=system(command); + + if (WIFEXITED(returnstatus) == 1) + (*returnst) = WEXITSTATUS(returnstatus); + else + (*returnst) = -1; + exit(0); + } + else { + /*Parent process*/ + polls[0].fd=out[0]; + polls[1].fd=err[0]; + polls[0].events=POLLIN; + polls[1].events=POLLIN; + sprintf(string,""); + send(socket,string,strlen(string),0); + sprintf(string,""); + send(socket,string,strlen(string),0); + + /*Releasing barrier, the child process can keep running*/ + if (sem_post(semaphore) < 0 ) { + syslog (LOG_ERR, "parent error releasing barrier: %m"); + /*Beaware, sig_handler with the child process :]*/ + kill(pid, SIGTERM); + goto EndandReleaseMem; + } + while(1) { + if(poll(polls,2,100)) { + if(polls[0].revents&&POLLIN) { + bzero(buf,2000); + n=read(out[0],buf,1990); + sprintf(string,""); + send(socket,string,strlen(string),0); + } + if(polls[1].revents&&POLLIN) { + bzero(buf,2000); + n=read(err[0],buf,1990); + sprintf(string,""); + send(socket,string,strlen(string),0); } - else { - /*When timeout*/ - if(waitpid(pid,NULL,WNOHANG)) { - /*Child is dead, we can finish the connection*/ - break; - } - /*The child process is not dead, keep polling more data from stdout or stderr streams*/ + if(!polls[0].revents&&POLLIN && !polls[1].revents&&POLLIN) { + syslog (LOG_ERR, "parent error polling pipes: %m"); + /*Beaware, sig_handler with the child process :]*/ + kill(pid, SIGTERM); + /*I want to send an error status to the remote calling process*/ + (*returnst) = -1; + break; } } - - /*We reach this code when the child process is dead or because of an error polling pipes*/ - /*In the second case the result stored in *returnst could be wrong, anyway there was*/ - /*an error so, the result is unpredictable.*/ - /*TODO: if error while polling pipes do not reach this code an exit with -1*/ - sprintf(string,"", (*returnst)); - send(socket,string,strlen(string),0); - sprintf(string,""); - send(socket,string,strlen(string),0); - - close(pfdOUT[0]); - close(pfdOUT[1]); - close(pfdERR[0]); - close(pfdERR[1]); - if (sem_close(semaphore) <0 ) { - syslog (LOG_ERR, "semaphore close failed"); - } - if (sem_unlink("javaforksem") < 0) { - syslog (LOG_ERR, "semaphore unlink failed"); - } - - shmdt ((int *)returnst); - shmctl (idreturnst, IPC_RMID, (struct shmid_ds *)NULL); + else { + /*When timeout*/ + if(waitpid(pid, &childreturnstatus, WNOHANG)) { + /*Child is dead, we can finish the connection*/ + /*First of all, we check the exit status of our child process*/ + /*In case of error send an error status to the remote calling process*/ + if (WIFEXITED(childreturnstatus) != 1) + (*returnst) = -1; + break; + } + /*The child process is not dead, keep polling more data from stdout or stderr streams*/ + } } } + /*Reaching this code when child finished or if error while polling pipes*/ + sprintf(string,"", (*returnst)); + send(socket,string,strlen(string),0); + sprintf(string,""); + send(socket,string,strlen(string),0); - return 0; + /*Stuff just done by the Parent process. The child process ends with exit*/ + + returnValue = 0; /*if everything went OK*/ + +EndandReleaseMem: + if (returnst != NULL) { + /*detach memory*/ + if (shmdt ((int *)returnst) < 0) + syslog (LOG_ERR, "returnstatus shared variable shmdt failed: %m"); + } + /*Mark the segment to be destroyed.*/ + if (shmctl (idreturnst, IPC_RMID, (struct shmid_ds *)NULL) < 0 ) + syslog (LOG_ERR, "returnstatus shared variable shmctl failed: %m"); +EndandDestroySem: + if (sem_destroy(semaphore) <0) + syslog (LOG_ERR, "semaphore destroy failed: %m"); +EndandReleaseSem: + /*after sem_destroy-> input/output parameter NULL?*/ + if (semaphore != NULL) { + /*detach memory*/ + if (shmdt ((sem_t *)semaphore) < 0) + syslog (LOG_ERR, "semaphore shmdt failed: %m"); + } + /*Mark the segment to be destroyed.*/ + if (shmctl (idsemaphore, IPC_RMID, (struct shmid_ds *)NULL) < 0 ) + syslog (LOG_ERR, "semaphore shmctl failed: %m"); +EndandClosePipes: + closeSafely (out[0]); + closeSafely (out[1]); + closeSafely (err[0]); + closeSafely (err[1]); + + return returnValue; } void sigterm_handler(int sig) { - if (currentPID != getpid()) { - //Do nothing - return; - } - if (signal (SIGTERM, SIG_IGN) == SIG_ERR) - { - syslog (LOG_ERR, "signal desactivation failed"); - } - - close (sockfd); - /*TODO: kill child processes and release allocate memory*/ - exit (0); + if (daemonPID != getpid()) { + //Do nothing + return; + } + if (signal (SIGTERM, SIG_IGN) == SIG_ERR) + { + syslog (LOG_ERR, "signal desactivation failed"); + } + + + close (sockfd); + /*TODO: kill child processes and release allocate memory*/ + exit (0); } diff --git a/JavaFork/Daemon/javafork.h b/JavaFork/Daemon/javafork.h index 7f03332..aacdc18 100644 --- a/JavaFork/Daemon/javafork.h +++ b/JavaFork/Daemon/javafork.h @@ -1,8 +1,61 @@ -/*System V IPC key*/ +/*System V IPC keys*/ #define SHAREMEMKEY 1 +#define SHAREMEMSEM 2 +/*Non-argument default values*/ +#define PORT 5193 +#define IPADDRESS "127.0.0.1" +#define QUEUE 6 + + + +/**************************************************************************************** +* This method is used by pthread_create * +* * +* INPUT PARAMETER: socket file descriptor * +* RETURNS: void * +****************************************************************************************/ void *serverThread (void *arg); + + + +/**************************************************************************************** +* This method is used by pthread_create * +* * +* INPUT PARAMETER: socket file descriptor * +* INPUT PARAMETER: +* INPUT PARAMETER: +* RETURNS: void * +****************************************************************************************/ int daemonize(const char *pname, int facility, int option); -int main_child (int argc, char *argv[]); + + + +/**************************************************************************************** +* This method is used by pthread_create * +* * +* INPUT PARAMETER: socket file descriptor * +* RETURNS: int * +****************************************************************************************/ +int main_child (char *address, int port, int queue); + + + +/**************************************************************************************** +* This method is used by pthread_create * +* * +* INPUT PARAMETER: socket file descriptor * +* RETURNS: void * +****************************************************************************************/ int fork_system(int socket, char *command); + + + + +/**************************************************************************************** +* This method is used by pthread_create * +* * +* INPUT PARAMETER: socket file descriptor * +* RETURNS: void * +****************************************************************************************/ void sigterm_handler(); diff --git a/JavaFork/JavaExample/javafork-example/src/main/java/de/fork/java/LauncherProcessesDiaFork.java b/JavaFork/JavaExample/javafork-example/src/main/java/de/fork/java/LauncherProcessesDiaFork.java new file mode 100644 index 0000000..2b32c3e --- /dev/null +++ b/JavaFork/JavaExample/javafork-example/src/main/java/de/fork/java/LauncherProcessesDiaFork.java @@ -0,0 +1,283 @@ +package de.fork.java; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.PrintStream; +import java.net.UnknownHostException; +import javax.xml.parsers.ParserConfigurationException; +import org.apache.log4j.Logger; +import org.xml.sax.SAXException; + + +public class LauncherProcessesDiaFork { + // Exit process status + private static final int STATUS_ERR = -1; + private static final int DEFAULT_PORT = 5193; + private static final String DEFAULT_HOST = "127.0.0.1"; + + /** + * Run a process. + * + * @param command system command to be executed. + * + * @return return code. + */ + public static int exec(final String command) throws IOException, InterruptedException { + + return exec(command, null, null); + } + + /** + * Run a process. + * + * @param command system command to execute. + * @param standarOutPut if not null, the standard output is redirected to this parameter. + * + * @return return code. + */ + public static int exec(final String command, final PrintStream standarOutPut) throws IOException, InterruptedException { + + return exec(command, standarOutPut, null); + } + + + /** + * Run a process. + * + * @param command system command to be executed. + * @param standarOutPut if not null, the standard output is redirected to this parameter. + * @param errorOutPut if not null, the error output is redirected to this parameter. + * + * @return return code from the executed system command. + */ + public static int exec(final String command, final PrintStream standarOutPut, final PrintStream errorOutPut) throws IOException, InterruptedException { + + return exec(command, standarOutPut, errorOutPut, DEFAULT_HOST, DEFAULT_PORT); + } + + /** + * Run a process. + * + * @param command system command to be executed. + * @param aLogger send the information to log. + */ + public static int exec(final String command, final Logger aLogger) throws IOException, InterruptedException { + + //calling private method to handle logger input/ouput in a common method + return execHandlingLogger(command, aLogger, DEFAULT_HOST, DEFAULT_PORT); + } + + + /** + * Run process. + * + * @param commandAndArguments String array containing system command and its + * arguments to be executed.
+ * For example: + *
+	 * commandAndArguments[0]="ls";
+	 * commandAndArguments[1]="-lr";
+	 * 
+ * @param aLogger + * + * @return return code from the executed system command. + * + * @throws IOException + * @throws InterruptedException + */ + public static int exec(final String[] commandAndArguments, final Logger aLogger) throws IOException, InterruptedException { + String wholeCommand=""; + + for(String argument : commandAndArguments) { + wholeCommand = wholeCommand + " " + argument; + } + + //calling private method to handle logger input/ouput in a common method + return execHandlingLogger(wholeCommand, aLogger, DEFAULT_HOST, DEFAULT_PORT); + } + + + /** + * Run process using a remote process runner. + * + * @param command system command to be executed. + * @param standarOutPut the stdout stream from that command as a PrintStream + * @param errorOutPut the stderr stream from that command as a PrintStream + * @param host the specified host. + * @param port the where the remote process runner accepts connections. + * + *

The host name can either be a machine name, such as + * "java.sun.com", or a textual representation of its + * IP address. If a literal IP address is supplied, only the + * validity of the address format is checked. + *

+ *

For host specified in literal IPv6 address, + * either the form defined in RFC 2732 or the literal IPv6 address + * format defined in RFC 2373 is accepted. IPv6 scoped addresses are also + * supported. See here for a description of IPv6 + * scoped addresses. + *

+ * + * @return the executed command's return code. + * + * @throws UnknownHostException + * @throws IOException + */ + public static int exec(final String command, final PrintStream standarOutPut, + final PrintStream errorOutPut, final String host, final int port) + throws IOException, InterruptedException { + int exitStatus = LauncherProcessesDiaFork.STATUS_ERR; + XmlForkParser forkParser = null; + TCPForkDaemon process = null; + + try { + forkParser = new XmlForkParser(); + process = new TCPForkDaemon(forkParser, host, port); + exitStatus = process.exec(command); + } catch (ParserConfigurationException e) { + // This is not a crazy thing, we are trying to insert this new method without + // breaking the old methods which did not throw SAXException or ParserConfigurationException + // Do not blame me. + throw new IOException(e); + } catch (SAXException e) { + // This is not a crazy thing, we are trying to insert this new method without + // breaking the old methods which did not throw SAXException or ParserConfigurationException + // Do not blame me. + throw new IOException(e); + } + + + + if ((standarOutPut != null) && (process.getStdout() != null)){ + standarOutPut.println(process.getStdout()); + } + + if ((errorOutPut != null) && (process.getStderr() != null)){ + errorOutPut.println(process.getStderr()); + } + + return exitStatus; + } + + + /** + * Run process. + * + * @param command system command to be executed. + * @param aLogger + * @param host the specified host. + * @param port the TCP port where the daemon accepts connections. + * + * @return the executed command's return code. + * + * @throws IOException + * @throws InterruptedException + */ + private static int execHandlingLogger(final String command, final Logger aLogger, + final String host, int port) throws IOException, InterruptedException { + int exitStatus = LauncherProcessesDiaFork.STATUS_ERR; + XmlForkParser forkParser = null; + TCPForkDaemon process = null; + + try { + forkParser = new XmlForkParser(); + process = new TCPForkDaemon(forkParser, host, port); + exitStatus = process.exec(command); + } catch (ParserConfigurationException e) { + throw new IOException(e); + } catch (SAXException e) { + throw new IOException(e); + } + + + + if (process.getStdout() != null) { + aLogger.info(process.getStdout()); + } + if (process.getStderr() != null) { + aLogger.error(process.getStderr()); + } + + return exitStatus; + } + + + /** + * Run process + * + * @param command command and its arguments to be executed.
+ * For example: + *
+	 * commandAndArguments[0]="ls";
+	 * commandAndArguments[1]="-lr";
+	 * 
+ * @param aLogger send information to log + * + * @return the executed command's return code. + * + * @throws IOException + * @throws InterruptedException + */ + public static InputStream execStream (final String [] command, final Logger aLogger) + throws IOException, InterruptedException { + int exitStatus = LauncherProcessesDiaFork.STATUS_ERR; + InputStream stdInput = null; + XmlForkParser forkParser = null; + TCPForkDaemon process = null; + String wholeCommand=""; + + for(String argument : command) { + wholeCommand = wholeCommand + " " + argument; + } + + try { + forkParser = new XmlForkParser(); + process = new TCPForkDaemon(forkParser, DEFAULT_HOST, DEFAULT_PORT); + exitStatus = process.exec(wholeCommand); + } catch (ParserConfigurationException e) { + // This is not a crazy thing, we are trying to insert this new method without + // breaking the old methods which did not throw SAXException or ParserConfigurationException + // Do not blame me. + throw new IOException(e); + } catch (SAXException e) { + // This is not a crazy thing, we are trying to insert this new method without + // breaking the old methods which did not throw SAXException or ParserConfigurationException + // Do not blame me. + throw new IOException(e); + } + + + if(exitStatus == 0) { + stdInput = new ByteArrayInputStream(process.getStdout().getBytes("UTF-8")); + } + else { + aLogger.error(process.getStderr()); + } + + + return stdInput; + } + + /** + *

The command is lunched from location + *

  • #>cd location
  • + *
  • #location> command
  • + * + * @param command the command to be executed by the daemon. + * @param location + * + * @return the executed command's return code.
    + * Usually 0 if execution is OK, otherwise !=0 + * + * @throws IOException + * @throws InterruptedException + */ + public static int execInLocation (final String command, final String location) throws IOException, InterruptedException { + int exitStatus = LauncherProcessesDiaFork.STATUS_ERR; + final String wholeCommand = "cd " + location + " && " + command; + + exitStatus = exec(wholeCommand, null, null, DEFAULT_HOST, DEFAULT_PORT); + return exitStatus; + } +} diff --git a/JavaFork/JavaExample/javafork-example/src/main/java/de/fork/java/RemoteForkMain.java b/JavaFork/JavaExample/javafork-example/src/main/java/de/fork/java/RemoteForkMain.java new file mode 100644 index 0000000..9b4180b --- /dev/null +++ b/JavaFork/JavaExample/javafork-example/src/main/java/de/fork/java/RemoteForkMain.java @@ -0,0 +1,36 @@ +package de.fork.java; + +import java.io.ByteArrayOutputStream; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.PrintStream; +import javax.xml.parsers.ParserConfigurationException; +import org.xml.sax.SAXException; + + +public class RemoteForkMain { + + /** + * @param args + * @throws InterruptedException + * @throws IOException + * @throws SAXException + * @throws ParserConfigurationException + * @throws FileNotFoundException + */ + public static void main(String[] args) throws IOException, InterruptedException { + + final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + final PrintStream stdout = new PrintStream(baos); + final String command = "ls -lah"; + final ByteArrayOutputStream baos2 = new ByteArrayOutputStream(); + final PrintStream stderr = new PrintStream(baos2); + int result; + + result = LauncherProcessesDiaFork.exec(command,stdout, stderr, "127.0.0.1", 5193); + System.out.println(result); + System.out.println("Stdout: " + baos.toString()); + System.out.println("Stderr: " + baos2.toString()); + } + +} \ No newline at end of file diff --git a/JavaFork/JavaExample/javafork-example/src/main/java/de/fork/java/TCPForkDaemon.java b/JavaFork/JavaExample/javafork-example/src/main/java/de/fork/java/TCPForkDaemon.java new file mode 100644 index 0000000..0ecb257 --- /dev/null +++ b/JavaFork/JavaExample/javafork-example/src/main/java/de/fork/java/TCPForkDaemon.java @@ -0,0 +1,178 @@ +package de.fork.java; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.PrintWriter; +import java.net.InetAddress; +import java.net.Socket; +import java.net.UnknownHostException; +import org.xml.sax.SAXException; + +/** + *

    + * With this class we can run processes using the intended daemon which is + * waiting for TCP connections in a specified port. + *

    + *

    + * Receiving the results from the daemon where we can find three kinds of + * different fields: stderror, stdout and the return value of the command which was + * run by the remote daemon. Each field is related to the stderr, stdout and + * return code respectively. + *

    + *

    + * This class has to retrieve the results from the remote daemon and it offers two + * methods wich can be used to retrieve the stderr and stdout in a right way + * without having to know about the coding used by the daemon to send us the results. + * The user does not have to know about how the daemon sends the data, he or she + * will work directly with the strings related to each stream using these methods: + * {@link TCPForkDaemon#getStdout()} and {@link TCPForkDaemon#getStderr()}. + * The return code from the command executed by the daemon can be retrieved as the + * return parameter from the method {@link TCPForkDaemon#exec(String, String, int)} + *

    + *

    + * Instances of this class are mutable. To use them concurrently, clients must surround each + * method invocation (or invocation sequence) with external synchronization of the clients choosing. + *

    + */ +public class TCPForkDaemon { + private final XmlForkParser parser; + private final String host; + private final int port; + + + /** + * Default constructor for this {@link TCPForkDaemon} implementation. + * + *

    The host name can either be a machine name, such as + * "java.sun.com", or a textual representation of its + * IP address. If a literal IP address is supplied, only the + * validity of the address format is checked. + *

    + *

    For host specified in literal IPv6 address, + * either the form defined in RFC 2732 or the literal IPv6 address + * format defined in RFC 2373 is accepted. IPv6 scoped addresses are also + * supported. See here for a description of IPv6 + * scoped addresses. + *

    + * @param parser instance implemeting {@link XmlForkParser} which knows about what + * codification uses the daemon to send us the results of the command sent to + * by the remote daemon by the {@link TCPForkDaemon.#exec(String)} method. + * @param host the specified host. + * @param port the TCP port where the daemon accepts connections. + * + */ + public TCPForkDaemon (final XmlForkParser parser, final String host, final int port) { + this.parser = parser; + this.host = host; + this.port = port; + } + + + /** + *

    + * This method sends commands to a remote daemon using a TCP socket. + * We create a new TCP socket every time we send commands. + *

    + *

    + * It uses a TCP connection in order to send commands and receive + * the results related to that command from the remote daemon. The command's + * result code which was run by the remote daemon can be retrieved from the + * return parameter of this method. + *

    + * @param command the command to be executed by the daemon. + * @return the executed command's return code. + * @throws IOException + * @throws UnknownHostException + * @throws SAXException + * @throws SecurityException if a security manager exists + */ + public int exec(final String command) throws UnknownHostException, IOException, SAXException { + PrintWriter out = null; + Socket socket = null; + + /******************************************************************************************/ + /* Just over 1 TCP connection */ + /* COMMAND_LENGTH: Java integer 4 bytes, BIG-ENDIAN (the same as network order) */ + /* ACK: integer 4 bytes big-endian (for Java) with the sent comand length */ + /* COMMAND: TPV locale character set encoding */ + /* RESULTS: TPV locale character set encoding */ + /* */ + /* JAVA CLIENT: ------------ COMMAND_LENGTH -------> :SERVER */ + /* JAVA CLIENT: <---------------- ACK -------------- :SERVER */ + /* JAVA CLIENT: -------------- COMMAND ------------> :SERVER */ + /* JAVA CLIENT: <-------------- RESULTS ------------ :SERVER */ + /* JAVA CLIENT: <---------- CLOSE CONNECTION ------- :SERVER */ + /* */ + /******************************************************************************************/ + + + + socket = new Socket(InetAddress.getByName(host), port); + try { + /*Must be used the remote charset :S*/ + byte [] commandEncoded = command.getBytes("UTF-8"); + + DataOutputStream sendData = new DataOutputStream(socket.getOutputStream()); + DataInputStream receiveData = new DataInputStream(socket.getInputStream()); + + // 1. COMMAND_LENGTH + sendData.writeInt(commandEncoded.length); + + // 2. ACK + // TODO: if the server close the connection we could stay here probably + // until TCP keepalive is sent (20 hours by default in Linux) + int ack = receiveData.readInt(); + if (ack != commandEncoded.length) + throw new IOException("invalid ACK, something went wrong " + + "with the TCPForkDaemon. Check the /var/log/messages file in the TPV"); + + + // 3. COMMAND + sendData.write(commandEncoded); + + + // 4. RESULTS + // TODO: if the server closes the connection we could stay here probably + // until TCP keepalive is sent (20 hours by default in Linux) + parser.setStream(socket.getInputStream()); + + + // 5. SERVER CLOSES CONNECTION + } + finally { + if (out != null) { + out.close(); + } + socket.close(); + } + + //If everything went alright we should be able to retrieve the return + //status of the remotely executed command. + return parser.getReturnValue(); + } + + + /** + * Retrieve the standard output.
    + * When there is nothing from the standard output this method returns null. + * + * @see {@link TCPForkDaemon#getStderr()} + * @return the stdout stream + */ + public String getStdout() { + return parser.getStdout(); + } + + + /** + * Retrieve the stderr stream as a {@link String} from the command which + * was run by the remote daemon + * + * @see {@link TCPForkDaemon#getStdout()} + * @return the stderr stream + */ + public String getStderr() { + return parser.getStderr(); + } +} diff --git a/JavaFork/JavaExample/javafork-example/src/main/java/de/fork/java/XmlForkParser.java b/JavaFork/JavaExample/javafork-example/src/main/java/de/fork/java/XmlForkParser.java new file mode 100644 index 0000000..51fe7f0 --- /dev/null +++ b/JavaFork/JavaExample/javafork-example/src/main/java/de/fork/java/XmlForkParser.java @@ -0,0 +1,195 @@ +package de.fork.java; + +import java.io.IOException; +import java.io.InputStream; +import javax.xml.parsers.ParserConfigurationException; +import javax.xml.parsers.SAXParser; +import javax.xml.parsers.SAXParserFactory; +import org.apache.log4j.Logger; +import org.xml.sax.Attributes; +import org.xml.sax.SAXException; +import org.xml.sax.SAXParseException; +import org.xml.sax.ext.DefaultHandler2; + +/** + *

    + * Class intended to parse the XML stream received from the daemon which is + * waiting to run commands. These commands are sent by the method + * {@link es.dia.pos.n2a.util.os.unix.TCPForkDaemon#exec(String, String, int)} + *

    + *

    + * After processing one command the daemon sends a XML stream with the stderr, + * stdout and return status of that command. With this class we extract those values + * and we can retrieve them using the methods {@link #getStderr() }, {@link #getStdout()} + * and {@link #getReturnValue()} + *

    + *

    + *

    + * Example, stream received from daemon:
    + * {@code
    + * 
    + * }
    + * 
    + *

    + *

    + * Instances of this class are mutable. To use them concurrently, clients must surround each + * method invocation (or invocation sequence) with external synchronization of the clients choosing. + *

    + */ +public class XmlForkParser extends DefaultHandler2 { + private static final Logger logger = Logger.getLogger(XmlForkParser.class); + private StringBuffer accumulator = new StringBuffer(); + private String stderr = new String(); + private String stdout = new String(); + private String returnCode = new String(); + final SAXParserFactory spf = SAXParserFactory.newInstance(); + private final SAXParser saxParser; + + public XmlForkParser() throws ParserConfigurationException, SAXException { + saxParser = spf.newSAXParser(); + } + + public void setStream(InputStream stream) throws SAXException, IOException { + saxParser.parse(stream, this); + } + + /** + *

    + * The daemon sends a XML stream, we parse that stream and the results are + * stored in the instace fields {@link #stderr}, {@link #stdout} and {@link returnCode} + *

    + *

    + * Later we can retrieve the results with {@link #getStderr()}, {@link #getStdout()} and + * {@link #getReturnValue()} + *

    + */ + @Override + public void endElement (final String uri, final String localName, final String qName) { + if (qName.equals("error")) { + // After , we've got the stderror + stderr = stderr + accumulator.toString(); + } else if (qName.equals("out")) { + // After , we've got the stdout + stdout = stdout + accumulator.toString(); + } else if (qName.equals("ret")) { + returnCode = returnCode + accumulator.toString(); + } + } + + /** + *

    + * This method removes the \n characters at the end of the stdout + * or stderr stream. + *

    + * + * @throws SAXException If any SAX errors occur during processing. + */ + @Override + public void endDocument () throws SAXException + { + if (stderr.length() != 0) { + String lastStderr = stderr.replaceFirst("\\\n$", ""); + stderr = lastStderr; + } + else { + //if there is nothing from the stderr stream + stderr = null; + } + if (stdout.length() != 0) { + String lastStdout = stdout.replaceFirst("\\\n$", ""); + stdout = lastStdout; + } + else { + //if there is nothing from the stdout stream + stdout = null; + } + } + + /** + * Retrieve the standard error. + * When there is nothing from the standard error this method returns null. + * + *
    +	 * Example, stream received from daemon:
    +	 * {@code
    +	 * 
    +	 * }
    +	 * 
    + *

    + *

    + *

    +	 * From that example with this method we are going to obtain this return parameter:
    +	 * {@code
    +	 * ls: no se puede acceder a bbb: No existe el fichero o el directorio
    +	 * ls: no se puede acceder a aaa: No existe el fichero o el directorio
    +	 * ls: no se puede acceder a dddd: No existe el fichero o el directorio
    +	 * }
    +	 * 
    + * + * @return stderr + */ + public String getStderr() { + return stderr; + + } + + + /** + * Retrieve the standard output. + * When there is nothing from the standard output this method returns null. + * + * @see {@link XmlForkParser#getStderr()} + * @return stdout + */ + public String getStdout() { + return stdout; + } + + + /** + * Retrieve the return code from the executed command. + * + * @return return status, usually 0 means the command went OK. + */ + public int getReturnValue() { + return new Integer(returnCode).intValue(); + } + + + @Override + public void startElement (final String uri, final String localName, + final String qName, final Attributes attributes) { + accumulator.setLength(0); + } + + + @Override + public void characters(final char[] buffer, final int start, final int length) { + accumulator.append(buffer, start, length); + } + + + @Override + public void warning(final SAXParseException exception) { + logger.error("WARNING line:" + exception.getLineNumber(), exception); + } + + + @Override + public void error(final SAXParseException exception) { + logger.error("ERROR line:" + exception.getLineNumber(), exception); + } + + + @Override + public void fatalError(final SAXParseException exception) throws SAXException { + logger.error("FATAL ERROR line:" + exception.getLineNumber(), exception); + throw (exception); + } +}