/*
 * Plan 9 from Bell Labs’s /usr/web/sources/contrib/blstuart/θfs/cache.c
 *
 * Copyright © 2021 Plan 9 Foundation.
 * Distributed under the MIT License.
 * Download the Plan 9 distribution.
 */


/*
 * Copyright (c) 2013, Coraid, Inc.
 * All rights reserved.
 * 
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *     * Redistributions of source code must retain the above copyright
 *       notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above copyright
 *       notice, this list of conditions and the following disclaimer in the
 *       documentation and/or other materials provided with the distribution.
 *     * Neither the name of Coraid nor the
 *       names of its contributors may be used to endorse or promote products
 *       derived from this software without specific prior written permission.
 * 
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
 * DISCLAIMED. IN NO EVENT SHALL CORAID BE LIABLE FOR ANY
 * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
 * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include <u.h>
#include <libc.h>
#include <thread.h>
#include <fcall.h>
#include <9p.h>
#include "dat.h"

/*
 * Constants private to the block cache.
 */
enum {
	/* Bits kept in CBlock.flags. */
	/* CDirty: set by _cbwrite when the block is put on the write list. */
	CDirty = 1,
	/* CFlushing: set by wbthread for the duration of the device write. */
	CFlushing = 2,
	/* CFree: set by cbfree and cleared by cballoc; block is on the free list. */
	CFree = 4,

	/* Request codes carried in Cachereq.req and dispatched by handler. */
	CBrelease = 1,
	CBclean,
	CBread,
	CBwrite,
	CCanfree,
	CReset,

	/* Number of chains in the hash table ht; blocks hash by blkno % Ncht. */
	Ncht = 4001,

	/* Number of entries in rds, i.e. reader threads started by initcache. */
	Nreaders = 4,

	/* Values of Reader.state; Rloading means Reader.loading names the block being read. */
	Ridle = 0,
	Rloading = 1,
};

/* Short names for the structures defined below. */
typedef struct CBlock CBlock;
typedef struct Cachereq Cachereq;
typedef struct Cacheresp Cacheresp;
typedef struct Reader Reader;

/*
 * One cached block and the links that tie it into the cache's lists.
 */
struct CBlock {
	/* Use count: raised when a buffer pointer is handed out, lowered by _brelease. */
	Ref ref;
	/* Block number; the device offset used for I/O is blkno * BlkSize. */
	uvlong blkno;
	/* The block's data; callers receive a pointer to this array. */
	uchar buf[BlkSize];
	/* Combination of CDirty, CFlushing and CFree. */
	int flags;
	/* LRU list links (chd/ctl); next is also the free list link while CFree is set. */
	CBlock *next, *prev;
	/* Hash chain links within ht[blkno % Ncht]. */
	CBlock *htnext, *htprev;
	/* Write list links (whd/wtl). */
	CBlock *wnext, *wprev;
};

/*
 * A request sent over cachechan to handler, and passed on to a
 * reader's rdchan for CBread misses.
 */
struct Cachereq {
	/* One of CBrelease, CBclean, CBread, CBwrite, CCanfree, CReset. */
	int req;
	/* Block number the request applies to (left unset by resetcache). */
	uvlong blk;
	/* Channel on which a Cacheresp is sent back; brelease passes nil and gets no reply. */
	Channel *resp;
};

/*
 * Reply to a Cachereq.  Only the field relevant to the request is
 * filled in; the other is left as it was.
 */
struct Cacheresp {
	/* Integer result: set for CBrelease and CCanfree. */
	int res;
	/* Buffer pointer: set for CBclean and CBread (nil when the read failed). */
	void *p;
};

/*
 * State of one reader thread (see reader and initcache).
 */
struct Reader {
	/* Name of the device the reader opens. */
	char *dev;
	/* Channel of Cachereq on which this reader receives read requests. */
	Channel *rdchan;
	/* Ridle or Rloading. */
	int state;
	/* Block number being read while state is Rloading. */
	uvlong loading;
};

/* Pid of the proc running handler; the public entry points compare against it. */
static int mypid;
/* Channel of ulong; a message on it starts a pass of wbthread. */
static Channel *wbtrigger;
/* Hash table of cached blocks, chained through htnext/htprev. */
static CBlock *ht[Ncht];
/* Head and tail of the LRU list; inslru appends at ctl, dolru scans from chd. */
static CBlock *chd, *ctl;
/* Head and tail of the write list of dirty blocks. */
static CBlock *whd, *wtl;
/* Free list of CBlocks, linked through next and guarded by calock. */
static CBlock *freehd;
/* Set by initcache; dolru does nothing while ncache is below this. */
static int maxcache;
/* Number of blocks on the LRU list (maintained by inslru/rmlru). */
static Ref ncache;
/* Number of blocks with CDirty set. */
static Ref ndirty;
/* Number of blocks on the write list (maintained by insw/rmw). */
static Ref nwlist;
/* Id returned by proccreate for the wbtimer proc. */
static int timertid;
/* Counters reported by prcstat: misses, device reads issued, device writes issued. */
static uvlong nmiss;
static uvlong nread;
static uvlong nwrite;
/* Decaying hit-rate accumulator updated by updatestats. */
static ulong hrate;
/* 0 when idle, 1 while wbthread is writing, 2 when csync has asked for a flush. */
static int syncing;
/* Per-reader state, one entry per reader thread. */
static Reader rds[Nreaders];
/* Protects freehd. */
static Lock calock;

/* Request channel served by handler; carries Cachereq values. */
Channel *cachechan;

/*
 * Hand out a CBlock: reuse the block at the head of the free list
 * if there is one, otherwise allocate a new one with θmalloc.
 *
 * The free list is guarded by calock, so the emptiness test is
 * made while holding the lock.  Testing freehd before taking the
 * lock would allow the list to be drained in between, leaving
 * this function to dereference nil.
 *
 * A block found on the list without CFree set is reported on
 * fd 2 but still handed out.
 */
static CBlock *
cballoc(void)
{
	CBlock *p;

	lock(&calock);
	p = freehd;
	if(p == nil) {
		/* nothing to reuse; do not hold the lock across the allocation */
		unlock(&calock);
		return θmalloc(sizeof(CBlock));
	}
	freehd = p->next;
	if(!(p->flags & CFree))
		fprint(2, "Internal error: non-free block on free list\n");
	p->flags &= ~CFree;
	p->next = nil;
	unlock(&calock);
	return p;
}

/*
 * Put p on the free list.  The request is refused, with a message
 * on fd 2, when p is already marked free, still has a non-zero
 * reference count, or is still linked into the LRU list, a hash
 * chain, or the write list.
 */
static void
cbfree(CBlock *p)
{
	int linked;

	if(p->flags & CFree) {
		fprint(2, "Freeing already free block?!?!?\n");
		return;
	}
	if(p->ref.ref != 0) {
		fprint(2, "Freeing in use block\n");
		return;
	}
	linked = p->next != nil || p->prev != nil
		|| p->htnext != nil || p->htprev != nil
		|| p->wnext != nil || p->wprev != nil;
	if(linked) {
		fprint(2, "Freeing block in data structures\n");
		return;
	}
	lock(&calock);
	p->flags |= CFree;
	p->next = freehd;
	freehd = p;
	unlock(&calock);
}

/*
 * Find the cached block numbered blk by walking its hash chain.
 * Returns nil when the block is not in the cache.
 */
static CBlock *
lookup(uvlong blk)
{
	CBlock *cb;

	cb = ht[blk % Ncht];
	while(cb != nil && cb->blkno != blk)
		cb = cb->htnext;
	return cb;
}

static void
updatestats(int hit)
{
	if(!hit) {
		++nmiss;
		hrate = (999 * hrate + 500) / 1000;
	}
	else
		hrate = (999 * hrate + 500) / 1000 + 1000;
}

/*
 * Push p onto the front of the hash chain selected by its blkno.
 * p->htprev is not written here.
 */
static void
insht(CBlock *p)
{
	CBlock **slot, *first;

	slot = &ht[p->blkno % Ncht];
	first = *slot;
	if(first != nil)
		first->htprev = p;
	p->htnext = first;
	*slot = p;
}

/*
 * Unlink p from its hash chain and clear its hash links.
 */
static void
rmht(CBlock *p)
{
	CBlock **slot;

	slot = &ht[p->blkno % Ncht];
	if(p->htnext != nil)
		p->htnext->htprev = p->htprev;
	if(p->htprev != nil)
		p->htprev->htnext = p->htnext;
	/* p was the first entry of the chain */
	if(*slot == p)
		*slot = p->htnext;
	p->htprev = nil;
	p->htnext = nil;
}

/*
 * Append p at the tail of the LRU list and count it in ncache.
 */
static void
inslru(CBlock *p)
{
	if(chd != nil)
		ctl->next = p;
	else
		chd = p;
	p->next = nil;
	p->prev = ctl;
	ctl = p;
	incref(&ncache);
}

/*
 * Unlink p from the LRU list, clear its list links, and drop it
 * from the ncache count.
 */
static void
rmlru(CBlock *p)
{
	CBlock *after, *before;

	after = p->next;
	before = p->prev;
	if(after == nil)
		ctl = before;
	else
		after->prev = before;
	if(before == nil)
		chd = after;
	else
		before->next = after;
	p->prev = nil;
	p->next = nil;
	decref(&ncache);
}

/*
 * Append p at the tail of the write list and count it in nwlist.
 */
static void
insw(CBlock *p)
{
	if(whd != nil)
		wtl->wnext = p;
	else
		whd = p;
	p->wnext = nil;
	p->wprev = wtl;
	wtl = p;
	incref(&nwlist);
}

/*
 * Unlink p from the write list, clear its write-list links, and
 * drop it from the nwlist count.
 */
static void
rmw(CBlock *p)
{
	CBlock *after, *before;

	after = p->wnext;
	before = p->wprev;
	if(after == nil)
		wtl = before;
	else
		after->wprev = before;
	if(before == nil)
		whd = after;
	else
		before->wnext = after;
	p->wprev = nil;
	p->wnext = nil;
	decref(&nwlist);
}

/*
 * Move p to the tail of the LRU list; nothing to do when it is
 * already the tail.
 */
static void
mvlru(CBlock *p)
{
	if(p == ctl)
		return;
	rmlru(p);
	inslru(p);
}

/*
 * Evict at most one block once ncache has reached maxcache.
 * The victim is the first block from the head of the LRU list
 * that has no references and has neither CDirty nor CFlushing
 * set; when no such block exists nothing is evicted.
 *
 * If the scan visits more entries than ncache says exist, the
 * list is assumed to be circular: a message is printed, the
 * head's prev and the tail's next are reset to nil, and the
 * eviction is abandoned.
 */
static void
dolru(void)
{
	CBlock *p;
	int i;

	if(ncache.ref < maxcache)
		return;
	for(p = chd, i = 0; p && (p->ref.ref > 0 || (p->flags & (CDirty | CFlushing))); p = p->next, ++i)
		if(i > ncache.ref) {
			fprint(2, "cycle in LRU list? n:%ld d:%ld\n", ncache.ref, ndirty.ref);
			chd->prev = nil;
			ctl->next = nil;
			return;
		}
	if(p == nil)
		return;
	/*
	 * The scan stops only on a block with CDirty clear, so the
	 * victim has no dirty state or write-list membership to undo.
	 */
	rmht(p);
	rmlru(p);
	cbfree(p);
}

/*
 * Operation passed to iocall for reads.  Pulls a file descriptor,
 * buffer, byte count and offset from the argument list, in that
 * order, and returns the result of pread on them.
 */
static long
_iopread(va_list *arg)
{
	int fd;
	void *buf;
	long len;
	vlong offset;

	fd = va_arg(*arg, int);
	buf = va_arg(*arg, void*);
	len = va_arg(*arg, long);
	offset = va_arg(*arg, vlong);
	return pread(fd, buf, len, offset);
}

/*
 * Operation passed to iocall for writes.  Pulls a file descriptor,
 * buffer, byte count and offset from the argument list, in that
 * order, and returns the result of pwrite on them.
 */
static long
_iopwrite(va_list *arg)
{
	int fd;
	void *buf;
	long len;
	vlong offset;

	fd = va_arg(*arg, int);
	buf = va_arg(*arg, void*);
	len = va_arg(*arg, long);
	offset = va_arg(*arg, vlong);
	return pwrite(fd, buf, len, offset);
}

/*
 * Timer started by initcache with proccreate.  After each
 * sleep(15000) it sends on wbtrigger unless syncing is 1.
 * Once shutdown is set it sends one last trigger and returns.
 */
static void
wbtimer(void *)
{
	for(;;) {
		if(shutdown)
			break;
		sleep(15000);
		if(syncing == 1)
			continue;
		sendul(wbtrigger, 1);
	}
	sendul(wbtrigger, 1);
}

/*
 * Write-back thread.  d is the device name.  Each message on
 * wbtrigger starts a pass: syncing is set to 1 and blocks are
 * taken off the write list and written to the device until no
 * eligible block is left, then syncing returns to 0.
 *
 * A block is eligible when it has no references or when its
 * number is below super.firstdat.
 *
 * A write that does not transfer a whole block is reported and
 * the block is put back on the write list, so the data is not
 * silently dropped; the pass then ends and the block is retried
 * on the next trigger.
 */
static void
wbthread(void *d)
{
	Ioproc *wbio;
	CBlock *p;
	char *dev;
	long n;
	int fd;

	dev = d;
	wbio = ioproc();
	fd = ioopen(wbio, dev, ORDWR);
	if(fd < 0)
		sysfatal("wb open: %r");
	while(!shutdown) {
		recvul(wbtrigger);
		syncing = 1;
		do {
			/* skip blocks that are both referenced and at or past firstdat */
			for(p = whd; p && p->ref.ref > 0 && p->blkno >= super.firstdat; p = p->wnext) ;
			if(p) {
				/*
				 * Mark the block clean before the write: a _cbwrite
				 * that arrives while the write is in progress then
				 * re-queues it.
				 */
				p->flags |= CFlushing;
				p->flags &= ~CDirty;
				decref(&ndirty);
				rmw(p);
				++nwrite;
				n = iocall(wbio, _iopwrite, fd, p->buf, BlkSize, p->blkno * BlkSize);
				p->flags &= ~CFlushing;
				if(n != BlkSize) {
					fprint(2, "write back of block %ulld failed: %r\n", p->blkno);
					/*
					 * Re-queue only if the block is still the cached
					 * copy and was not re-dirtied during the write.
					 */
					if(lookup(p->blkno) == p && !(p->flags & CDirty)) {
						p->flags |= CDirty;
						incref(&ndirty);
						insw(p);
					}
					/* end this pass rather than spin on a failing device */
					p = nil;
				}
			}
		} while(p);
		syncing = 0;
	}
	ioclose(wbio, fd);
	closeioproc(wbio);
}

/*
 * Drop one reference on block blk, then give dolru a chance to
 * evict.  Returns 0 on success, or -1 when the block is not in
 * the cache or its count is already zero (the latter is also
 * reported on fd 2).
 */
static int
_brelease(uvlong blk)
{
	CBlock *p;
	int rv;

	p = lookup(blk);
	if(p == nil)
		rv = -1;
	else if(p->ref.ref == 0) {
		fprint(2, "trying to decrement below 0: blk %ulld\n", blk);
		rv = -1;
	}
	else {
		decref(&p->ref);
		rv = 0;
	}
	dolru();
	return rv;
}

/*
 * Return a zero-filled, referenced buffer for block blk without
 * reading the device.  A block already in the cache is zeroed in
 * place and moved to the tail of the LRU list; otherwise a block
 * is obtained from cballoc and entered into the hash table and
 * LRU list.  The access is counted as a hit or a miss accordingly.
 */
static void *
_cbclean(uvlong blk)
{
	CBlock *p;

	p = lookup(blk);
	updatestats(p != nil);
	if(p == nil) {
		dolru();
		p = cballoc();
		memset(p->buf, 0, BlkSize);
		p->blkno = blk;
		incref(&p->ref);
		insht(p);
		inslru(p);
	}
	else {
		memset(p->buf, 0, BlkSize);
		mvlru(p);
		incref(&p->ref);
	}
	return p->buf;
}

/*
 * Body of a reader thread; a points at the thread's Reader.
 * The thread opens the device through its own Ioproc and then
 * serves Cachereq messages from its rdchan, answering each on
 * the request's resp channel with the block's buffer, or with
 * nil when a full block could not be read.
 *
 * When recv returns 0 and shutdown is set, the thread closes its
 * Ioproc and exits.
 */
static void
reader(void *a)
{
	Cachereq r;
	Cacheresp rsp;
	Ioproc *cio;
	Reader *rp;
	CBlock *p;
	int cfd, i;

	rp = a;
	cio = ioproc();
	cfd = ioopen(cio, rp->dev, ORDWR);
	if(cfd < 0)
		sysfatal("Couldn't open device: %r");
	while(1) {
		if(recv(rp->rdchan, &r) == 0) {
			if(shutdown) {
				closeioproc(cio);
				threadexits(nil);
			}
			continue;
		}
		/*
		 * See if it got loaded while it was in the channel queue
		 */
		p = lookup(r.blk);
		if(p) {
			mvlru(p);
			incref(&p->ref);
			updatestats(1);
			rsp.p = p->buf;
			send(r.resp, &rsp);
			continue;
		}
		/*
		 * If another reader is already loading this block, pass off the request
		 * to that reader.  That way, by the time this request gets looked at
		 * again, the block will already be loaded.
		 */
		for(i = 0; i < Nreaders && (rds[i].state != Rloading || rds[i].loading != r.blk); ++i) ;
		if(i < Nreaders) {
			send(rds[i].rdchan, &r);
			continue;
		}
		rp->state = Rloading;
		rp->loading = r.blk;
		dolru();
		p = cballoc();
		p->blkno = r.blk;
		incref(&p->ref);
		++nread;
		if(iocall(cio, _iopread, cfd, p->buf, BlkSize, r.blk * BlkSize) != BlkSize) {
			rp->state = Ridle;
			/*
			 * Give back the reference taken above before freeing:
			 * cbfree refuses a block whose count is non-zero, and
			 * the block would otherwise never return to the free list.
			 */
			decref(&p->ref);
			cbfree(p);
			rsp.p = nil;
			send(r.resp, &rsp);
			continue;
		}
		/* the block becomes visible to lookup only after a complete read */
		insht(p);
		inslru(p);
		rp->state = Ridle;
		rsp.p = p->buf;
		send(r.resp, &rsp);
	}
}

/*
 * Handle a CBread request.  A block already in the cache is
 * referenced and its buffer sent straight back on r->resp.
 * Otherwise the request is counted as a miss and passed to one
 * of the readers, chosen in rotation; that reader sends the reply.
 */
static void
_cbread(Cachereq *r)
{
	static int rr;
	Cacheresp rsp;
	CBlock *p;

	p = lookup(r->blk);
	if(p == nil) {
		updatestats(0);
		send(rds[rr].rdchan, r);
		if(++rr >= Nreaders)
			rr = 0;
		return;
	}
	mvlru(p);
	incref(&p->ref);
	updatestats(1);
	rsp.p = p->buf;
	send(r->resp, &rsp);
}

/*
 * Mark block blk dirty.  If it is cached it moves to the tail of
 * the LRU list and, unless already dirty, goes onto the write
 * list.  A block that is not cached is ignored.  Afterwards a
 * write-back pass is requested, without blocking, when more than
 * a tenth of the cached blocks are dirty and none is in progress.
 */
static void
_cbwrite(uvlong blk)
{
	CBlock *p;

	p = lookup(blk);
	if(p != nil) {
		mvlru(p);
		if((p->flags & CDirty) == 0) {
			p->flags |= CDirty;
			incref(&ndirty);
			insw(p);
		}
	}
	if(syncing || ndirty.ref <= ncache.ref / 10)
		return;
	nbsendul(wbtrigger, 1);
}

/*
 * Drop block blk from the cache if nobody holds a reference.
 * Returns 0, after a message on fd 2, when the block is still
 * referenced; returns 1 when the block was removed or was not
 * cached at all.  A dirty block is taken off the write list
 * without being written.  CFlushing is not examined here.
 */
static int
_ccanfree(uvlong blk)
{
	CBlock *p;

	p = lookup(blk);
	if(p == nil)
		return 1;
	if(p->ref.ref > 0) {
		fprint(2, "Wanting to free block %ulld with ref %ld and flags %x\n", blk, p->ref.ref, p->flags);
		return 0;
	}
	if(p->flags & CDirty) {
		decref(&ndirty);
		rmw(p);
		p->flags &= ~CDirty;
	}
	rmht(p);
	rmlru(p);
	cbfree(p);
	return 1;
}

static void
_resetcache(void)
{
	CBlock *p;

	while(1) {
		for(p = chd; p && p->ref.ref > 0; p = p->next) ;
		if(p == nil)
			break;
		rmht(p);
		rmlru(p);
		cbfree(p);
	}
	if(chd)
		fprint(2, "warning: active blocks during reset\n");
}

/*
 * Cache server thread.  Records the pid of its proc in mypid,
 * then serves requests from cachechan.  CBread replies come from
 * _cbread or a reader; CBrelease is answered only when the
 * request carries a reply channel; every other known request is
 * answered once its work is done.  Unknown request codes are
 * ignored.  When recv returns 0 and shutdown is set the thread
 * exits.
 */
static void
handler(void *)
{
	Cacheresp rsp;
	Cachereq r;

	mypid = threadpid(threadid());
	for(;;) {
		if(recv(cachechan, &r) == 0) {
			if(shutdown)
				threadexits(nil);
			continue;
		}
		switch(r.req) {
		case CBread:
			/* the reply is sent elsewhere */
			_cbread(&r);
			continue;
		case CBrelease:
			rsp.res = _brelease(r.blk);
			if(r.resp == nil)
				continue;
			break;
		case CBclean:
			rsp.p = _cbclean(r.blk);
			break;
		case CBwrite:
			_cbwrite(r.blk);
			break;
		case CCanfree:
			rsp.res = _ccanfree(r.blk);
			break;
		case CReset:
			_resetcache();
			break;
		default:
			continue;
		}
		send(r.resp, &rsp);
	}
}

void
initcache(char *dev, int m)
{
	int i;

	maxcache = m;
	for(i = 0; i < Nreaders; ++i) {
		rds[i].dev = dev;
		rds[i].rdchan = chancreate(sizeof(Cachereq), 10);
		threadcreate(reader, &rds[i], 8192);
	}
	cachechan = chancreate(sizeof(Cachereq), 2);
	threadcreate(handler, nil, 8192);
	wbtrigger = chancreate(sizeof(ulong), 2);
	threadcreate(wbthread, dev, 8192);
	timertid = proccreate(wbtimer, nil, 1024);
}

void
haltcache(void)
{
	int i;

	for(i = 0; i < Nreaders; ++i)
		chanclose(rds[i].rdchan);
	chanclose(cachechan);
	threadkill(timertid);
	sendul(wbtrigger, 1);
	for(i = 0; i < 30 && whd; ++i) {
		fprint(2, ".");
		sleep(1000);
	}
}

/*
 * Release a reference on block blk.  From the cache's own proc
 * the release is done directly and its result returned, with the
 * caller's pc printed on failure.  From any other proc the
 * request is queued on cachechan without waiting for a reply,
 * and 0 is returned.
 */
int
brelease(uvlong blk)
{
	Cachereq r;
	int rv;

	if(mypid != threadpid(threadid())) {
		r.req = CBrelease;
		r.blk = blk;
		r.resp = nil;
		send(cachechan, &r);
		return 0;
	}
	rv = _brelease(blk);
	if(rv == -1)
		fprint(2, "brelease error called from %p\n", getcallerpc(&blk));
	return rv;
}

/*
 * Return a zero-filled, referenced buffer for block blk.  Called
 * directly from the cache's own proc; from any other proc the
 * request goes through cachechan and the reply is awaited on a
 * channel created for this call.
 */
void *
cbclean(uvlong blk)
{
	Cacheresp rsp;
	Cachereq r;
	Channel *c;

	if(threadpid(threadid()) == mypid)
		return _cbclean(blk);
	c = chancreate(sizeof(Cacheresp), 0);
	r.req = CBclean;
	r.blk = blk;
	r.resp = c;
	send(cachechan, &r);
	recv(c, &rsp);
	chanfree(c);
	return rsp.p;
}

/*
 * Return a referenced buffer holding block blk, or nil if it
 * could not be read.  In the cache's own proc a block that is
 * already cached is returned at once; in every other case the
 * request goes through cachechan and the reply is awaited on a
 * channel created for this call.
 */
void *
cbread(uvlong blk)
{
	Cacheresp rsp;
	Cachereq r;
	CBlock *p;
	Channel *c;

	if(threadpid(threadid()) == mypid && (p = lookup(blk)) != nil) {
		mvlru(p);
		incref(&p->ref);
		updatestats(1);
		return p->buf;
	}
	c = chancreate(sizeof(Cacheresp), 0);
	r.req = CBread;
	r.blk = blk;
	r.resp = c;
	send(cachechan, &r);
	recv(c, &rsp);
	chanfree(c);
	return rsp.p;
}

/*
 * Mark block blk dirty.  Done directly in the cache's own proc;
 * from any other proc the request goes through cachechan and the
 * call returns once the handler has replied.
 */
void
cbwrite(uvlong blk)
{
	Cacheresp rsp;
	Cachereq r;

	if(threadpid(threadid()) != mypid) {
		r.req = CBwrite;
		r.blk = blk;
		r.resp = chancreate(sizeof(Cacheresp), 0);
		send(cachechan, &r);
		recv(r.resp, &rsp);
		chanfree(r.resp);
		return;
	}
	_cbwrite(blk);
}

/*
 * Ask the cache to drop block blk.  Returns 0 if the block is
 * still referenced, 1 otherwise.  Done directly in the cache's
 * own proc; from any other proc the request goes through
 * cachechan and the reply is awaited.
 */
int
ccanfree(uvlong blk)
{
	Cacheresp rsp;
	Cachereq r;
	Channel *c;

	if(threadpid(threadid()) == mypid)
		return _ccanfree(blk);
	c = chancreate(sizeof(Cacheresp), 0);
	r.req = CCanfree;
	r.blk = blk;
	r.resp = c;
	send(cachechan, &r);
	recv(c, &rsp);
	chanfree(c);
	return rsp.res;
}

/*
 * Copy n bytes starting at byte offset off on the device into a,
 * going through the cache.  The range must lie within a single
 * block.  Returns n on success, or -1 when n is negative, the
 * range crosses a block boundary, or the block cannot be read.
 */
int
cread(void *a, int n, uvlong off)
{
	uchar *p;
	uvlong blk;
	ulong boff;

	/*
	 * Reject a negative count before it meets the unsigned
	 * arithmetic below: converted to ulong it can wrap around,
	 * slip past the crossing test and reach memmove as a huge
	 * length.
	 */
	if(n < 0) {
		fprint(2, "invalid length %d\n", n);
		return -1;
	}
	blk = off / BlkSize;
	boff = off % BlkSize;
	if(boff + n > BlkSize) {
		fprint(2, "invalid block crossing\n");
		return -1;
	}
	p = cbread(blk);
	if(p == nil)
		return -1;
	memmove(a, p + boff, n);
	brelease(blk);
	return n;
}

/*
 * Copy n bytes from a to byte offset off on the device, going
 * through the cache, and mark the block dirty.  The range must
 * lie within a single block, and block 0 may not be written.
 * Returns n on success, or -1 when n is negative, the target is
 * block 0, the range crosses a block boundary, or the block
 * cannot be read.
 */
int
cwrite(void *a, int n, uvlong off)
{
	uchar *p;
	uvlong blk;
	ulong boff;

	/*
	 * Reject a negative count before it meets the unsigned
	 * arithmetic below: converted to ulong it can wrap around,
	 * slip past the crossing test and reach memmove as a huge
	 * length.
	 */
	if(n < 0) {
		fprint(2, "invalid length %d\n", n);
		return -1;
	}
	blk = off / BlkSize;
	if(blk == 0)
		return -1;
	boff = off % BlkSize;
	if(boff + n > BlkSize) {
		fprint(2, "invalid block crossing\n");
		return -1;
	}
	p = cbread(blk);
	if(p == nil)
		return -1;
	memmove(p + boff, a, n);
	cbwrite(blk);
	brelease(blk);
	return n;
}

/*
 * Request a write-back pass and wait for it: set syncing to 2,
 * interrupt the timer, and yield until syncing is 0 again.
 */
void
csync(void)
{
	syncing = 2;
	threadint(timertid);
	for(;;) {
		if(syncing == 0)
			break;
		yield();
	}
}

static char cstatbuf[1024];

char  *
prcstat(void)
{
	CBlock *cb;
	char *p, *e;
	int ldirty, i, nhash;
	int refhist[10];
int saidit = 0;

	ldirty = 0;
	p = cstatbuf;
	e = p + nelem(cstatbuf);
	memset(refhist, 0, 10 * sizeof(int));
	p = seprint(p, e, "Cache stats:\n");
	p = seprint(p, e, "ncache: %ld\n", ncache.ref);
	p = seprint(p, e, "nwlist: %ld\n", nwlist.ref);
	p = seprint(p, e, "ndirty: %ld\n", ndirty.ref);
	for(cb = chd; cb; cb = cb->next) {
		if(cb->flags & CDirty)
{
if(!saidit) {p = seprint(p, e, "dirty block ref:%ld blk:%ulld\n", cb->ref.ref, cb->blkno); ++saidit;}
			++ldirty;
}
		if(cb->ref.ref < 0) {
			p = seprint(p, e, "bad ref count: %ld on block %ulld; setting to 0\n", cb->ref.ref, cb->blkno);
			cb->ref.ref = 0;
		}
		else if(cb->ref.ref >= 9)
			++refhist[9];
		else
			++refhist[cb->ref.ref];
		if(cb->ref.ref > 0)
			p = seprint(p, e, "In use block: %ulld flags %ux\n", cb->blkno, cb->flags);
	}
	nhash = 0;
	for(i = 0; i < Ncht; ++i) {
		for(cb = ht[i]; cb; cb = cb->htnext)
			++nhash;
	}
	p = seprint(p, e, "nhash: %d\n", nhash);
	p = seprint(p, e, "ldirty: %d\n", ldirty);
	p = seprint(p, e, "nread: %ulld\n", nread);
	p = seprint(p, e, "nwrite: %ulld\n", nwrite);
	p = seprint(p, e, "nmiss: %ulld\n", nmiss);
	p = seprint(p, e, "hit rate: %uld%%\n", (hrate + 5000) / 10000);
	p = seprint(p, e, "ref count histogram:\n");
	p = seprint(p, e, "   0    1    2    3    4    5    6    7    8   >8\n");
	for(i = 0; i < 10; ++i)
		p = seprint(p, e, "%4d ", refhist[i]);
	seprint(p, e, "\n");
	return cstatbuf;
}

/*
 * Empty the cache of unreferenced blocks.  Done directly in the
 * cache's own proc; from any other proc the request goes through
 * cachechan and the call returns once the handler has replied.
 */
void
resetcache(void)
{
	Cacheresp rsp;
	Cachereq r;

	if(threadpid(threadid()) != mypid) {
		r.req = CReset;
		r.resp = chancreate(sizeof(Cacheresp), 0);
		send(cachechan, &r);
		recv(r.resp, &rsp);
		chanfree(r.resp);
		return;
	}
	_resetcache();
}

/*
 * Bell Labs OSI certified Powered by Plan 9
 *
 * (Return to Plan 9 Home Page)
 *
 * Copyright © 2021 Plan 9 Foundation. All Rights Reserved.
 * Comments to webmaster@9p.io.
 */