Plan 9 from Bell Labs’s /usr/web/sources/plan9/sys/src/cmd/upas/q/runq.c

Copyright © 2021 Plan 9 Foundation.
Distributed under the MIT License.
Download the Plan 9 distribution.


#include "common.h"
#include <ctype.h>

void	doalldirs(void);
void	dodir(char*);
void	dofile(Dir*);
void	rundir(char*);
char*	file(char*, char);
void	warning(char*, void*);
void	error(char*, void*);
int	returnmail(char**, char*, char*);
void	logit(char*, char*, char**);
void	doload(int);

#define HUNK 32
char	*cmd;
char	*root;
int	debug;
int	giveup = 2*24*60*60;
int	load;
int	limit;

/* the current directory */
Dir	*dirbuf;
long	ndirbuf = 0;
int	nfiles;
char	*curdir;

char *runqlog = "runq";

int	*pidlist;
char	**badsys;		/* array of recalcitrant systems */
int	nbad;
int	npid = 50;
int	sflag;			/* single thread per directory */
int	aflag;			/* all directories */
int	Eflag;			/* ignore E.xxxxxx dates */
int	Rflag;			/* no giving up, ever */

void
usage(void)
{
	fprint(2, "usage: runq [-adsE] [-q dir] [-l load] [-t time] [-r nfiles] [-n nprocs] q-root cmd\n");
	exits("");
}

void
main(int argc, char **argv)
{
	char *qdir, *x;

	qdir = 0;

	ARGBEGIN{
	case 'l':
		x = ARGF();
		if(x == 0)
			usage();
		load = atoi(x);
		if(load < 0)
			load = 0;
		break;
	case 'E':
		Eflag++;
		break;
	case 'R':	/* no giving up -- just leave stuff in the queue */
		Rflag++;
		break;
	case 'a':
		aflag++;
		break;
	case 'd':
		debug++;
		break;
	case 'r':
		limit = atoi(ARGF());
		break;
	case 's':
		sflag++;
		break;
	case 't':
		giveup = 60*60*atoi(ARGF());
		break;
	case 'q':
		qdir = ARGF();
		if(qdir == 0)
			usage();
		break;
	case 'n':
		npid = atoi(ARGF());
		if(npid == 0)
			usage();
		break;
	}ARGEND;

	if(argc != 2)
		usage();

	pidlist = malloc(npid*sizeof(*pidlist));
	if(pidlist == 0)
		error("can't malloc", 0);

	if(aflag == 0 && qdir == 0) {
		qdir = getuser();
		if(qdir == 0)
			error("unknown user", 0);
	}
	root = argv[0];
	cmd = argv[1];

	if(chdir(root) < 0)
		error("can't cd to %s", root);

	doload(1);
	if(aflag)
		doalldirs();
	else
		dodir(qdir);
	doload(0);
	exits(0);
}

int
emptydir(char *name)
{
	int fd;
	long n;
	char buf[2048];

	fd = open(name, OREAD);
	if(fd < 0)
		return 1;
	n = read(fd, buf, sizeof(buf));
	close(fd);
	if(n <= 0) {
		if(debug)
			fprint(2, "removing directory %s\n", name);
		syslog(0, runqlog, "rmdir %s", name);
		sysremove(name);
		return 1;
	}
	return 0;
}

int
forkltd(void)
{
	int i;
	int pid;

	for(i = 0; i < npid; i++){
		if(pidlist[i] <= 0)
			break;
	}

	while(i >= npid){
		pid = waitpid();
		if(pid < 0){
			syslog(0, runqlog, "forkltd confused");
			exits(0);
		}

		for(i = 0; i < npid; i++)
			if(pidlist[i] == pid)
				break;
	}
	pidlist[i] = fork();
	return pidlist[i];
}

/*
 *  run all user directories, must be bootes (or root on unix) to do this
 */
void
doalldirs(void)
{
	Dir *db;
	int fd;
	long i, n;


	fd = open(".", OREAD);
	if(fd == -1){
		warning("reading %s", root);
		return;
	}
	n = sysdirreadall(fd, &db);
	if(n > 0){
		for(i=0; i<n; i++){
			if(db[i].qid.type & QTDIR){
				if(emptydir(db[i].name))
					continue;
				switch(forkltd()){
				case -1:
					syslog(0, runqlog, "out of procs");
					doload(0);
					exits(0);
				case 0:
					if(sysdetach() < 0)
						error("%r", 0);
					dodir(db[i].name);
					exits(0);
				default:
					break;
				}
			}
		}
		free(db);
	}
	close(fd);
}

/*
 *  cd to a user directory and run it
 */
void
dodir(char *name)
{
	curdir = name;

	if(chdir(name) < 0){
		warning("cd to %s", name);
		return;
	}
	if(debug)
		fprint(2, "running %s\n", name);
	rundir(name);
	chdir("..");
}

/*
 *  run the current directory
 */
void
rundir(char *name)
{
	int fd;
	long i;

	if(aflag && sflag)
		fd = sysopenlocked(".", OREAD);
	else
		fd = open(".", OREAD);
	if(fd == -1){
		warning("reading %s", name);
		return;
	}
	nfiles = sysdirreadall(fd, &dirbuf);
	if(nfiles > 0){
		for(i=0; i<nfiles; i++){
			if(dirbuf[i].name[0]!='C' || dirbuf[i].name[1]!='.')
				continue;
			dofile(&dirbuf[i]);
		}
		free(dirbuf);
	}
	if(aflag && sflag)
		sysunlockfile(fd);
	else
		close(fd);
}

/*
 *  free files matching name in the current directory
 */
void
remmatch(char *name)
{
	long i;

	syslog(0, runqlog, "removing %s/%s", curdir, name);

	for(i=0; i<nfiles; i++){
		if(strcmp(&dirbuf[i].name[1], &name[1]) == 0)
			sysremove(dirbuf[i].name);
	}

	/* error file (may have) appeared after we read the directory */
	/* stomp on data file in case of phase error */
	sysremove(file(name, 'D'));
	sysremove(file(name, 'E'));
}

/*
 *  like trylock, but we've already got the lock on fd,
 *  and don't want an L. lock file.
 */
static Mlock *
keeplockalive(char *path, int fd)
{
	char buf[1];
	Mlock *l;

	l = malloc(sizeof(Mlock));
	if(l == 0)
		return 0;
	l->fd = fd;
	l->name = s_new();
	s_append(l->name, path);

	/* fork process to keep lock alive until sysunlock(l) */
	switch(l->pid = rfork(RFPROC)){
	default:
		break;
	case 0:
		fd = l->fd;
		for(;;){
			sleep(1000*60);
			if(pread(fd, buf, 1, 0) < 0)
				break;
		}
		_exits(0);
	}
	return l;
}

/*
 *  try a message
 */
void
dofile(Dir *dp)
{
	Dir *d;
	int dfd, ac, dtime, efd, pid, i, etime;
	char *buf, *cp, **av;
	Waitmsg *wm;
	Biobuf *b;
	Mlock *l = nil;

	if(debug)
		fprint(2, "dofile %s\n", dp->name);
	/*
	 *  if no data file or empty control or data file, just clean up
	 *  the empty control file must be 15 minutes old, to minimize the
	 *  chance of a race.
	 */
	d = dirstat(file(dp->name, 'D'));
	if(d == nil){
		syslog(0, runqlog, "no data file for %s", dp->name);
		remmatch(dp->name);
		return;
	}
	if(dp->length == 0){
		if(time(0)-dp->mtime > 15*60){
			syslog(0, runqlog, "empty ctl file for %s", dp->name);
			remmatch(dp->name);
		}
		return;
	}
	dtime = d->mtime;
	free(d);

	/*
	 *  retry times depend on the age of the errors file
	 */
	if(!Eflag && (d = dirstat(file(dp->name, 'E'))) != nil){
		etime = d->mtime;
		free(d);
		if(etime - dtime < 15*60){
			/* up to the first 15 minutes, every 30 seconds */
			if(time(0) - etime < 30)
				return;
		} else if(etime - dtime < 60*60){
			/* up to the first hour, try every 15 minutes */
			if(time(0) - etime < 15*60)
				return;
		} else {
			/* after the first hour, try once an hour */
			if(time(0) - etime < 60*60)
				return;
		}

	}

	/*
	 *  open control and data
	 */
	b = sysopen(file(dp->name, 'C'), "rl", 0660);
	if(b == 0) {
		if(debug)
			fprint(2, "can't open %s: %r\n", file(dp->name, 'C'));
		return;
	}
	dfd = open(file(dp->name, 'D'), OREAD);
	if(dfd < 0){
		if(debug)
			fprint(2, "can't open %s: %r\n", file(dp->name, 'D'));
		Bterm(b);
		sysunlockfile(Bfildes(b));
		return;
	}

	/*
	 *  make arg list
	 *	- read args into (malloc'd) buffer
	 *	- malloc a vector and copy pointers to args into it
	 */
	buf = malloc(dp->length+1);
	if(buf == 0){
		warning("buffer allocation", 0);
		Bterm(b);
		sysunlockfile(Bfildes(b));
		close(dfd);
		return;
	}
	if(Bread(b, buf, dp->length) != dp->length){
		warning("reading control file %s\n", dp->name);
		Bterm(b);
		sysunlockfile(Bfildes(b));
		close(dfd);
		free(buf);
		return;
	}
	buf[dp->length] = 0;
	av = malloc(2*sizeof(char*));
	if(av == 0){
		warning("argv allocation", 0);
		close(dfd);
		free(buf);
		Bterm(b);
		sysunlockfile(Bfildes(b));
		return;
	}
	for(ac = 1, cp = buf; *cp; ac++){
		while(isspace(*cp))
			*cp++ = 0;
		if(*cp == 0)
			break;

		av = realloc(av, (ac+2)*sizeof(char*));
		if(av == 0){
			warning("argv allocation", 0);
			close(dfd);
			free(buf);
			Bterm(b);
			sysunlockfile(Bfildes(b));
			return;
		}
		av[ac] = cp;
		while(*cp && !isspace(*cp)){
			if(*cp++ == '"'){
				while(*cp && *cp != '"')
					cp++;
				if(*cp)
					cp++;
			}
		}
	}
	av[0] = cmd;
	av[ac] = 0;

	if(!Eflag &&time(0) - dtime > giveup){
		if(returnmail(av, dp->name, "Giveup") != 0)
			logit("returnmail failed", dp->name, av);
		remmatch(dp->name);
		goto done;
	}

	for(i = 0; i < nbad; i++){
		if(strcmp(av[3], badsys[i]) == 0)
			goto done;
	}

	/*
	 * Ken's fs, for example, gives us 5 minutes of inactivity before
	 * the lock goes stale, so we have to keep reading it.
 	 */
	l = keeplockalive(file(dp->name, 'C'), Bfildes(b));

	/*
	 *  transfer
	 */
	pid = fork();
	switch(pid){
	case -1:
		sysunlock(l);
		sysunlockfile(Bfildes(b));
		syslog(0, runqlog, "out of procs");
		exits(0);
	case 0:
		if(debug) {
			fprint(2, "Starting %s", cmd);
			for(ac = 0; av[ac]; ac++)
				fprint(2, " %s", av[ac]);
			fprint(2, "\n");
		}
		logit("execing", dp->name, av);
		close(0);
		dup(dfd, 0);
		close(dfd);
		close(2);
		efd = open(file(dp->name, 'E'), OWRITE);
		if(efd < 0){
			if(debug) syslog(0, "runq", "open %s as %s: %r", file(dp->name,'E'), getuser());
			efd = create(file(dp->name, 'E'), OWRITE, 0666);
			if(efd < 0){
				if(debug) syslog(0, "runq", "create %s as %s: %r", file(dp->name, 'E'), getuser());
				exits("could not open error file - Retry");
			}
		}
		seek(efd, 0, 2);
		exec(cmd, av);
		error("can't exec %s", cmd);
		break;
	default:
		for(;;){
			wm = wait();
			if(wm == nil)
				error("wait failed: %r", "");
			if(wm->pid == pid)
				break;
			free(wm);
		}
		if(debug)
			fprint(2, "wm->pid %d wm->msg == %s\n", wm->pid, wm->msg);

		if(wm->msg[0]){
			if(debug)
				fprint(2, "[%d] wm->msg == %s\n", getpid(), wm->msg);
			syslog(0, runqlog, "message: %s\n", wm->msg);
			if(strstr(wm->msg, "Ignore") != nil){
				/* fix for fish/chips, leave message alone */
				logit("ignoring", dp->name, av);
			}else if(!Rflag && strstr(wm->msg, "Retry")==0){
				/* return the message and remove it */
				if(returnmail(av, dp->name, wm->msg) != 0)
					logit("returnmail failed", dp->name, av);
				remmatch(dp->name);
			} else {
				/* add sys to bad list and try again later */
				nbad++;
				badsys = realloc(badsys, nbad*sizeof(char*));
				badsys[nbad-1] = strdup(av[3]);
			}
		} else {
			/* it worked remove the message */
			remmatch(dp->name);
		}
		free(wm);

	}
done:
	if (l)
		sysunlock(l);
	Bterm(b);
	sysunlockfile(Bfildes(b));
	free(buf);
	free(av);
	close(dfd);
}


/*
 *  return a name starting with the given character
 */
char*
file(char *name, char type)
{
	static char nname[Elemlen+1];

	strncpy(nname, name, Elemlen);
	nname[Elemlen] = 0;
	nname[0] = type;
	return nname;
}

/*
 *  send back the mail with an error message
 *
 *  return 0 if successful
 */
int
returnmail(char **av, char *name, char *msg)
{
	int pfd[2];
	Waitmsg *wm;
	int fd;
	char buf[256];
	char attachment[256];
	int i;
	long n;
	String *s;
	char *sender;

	if(av[1] == 0 || av[2] == 0){
		logit("runq - dumping bad file", name, av);
		return 0;
	}

	s = unescapespecial(s_copy(av[2]));
	sender = s_to_c(s);

	if(!returnable(sender) || strcmp(sender, "postmaster") == 0) {
		logit("runq - dumping p to p mail", name, av);
		return 0;
	}

	if(pipe(pfd) < 0){
		logit("runq - pipe failed", name, av);
		return -1;
	}

	switch(rfork(RFFDG|RFPROC|RFENVG)){
	case -1:
		logit("runq - fork failed", name, av);
		return -1;
	case 0:
		logit("returning", name, av);
		close(pfd[1]);
		close(0);
		dup(pfd[0], 0);
		close(pfd[0]);
		putenv("upasname", "/dev/null");
		snprint(buf, sizeof(buf), "%s/marshal", UPASBIN);
		snprint(attachment, sizeof(attachment), "%s", file(name, 'D'));
		execl(buf, "send", "-A", attachment, "-s", "permanent failure", sender, nil);
		error("can't exec", 0);
		break;
	default:
		break;
	}

	close(pfd[0]);
	fprint(pfd[1], "\n");	/* get out of headers */
	if(av[1]){
		fprint(pfd[1], "Your request ``%.20s ", av[1]);
		for(n = 3; av[n]; n++)
			fprint(pfd[1], "%s ", av[n]);
	}
	fprint(pfd[1], "'' failed (code %s).\nThe symptom was:\n\n", msg);
	fd = open(file(name, 'E'), OREAD);
	if(fd >= 0){
		for(;;){
			n = read(fd, buf, sizeof(buf));
			if(n <= 0)
				break;
			if(write(pfd[1], buf, n) != n){
				close(fd);
				goto out;
			}
		}
		close(fd);
	}
	close(pfd[1]);
out:
	wm = wait();
	if(wm == nil){
		syslog(0, "runq", "wait: %r");
		logit("wait failed", name, av);
		return -1;
	}
	i = 0;
	if(wm->msg[0]){
		i = -1;
		syslog(0, "runq", "returnmail child: %s", wm->msg);
		logit("returnmail child failed", name, av);
	}
	free(wm);
	return i;
}

/*
 *  print a warning and continue
 */
void
warning(char *f, void *a)
{
	char err[65];
	char buf[256];

	rerrstr(err, sizeof(err));
	snprint(buf, sizeof(buf), f, a);
	fprint(2, "runq: %s: %s\n", buf, err);
}

/*
 *  print an error and die
 */
void
error(char *f, void *a)
{
	char err[Errlen];
	char buf[256];

	rerrstr(err, sizeof(err));
	snprint(buf, sizeof(buf), f, a);
	fprint(2, "runq: %s: %s\n", buf, err);
	exits(buf);
}

void
logit(char *msg, char *file, char **av)
{
	int n, m;
	char buf[256];

	n = snprint(buf, sizeof(buf), "%s/%s: %s", curdir, file, msg);
	for(; *av; av++){
		m = strlen(*av);
		if(n + m + 4 > sizeof(buf))
			break;
		sprint(buf + n, " '%s'", *av);
		n += m + 3;
	}
	syslog(0, runqlog, "%s", buf);
}

char *loadfile = ".runqload";

/*
 *  load balancing
 */
void
doload(int start)
{
	int fd;
	char buf[32];
	int i, n;
	Mlock *l;
	Dir *d;

	if(load <= 0)
		return;

	if(chdir(root) < 0){
		load = 0;
		return;
	}

	l = syslock(loadfile);
	fd = open(loadfile, ORDWR);
	if(fd < 0){
		fd = create(loadfile, 0666, ORDWR);
		if(fd < 0){
			load = 0;
			sysunlock(l);
			return;
		}
	}

	/* get current load */
	i = 0;
	n = read(fd, buf, sizeof(buf)-1);
	if(n >= 0){
		buf[n] = 0;
		i = atoi(buf);
	}
	if(i < 0)
		i = 0;

	/* ignore load if file hasn't been changed in 30 minutes */
	d = dirfstat(fd);
	if(d != nil){
		if(d->mtime + 30*60 < time(0))
			i = 0;
		free(d);
	}

	/* if load already too high, give up */
	if(start && i >= load){
		sysunlock(l);
		exits(0);
	}

	/* increment/decrement load */
	if(start)
		i++;
	else
		i--;
	seek(fd, 0, 0);
	fprint(fd, "%d\n", i);
	sysunlock(l);
	close(fd);
}

Bell Labs OSI certified Powered by Plan 9

(Return to Plan 9 Home Page)

Copyright © 2021 Plan 9 Foundation. All Rights Reserved.
Comments to webmaster@9p.io.