Plan 9 from Bell Labs’s /usr/web/sources/contrib/nemo/octopus/port/mux/mux.b

Copyright © 2021 Plan 9 Foundation.
Distributed under the MIT License.
Download the Plan 9 distribution.


#
# Multiplexed octopus connections.
# Provides a file tree that switches from one
# implementor to another without breaking the client name space.
#
# BUGS:
# This requires OR, AND, and MIN expressions, so that a single mount
# of mux would provide the mount point desired. Otherwise, we have to
# o/mux .... ; o/mux -a ... ; o/mux -a ....
# and that is a waste.

# Also, a way is needed to know which particular fs is the one being used.
# Probably a ndb file in the root of each fs muxed.

implement Mux;

include "sys.m";
	sys: Sys;
	fildes, fprint, FD, OTRUNC, DMDIR, ORCLOSE, OREAD, ORDWR,
	Qid, print, pctl, create, mount, QTDIR, pread, open, sprint, NEWPGRP,
	FORKNS, NEWFD, MREPL, MBEFORE, MCREATE, MAFTER, pipe, 
	pwrite, remove, nulldir, fwstat, wstat, fstat, stat, Dir, write, sleep, millisec: import sys;
include "draw.m";
include "styx.m";
	styx: Styx;
	Tmsg, Rmsg, NOFID, IOHDRSZ, VERSION, MAXRPC,
	unpackdir, compatible, packdir, NOTAG: import styx;
include "error.m";
	err: Error;
	checkload, stderr, panic, kill, error: import err;
include "arg.m";
	arg: Arg;
	usage: import arg;
include "names.m";
	names: Names;
	dirname, cleanname, relative, isprefix, rooted: import names;

Mux: module {
	init: fn(nil: ref Draw->Context, argv: list of string);
};

include "muxdat.m";
	dat: Muxdat;
	Qroot, NOQID, Fid, rebind, rootdir, brokenfs,
	bindrootdir, addfid, delfid, getfid, maybebroken,
	addqid, getqid, fixqid, delqid, renametree, debug, qgen: import dat;

slash:	ref Fid;
msize:	int;

terminate()
{
	if (debug)
		fprint(stderr, "mux: terminating\n");
	kill(pctl(0,nil), "killgrp");	# kill tmsgreader and any other
	exit;
}

fsversion(m: ref Tmsg.Version): ref Rmsg
{
	if (msize <= 0)
		msize = MAXRPC;
	(sz, version) := compatible(m, msize, VERSION);
	if(sz < 256)
		return ref Rmsg.Error(m.tag, "message size too small");
	msize = sz;
	return ref Rmsg.Version(m.tag, msize, version);
}

fsattach(m: ref Tmsg.Attach): ref Rmsg
{
	slash = ref Fid(m.fid, nil, 0, -1, Qroot, rootdir);
	if (addfid(slash) < 0){
		slash = nil;
		return ref Rmsg.Error(m.tag, "fid already exists");
	}
	slash.qid = Qroot;
	slash.path = rootdir;
	q := addqid(rootdir);
	if (q != Qroot.path)
		panic("bug: first qid != Qroot");
	return ref Rmsg.Attach(m.tag, Qroot);
}

fsopen(m: ref Tmsg.Open): ref Rmsg
{
	fid := getfid(m.fid);
	if (fid == nil)
		return ref Rmsg.Error(m.tag, "bad fid");
	if (fid.broken)
		return ref Rmsg.Error(m.tag, "i/o error");
	if (fid.fd != nil)
		return ref Rmsg.Error(m.tag, "fid already open");
	fid.fd = open(fid.path, m.mode);
	if (fid.fd == nil){
		e := sprint("%r");
		maybebroken(e);
		return ref Rmsg.Error(m.tag, e);
	}
	fid.omode = m.mode;
	return ref Rmsg.Open(m.tag, fid.qid, msize - IOHDRSZ);
}

fixdirqids(fid: ref Fid, b: array of byte)
{
	d: Dir;
	n := 0;
	for(o := 0; o < len b; o += n){
		(n, d) = unpackdir(b[o:]);
		if (n <= 0){
			if (n < 0 && debug)
				fprint(stderr, "fixdirqids: unpack: %r\n");
			break;
		}
		path := rooted(fid.path, d.name);
		d.qid = fixqid(path, d.qid);
		b[o:] = packdir(d);
		o += n;
	}
}

fsread(m: ref Tmsg.Read): ref Rmsg
{
	fid := getfid(m.fid);
	if (fid == nil)
		return ref Rmsg.Error(m.tag, "bad fid");
	if (fid.broken)
		return ref Rmsg.Error(m.tag, "i/o error");
	if (fid.fd == nil)
		return ref Rmsg.Error(m.tag, "fid not open");
	if (m.count > msize)
		return ref Rmsg.Error(m.tag, "count too big");
	b := array[m.count] of byte;
	nr := pread(fid.fd, b, m.count, m.offset);
	if (nr < 0){
		e := sprint("%r");
		maybebroken(e);
		return ref Rmsg.Error(m.tag, e);
	}
	nb := b[0:nr];
	b = nil;
	if (fid.qid.qtype&QTDIR)
		fixdirqids(fid, nb);
	return ref Rmsg.Read(m.tag, nb);
}

fswrite(m: ref Tmsg.Write): ref Rmsg
{
	fid := getfid(m.fid);
	if (fid == nil)
		return ref Rmsg.Error(m.tag, "bad fid");
	if (fid.broken)
		return ref Rmsg.Error(m.tag, "i/o error");
	if (fid.fd == nil)
		return ref Rmsg.Error(m.tag, "fid not open");
	if (len m.data > msize)
		return ref Rmsg.Error(m.tag, "count too big");
	nw := pwrite(fid.fd, m.data, len m.data, m.offset);
	if (nw < 0){
		e := sprint("%r");
		maybebroken(e);
		return ref Rmsg.Error(m.tag, e);
	} else
		return ref Rmsg.Write(m.tag, nw);
}

fsclunk(m: ref Tmsg.Clunk): ref Rmsg
{
	fid := getfid(m.fid);
	if (fid == nil)
		return ref Rmsg.Error(m.tag, "bad fid");
	delfid(fid);
	fid.fd = nil;
	return ref Rmsg.Clunk(m.tag);
}

fscreate(m: ref Tmsg.Create): ref Rmsg
{
	fid := getfid(m.fid);
	if (fid == nil)
		return ref Rmsg.Error(m.tag, "bad fid");
	if (fid.broken)
		return ref Rmsg.Error(m.tag, "i/o error");
	if (fid.fd != nil)
		return ref Rmsg.Error(m.tag, "fid already open");
	npath := rooted(fid.path, m.name);
	if (npath == nil)
		return ref Rmsg.Error(m.tag, "bad name");
	fd := create(fid.path, m.mode, m.perm);
	if (fd == nil){
		e := sprint("%r");
		maybebroken(e);
		return ref Rmsg.Error(m.tag, e);
	}
	fid.fd = fd;
	fid.path = npath;
	fid.qid = Qid(big 0, 0, 0);
	if (m.mode&DMDIR)
		fid.qid.qtype = QTDIR;
	fid.qid = fixqid(fid.path, fid.qid);
	fid.omode = m.mode;
	return ref Rmsg.Create(m.tag, fid.qid, msize - IOHDRSZ);
}

fsremove(m: ref Tmsg.Remove): ref Rmsg
{
	fid := getfid(m.fid);
	if (fid == nil)
		return ref Rmsg.Error(m.tag, "bad fid");
	if (fid.broken)
		return ref Rmsg.Error(m.tag, "i/o error");
	path := fid.path;
	delfid(fid);
	if (remove(path) < 0){
		e := sprint("%r");
		maybebroken(e);
		return ref Rmsg.Error(m.tag, e);
	} else {
		delqid(fid.path);
		return ref Rmsg.Remove(m.tag);
	}
}

fswstat(m: ref Tmsg.Wstat): ref Rmsg
{
	fid := getfid(m.fid);
	if (fid == nil)
		return ref Rmsg.Error(m.tag, "bad fid");
	if (fid.broken)
		return ref Rmsg.Error(m.tag, "i/o error");
	m.stat.qid = nulldir.qid;
	ec: int;
	if (fid.fd != nil)
		ec = fwstat(fid.fd, m.stat);
	else
		ec = wstat(fid.path, m.stat);
	if (ec < 0){
		e := sprint("%r");
		maybebroken(e);
		return ref Rmsg.Error(m.tag, e);
	} else {
		if (m.stat.name != nil)
			renametree(fid.path, m.stat.name);
		return ref Rmsg.Wstat(m.tag);
	}
}

fsstat(m: ref Tmsg.Stat): ref Rmsg
{
	fid := getfid(m.fid);
	if (fid == nil)
		return ref Rmsg.Error(m.tag, "bad fid");
	if (fid.broken)
		return ref Rmsg.Error(m.tag, "i/o error");
	ec: int;
	d: Dir;
	if (fid.fd != nil)
		(ec,d) = fstat(fid.fd);
	else
		(ec,d) = stat(fid.path);
	if (ec < 0){
		if (debug)
			fprint(stderr, "stat: %s: %r\n", fid.path);
		e := sprint("%r");
		maybebroken(e);
		return ref Rmsg.Error(m.tag, sprint("%r"));
	}
	if (fid.path == rootdir)
		d.name = "/";
	d.qid = fid.qid = fixqid(fid.path, d.qid);
	return ref Rmsg.Stat(m.tag, d);
}

fswalk(m: ref Tmsg.Walk): ref Rmsg
{
	fid := getfid(m.fid);
	if (fid == nil)
		return ref Rmsg.Error(m.tag, "bad fid");
	if (fid.broken)
		return ref Rmsg.Error(m.tag, "i/o error");
	if (fid.fd != nil)
		return ref Rmsg.Error(m.tag, "walk on a open fid");

	# Clone if needed
	oqid := fid.qid;
	opath:= fid.path;
	if (m.newfid != m.fid){
		nfid := ref Fid(m.newfid, nil, 0, -1, fid.qid, fid.path); 
		if (addfid(nfid) < 0)
			return ref Rmsg.Error(m.tag, "fid already in use");
		fid = nfid;
	}

	# Walk to m.names
	nqids := len m.names;
	if (nqids == 0)
		return ref Rmsg.Walk(m.tag, array[0] of Qid);
	wpaths := array[nqids] of string;
	wqids := array[nqids] of Qid;
	for (i := 0; i < nqids; i++){
		fid.path = rooted(fid.path, m.names[i]);
		fid.path = cleanname(fid.path);
		if (!isprefix(rootdir, fid.path))
			fid.path = rootdir;
		wpaths[i] = fid.path;
	}
	(ec, d) := stat(fid.path);
	if (ec < 0){
		if (debug)
			fprint(stderr, "walk: stat: %s: %r\n", fid.path);
		e := sprint("%r");
		maybebroken(e);
	}
	if (ec >= 0){	# walk worked. invent everything else.
		for (i = 0; i < nqids - 1; i++){
			q := getqid(wpaths[i]);
			if (q == NOQID)
				q = addqid(wpaths[i]);
			wqids[i] = Qid(q, 0, QTDIR);
		}
		fid.qid = wqids[nqids-1] = fixqid(fid.path, d.qid);
		return ref Rmsg.Walk(m.tag, wqids);
	}
	# Couldn't walk. restore fid and respond
	fid.qid = oqid;
	fid.path = opath;
	for (i = 0; i < nqids; i++){
		(ec, d) = stat(wpaths[i]);
		if (ec < 0){
			e := sprint("%r");
			maybebroken(e);
			if (i == 0)
				return ref Rmsg.Error(m.tag, "file does not exist");
			else
				return ref Rmsg.Walk(m.tag, wqids[0:i]);
		}
		wqids[i] = fixqid(wpaths[i], d.qid);
	}
	panic("can or can't walk?"); 
	return nil;
}

reqrdproc(fd: ref FD, reqc: chan of ref Tmsg)
{
	pctl(Sys->NEWFD, 0::1::2::fd.fd :: nil);
	m: ref Tmsg;
	do {
		m = Tmsg.read(fd, msize);
		if (m != nil)
			pick mm := m {
			Readerror =>
				fprint(stderr, "mux: %s\n", mm.error);
				m = nil;
			}
		reqc <-= m;
	} while(m != nil);
}

fsreq(req: ref Tmsg) : ref Rmsg
{
	pick m := req {
	Version =>	return fsversion(m);
	Auth =>		return ref Rmsg.Error(m.tag, "no auth required");
	Attach =>		return fsattach(m);
	Flush =>		return ref Rmsg.Flush(m.tag);
	Walk =>		return fswalk(m);
	Open =>		return fsopen(m);
	Create =>		return fscreate(m);
	Read =>		return fsread(m);
	Write =>		return fswrite(m);
	Clunk =>		return fsclunk(m);
	Stat =>		return fsstat(m);
	Remove =>	return fsremove(m);
	Wstat =>		return fswstat(m);
	* =>			panic("bug"); return nil;
	}
}

canretry(req: ref Tmsg): int
{
	r := tagof(req);
	if (r == tagof(Tmsg.Auth) || r == tagof(Tmsg.Read) || r == tagof(Tmsg.Write))
		return 0;
	return 1;
}
	

srvproc(reqc: chan of ref Tmsg, cfd: ref FD)
{
	for(;;){
		req := <-reqc;
		if (req == nil)
			break;
		if (debug)
			fprint(stderr, "<- %s\n", req.text());
		rebind();
		wasbroken := brokenfs;
		rep := fsreq(req);
		if (rep == nil)
			error("nil reply");
		if (!wasbroken && brokenfs){
			# BUG: Here, if we were not lucky and this thing did break,
			# we might call rebind and retry the rpc again, if we see that
			# there's no problem in doing so.
			if (canretry(req)){
				rebind();
				if (!brokenfs)
					rep = fsreq(req);
			}
		}
		if (debug)
			fprint(stderr, "-> %s\n", rep.text());
		b := rep.pack();
		nw := write(cfd, b, len b);
		if (nw != len b){
			fprint(stderr, "write: %r\n");
			break;
		}
	}
	terminate();
}

mux(pidc: chan of int, fd: ref FD)
{
	if (pidc != nil)
		pidc <-= pctl(FORKNS|NEWPGRP|NEWFD, list of {0,1,2,fd.fd});
	else
		pctl(NEWPGRP, nil);
	reqc := chan of ref Tmsg;
	bindrootdir();
	spawn reqrdproc(fd, reqc);
	spawn srvproc(reqc, fd);
}

init(nil: ref Draw->Context, args: list of string)
{
	sys = load Sys Sys->PATH;
	err = load Error Error->PATH;
	err->init();
	styx = checkload(load Styx Styx->PATH, Styx->PATH);
	names = checkload(load Names Names->PATH, Names->PATH); 
	dat = checkload(load Muxdat Muxdat->PATH, Muxdat->PATH);
	arg = checkload(load Arg Arg->PATH, Arg->PATH);
	arg->init(args);
	arg->setusage("mux [-abcd] [-m mnt] name val [name val]...");
	mnt: string;
	flag := MREPL|MCREATE;
	while((opt := arg->opt()) != 0) {
		case opt{
		'b' =>
			flag = MBEFORE;
		'a' =>
			flag = MAFTER;
		'c' =>
			flag |= MCREATE;
		'm' =>
			mnt = arg->earg();
		'd' =>
			debug = 1;
		* =>
			usage();
		}
	}
	args = arg->argv();
	argc := len args;
	if (argc == 0 || (argc%2) != 0)
		usage();
	styx->init();
	dat->init(sys, err, names, args);
	if (mnt == nil)
		mux(nil, fildes(0));
	else {
		pfds := array[2] of ref FD;
		if (pipe(pfds) < 0)
			error(sprint("mux: pipe: %r"));
		pidc := chan of int;
		spawn mux(pidc, pfds[0]);
		<-pidc;
		if (mount(pfds[1], nil, mnt, flag, nil) < 0)
			error(sprint("mux: mount: %r"));
		pfds[0] = nil;
	}
}


Bell Labs OSI certified Powered by Plan 9

(Return to Plan 9 Home Page)

Copyright © 2021 Plan 9 Foundation. All Rights Reserved.
Comments to webmaster@9p.io.