Plan 9 from Bell Labs’s /usr/web/sources/patch/applied/nventi/buildindex.c.new

Copyright © 2021 Plan 9 Foundation.
Distributed under the MIT License.
Download the Plan 9 distribution.


/*
 * Rebuild the index from scratch, in place.
 */
#include "stdinc.h"
#include "dat.h"
#include "fns.h"

enum
{
	MinBufSize = 64*1024,
	MaxBufSize = 4*1024*1024,
};

int		dumb;
int		errors;
char		**isect;
int		nisect;
int		bloom;
int		zero;

u32int	isectmem;
u64int	totalbuckets;
u64int	totalclumps;
Channel	*arenadonechan;
Channel	*isectdonechan;
Index	*ix;

u64int	arenaentries;
u64int	skipentries;
u64int	indexentries;

static int shouldprocess(ISect*);
static void	isectproc(void*);
static void	arenapartproc(void*);

void
usage(void)
{
	fprint(2, "usage: buildindex [-b] [-i isect]... [-M imem] venti.conf\n");
	threadexitsall("usage");
}

void
threadmain(int argc, char *argv[])
{
	int fd, i, napart, nfinish, maxdisks;
	u32int bcmem, imem;
	Config conf;
	Part *p;
	
	maxdisks = 100000;
	ventifmtinstall();
	imem = 256*1024*1024;
	ARGBEGIN{
	case 'b':
		bloom = 1;
		break;
	case 'd':	/* debugging - make sure to run all 3 passes */
		dumb = 1;
		break;
	case 'i':
		isect = vtrealloc(isect, (nisect+1)*sizeof(isect[0]));
		isect[nisect++] = EARGF(usage());
		break;
	case 'M':
		imem = unittoull(EARGF(usage()));
		break;
	case 'm':	/* temporary - might go away */
		maxdisks = atoi(EARGF(usage()));
		break;
	default:
		usage();
		break;
	}ARGEND

	if(argc != 1)
		usage();

	if(initventi(argv[0], &conf) < 0)
		sysfatal("can't init venti: %r");
	ix = mainindex;
	if(nisect == 0 && ix->bloom)
		bloom = 1;
	if(bloom && ix->bloom && resetbloom(ix->bloom) < 0)
		sysfatal("loadbloom: %r");
	if(bloom && !ix->bloom)
		sysfatal("-b specified but no bloom filter");
	if(!bloom)
		ix->bloom = nil;
	isectmem = imem/ix->nsects;

	/*
	 * safety first - only need read access to arenas
	 */
	p = nil;
	for(i=0; i<ix->narenas; i++){
		if(ix->arenas[i]->part != p){
			p = ix->arenas[i]->part;
			if((fd = open(p->filename, OREAD)) < 0)
				sysfatal("cannot reopen %s: %r", p->filename);
			dup(fd, p->fd);
			close(fd);
		}
	}
	
	/*
	 * need a block for every arena
	 */
	bcmem = maxblocksize * (mainindex->narenas + 16);
	if(0) fprint(2, "initialize %d bytes of disk block cache\n", bcmem);
	initdcache(bcmem);
	
	totalclumps = 0;
	for(i=0; i<ix->narenas; i++)
		totalclumps += ix->arenas[i]->diskstats.clumps;
	
	totalbuckets = 0;
	for(i=0; i<ix->nsects; i++)
		totalbuckets += ix->sects[i]->blocks;
	fprint(2, "%,lld clumps, %,lld buckets\n", totalclumps, totalbuckets);

	/* start index procs */
	fprint(2, "%T read index\n");
	isectdonechan = chancreate(sizeof(void*), 0);
	for(i=0; i<ix->nsects; i++){
		if(shouldprocess(ix->sects[i])){
			ix->sects[i]->writechan = chancreate(sizeof(IEntry), 0);
			vtproc(isectproc, ix->sects[i]);
		}
	}
	
	for(i=0; i<nisect; i++)
		if(isect[i])
			fprint(2, "warning: did not find index section %s\n", isect[i]);

	/* start arena procs */
	p = nil;
	napart = 0;
	nfinish = 0;
	arenadonechan = chancreate(sizeof(void*), 0);
	for(i=0; i<ix->narenas; i++){
		if(ix->arenas[i]->part != p){
			p = ix->arenas[i]->part;
			vtproc(arenapartproc, p);
			if(++napart >= maxdisks){
				recvp(arenadonechan);
				nfinish++;
			}
		}
	}

	/* wait for arena procs to finish */
	for(nfinish=0; nfinish<napart; nfinish++)
		recvp(arenadonechan);

	/* tell index procs to finish */
	for(i=0; i<ix->nsects; i++)
		if(ix->sects[i]->writechan)
			send(ix->sects[i]->writechan, nil);

	/* wait for index procs to finish */
	for(i=0; i<ix->nsects; i++)
		if(ix->sects[i]->writechan)
			recvp(isectdonechan);

	if(ix->bloom && writebloom(ix->bloom) < 0)
		fprint(2, "writing bloom filter: %r\n");

	fprint(2, "%T done arenaentries=%,lld indexed=%,lld (nskip=%,lld)\n", 
		arenaentries, indexentries, skipentries);
	threadexitsall(nil);
}

static int
shouldprocess(ISect *is)
{
	int i;
	
	if(nisect == 0)
		return 1;

	for(i=0; i<nisect; i++)
		if(isect[i] && strcmp(isect[i], is->name) == 0){
			isect[i] = nil;
			return 1;
		}
	return 0;
}

static void
add(u64int *a, u64int n)
{
	static Lock l;
	
	lock(&l);
	*a += n;
	unlock(&l);
}

/*
 * Read through an arena partition and send each of its IEntries
 * to the appropriate index section.  When finished, send on
 * arenadonechan.
 */
enum
{
	ClumpChunks = 32*1024,
};
static void
arenapartproc(void *v)
{
	int i, j, n, nskip, x;
	u32int clump;
	u64int addr, tot;
	Arena *a;
	ClumpInfo *ci, *cis;
	IEntry ie;
	Part *p;
	
	p = v;
	threadsetname("arenaproc %s", p->name);

	nskip = 0;
	tot = 0;
	cis = MKN(ClumpInfo, ClumpChunks);
	for(i=0; i<ix->narenas; i++){
		a = ix->arenas[i];
		if(a->part != p)
			continue;
		if(a->memstats.clumps)
			fprint(2, "%T arena %s: %d entries\n", 
				a->name, a->memstats.clumps);
		/*
		 * Running the loop backwards accesses the 
		 * clump info blocks forwards, since they are
		 * stored in reverse order at the end of the arena.
		 * This speeds things slightly.
		 */
		addr = ix->amap[i].start + a->memstats.used;
		for(clump=a->memstats.clumps; clump > 0; clump-=n){
			n = ClumpChunks;
			if(n > clump)
				n = clump;
			if(readclumpinfos(a, clump-n, cis, n) != n){
				fprint(2, "%T arena %s: directory read: %r\n", a->name);
				errors = 1;
				break;
			}
			for(j=n-1; j>=0; j--){
				ci = &cis[j];
				ie.ia.type = ci->type;
				ie.ia.size = ci->uncsize;
				addr -= ci->size + ClumpSize;
				ie.ia.addr = addr;
				ie.ia.blocks = (ci->size + ClumpSize + (1<<ABlockLog)-1) >> ABlockLog;
				scorecp(ie.score, ci->score);
				if(ci->type == VtCorruptType)
					nskip++;
				else{
					tot++;
					x = indexsect(ix, ie.score);
					assert(0 <= x && x < ix->nsects);
					if(ix->sects[x]->writechan)
						send(ix->sects[x]->writechan, &ie);
					if(ix->bloom)
						markbloomfilter(ix->bloom, ie.score);
				}
			}
		}
		if(addr != ix->amap[i].start)
			fprint(2, "%T arena %s: clump miscalculation %lld != %lld\n", a->name, addr, ix->amap[i].start);
	}
	add(&arenaentries, tot);
	add(&skipentries, nskip);
	sendp(arenadonechan, p);
}

/*
 * Convert score into relative bucket number in isect.
 * Can pass a packed ientry instead of score - score is first.
 */
static u32int
score2bucket(ISect *is, uchar *score)
{
	u32int b;
	
	b = hashbits(score, 32)/ix->div;
	if(b < is->start || b >= is->stop){
		fprint(2, "score2bucket: score=%V div=%d b=%ud start=%ud stop=%ud\n",
			score, ix->div, b, is->start, is->stop);
	}
	assert(is->start <= b && b < is->stop);
	return b - is->start;
}

/*
 * Convert offset in index section to bucket number.
 */
static u32int
offset2bucket(ISect *is, u64int offset)
{
	u32int b;
	
	assert(is->blockbase <= offset);
	offset -= is->blockbase;
	b = offset/is->blocksize;
	assert(b < is->stop-is->start);
	return b;
}

/*
 * Convert bucket number to offset.
 */
static u64int
bucket2offset(ISect *is, u32int b)
{
	assert(b <= is->stop-is->start);
	return is->blockbase + (u64int)b*is->blocksize;
}

/* 
 * IEntry buffers to hold initial round of spraying.
 */
typedef struct Buf Buf;
struct Buf
{
	Part *part;			/* partition being written */
	uchar *bp;		/* current block */
	uchar *ep;		/* end of block */
	uchar *wp;		/* write position in block */
	u64int boffset;		/* start offset */
	u64int woffset;		/* next write offset */
	u64int eoffset;		/* end offset */
	u32int nentry;		/* number of entries written */
};

static void
bflush(Buf *buf)
{
	u32int bufsize;
	
	if(buf->woffset >= buf->eoffset)
		sysfatal("buf index chunk overflow - need bigger index");
	bufsize = buf->ep - buf->bp;
	if(writepart(buf->part, buf->woffset, buf->bp, bufsize) < 0){
		fprint(2, "write %s: %r\n", buf->part->name);
		errors = 1;
	}
	buf->woffset += bufsize;
	memset(buf->bp, 0, bufsize);
	buf->wp = buf->bp;
}

static void
bwrite(Buf *buf, IEntry *ie)
{
	if(buf->wp+IEntrySize > buf->ep)
		bflush(buf);
	assert(buf->bp <= buf->wp && buf->wp < buf->ep);
	packientry(ie, buf->wp);
	buf->wp += IEntrySize;
	assert(buf->bp <= buf->wp && buf->wp <= buf->ep);
	buf->nentry++;
}

/*
 * Minibuffer.  In-memory data structure holds our place
 * in the buffer but has no block data.  We are writing and
 * reading the minibuffers at the same time.  (Careful!)
 */
typedef struct Minibuf Minibuf;
struct Minibuf
{
	u64int boffset;		/* start offset */
	u64int roffset;		/* read offset */
	u64int woffset;		/* write offset */
	u64int eoffset;		/* end offset */
	u32int nentry;		/* # entries left to read */
	u32int nwentry;	/* # entries written */
};

/*
 * Index entry pool.  Used when trying to shuffle around 
 * the entries in a big buffer into the corresponding M minibuffers.
 * Sized to hold M*EntriesPerBlock entries, so that there will always
 * either be room in the pool for another block worth of entries
 * or there will be an entire block worth of sorted entries to 
 * write out.
 */
typedef struct IEntryLink IEntryLink;
typedef struct IPool IPool;

struct IEntryLink
{
	uchar ie[IEntrySize];		/* raw IEntry */
	IEntryLink *next;		/* next in chain */
};

struct IPool
{
	ISect *isect;
	u32int buck0;			/* first bucket in pool */
	u32int mbufbuckets;	/* buckets per minibuf */
	IEntryLink *entry;		/* all IEntryLinks */
	u32int nentry;			/* # of IEntryLinks */
	IEntryLink *free;		/* free list */
	u32int nfree;			/* # on free list */
	Minibuf *mbuf;			/* all minibufs */
	u32int nmbuf;			/* # of minibufs */
	IEntryLink **mlist;		/* lists for each minibuf */
	u32int *mcount;		/* # on each mlist[i] */
	u32int bufsize;			/* block buffer size */
	uchar *rbuf;			/* read buffer */
	uchar *wbuf;			/* write buffer */
	u32int epbuf;			/* entries per block buffer */
};

/*
static int
countsokay(IPool *p)
{
	int i;
	u64int n;
	
	n = 0;
	for(i=0; i<p->nmbuf; i++)
		n += p->mcount[i];
	n += p->nfree;
	if(n != p->nentry){
		print("free %ud:", p->nfree);
		for(i=0; i<p->nmbuf; i++)
			print(" %ud", p->mcount[i]);
		print(" = %lld nentry: %ud\n", n, p->nentry);
	}
	return n == p->nentry;
}
*/

static IPool*
mkipool(ISect *isect, Minibuf *mbuf, u32int nmbuf, 
	u32int mbufbuckets, u32int bufsize)
{
	u32int i, nentry;
	uchar *data;
	IPool *p;
	IEntryLink *l;
	
	nentry = (nmbuf+1)*bufsize / IEntrySize;
	p = ezmalloc(sizeof(IPool)
		+nentry*sizeof(IEntry)
		+nmbuf*sizeof(IEntryLink*)
		+nmbuf*sizeof(u32int)
		+3*bufsize);
	
	p->isect = isect;
	p->mbufbuckets = mbufbuckets;
	p->bufsize = bufsize;
	p->entry = (IEntryLink*)(p+1);
	p->nentry = nentry;
	p->mlist = (IEntryLink**)(p->entry+nentry);
	p->mcount = (u32int*)(p->mlist+nmbuf);
	p->nmbuf = nmbuf;
	p->mbuf = mbuf;
	data = (uchar*)(p->mcount+nmbuf);
	data += bufsize - (uintptr)data%bufsize;
	p->rbuf = data;
	p->wbuf = data+bufsize;
	p->epbuf = bufsize/IEntrySize;

	for(i=0; i<p->nentry; i++){
		l = &p->entry[i];
		l->next = p->free;
		p->free = l;
		p->nfree++;
	}
	return p;
}

/* 
 * Add the index entry ie to the pool p.
 * Caller must know there is room.
 */
static void
ipoolinsert(IPool *p, uchar *ie)
{
	u32int buck, x;
	IEntryLink *l;

	assert(p->free != nil);

	buck = score2bucket(p->isect, ie);
	x = (buck-p->buck0) / p->mbufbuckets;
	if(x >= p->nmbuf){
		fprint(2, "buck=%ud mbufbucket=%ud x=%ud\n",
			buck, p->mbufbuckets, x);
	}
	assert(x < p->nmbuf);

	l = p->free;
	p->free = l->next;
	p->nfree--;
	memmove(l->ie, ie, IEntrySize);
	l->next = p->mlist[x];
	p->mlist[x] = l;
	p->mcount[x]++;
}	

/*
 * Pull out a block containing as many
 * entries as possible for minibuffer x.
 */
static u32int
ipoolgetbuf(IPool *p, u32int x)
{
	uchar *bp, *ep, *wp;
	IEntryLink *l;
	u32int n;
	
	bp = p->wbuf;
	ep = p->wbuf + p->bufsize;
	n = 0;
	assert(x < p->nmbuf);
	for(wp=bp; wp+IEntrySize<=ep && p->mlist[x]; wp+=IEntrySize){
		l = p->mlist[x];
		p->mlist[x] = l->next;
		p->mcount[x]--;
		memmove(wp, l->ie, IEntrySize);
		l->next = p->free;
		p->free = l;
		p->nfree++;
		n++;
	}
	memset(wp, 0, ep-wp);
	return n;
}

/*
 * Read a block worth of entries from the minibuf
 * into the pool.  Caller must know there is room.
 */
static void
ipoolloadblock(IPool *p, Minibuf *mb)
{
	u32int i, n;
	
	assert(mb->nentry > 0);
	assert(mb->roffset >= mb->woffset);
	assert(mb->roffset < mb->eoffset);

	n = p->bufsize/IEntrySize;
	if(n > mb->nentry)
		n = mb->nentry;
	if(readpart(p->isect->part, mb->roffset, p->rbuf, p->bufsize) < 0)
		fprint(2, "readpart %s: %r\n", p->isect->part->name);
	else{
		for(i=0; i<n; i++)
			ipoolinsert(p, p->rbuf+i*IEntrySize);
	}
	mb->nentry -= n;
	mb->roffset += p->bufsize;
}

/*
 * Write out a block worth of entries to minibuffer x.
 * If necessary, pick up the data there before overwriting it.
 */
static void
ipoolflush0(IPool *pool, u32int x)
{
	u32int bufsize;
	Minibuf *mb;
	
	mb = pool->mbuf+x;
	bufsize = pool->bufsize;
	mb->nwentry += ipoolgetbuf(pool, x);
	if(mb->nentry > 0 && mb->roffset == mb->woffset){
		assert(pool->nfree >= pool->bufsize/IEntrySize);
		/*
		 * There will be room in the pool -- we just 
		 * removed a block worth.
		 */
		ipoolloadblock(pool, mb);
	}
	if(writepart(pool->isect->part, mb->woffset, pool->wbuf, bufsize) < 0)
		fprint(2, "writepart %s: %r\n", pool->isect->part->name);
	mb->woffset += bufsize;
}

/*
 * Write out some full block of entries.
 * (There must be one -- the pool is almost full!)
 */
static void
ipoolflush1(IPool *pool)
{
	u32int i;

	assert(pool->nfree <= pool->epbuf);

	for(i=0; i<pool->nmbuf; i++){
		if(pool->mcount[i] >= pool->epbuf){
			ipoolflush0(pool, i);
			return;
		}
	}
	/* can't be reached - someone must be full */
	sysfatal("ipoolflush1");
}

/*
 * Flush all the entries in the pool out to disk.
 * Nothing more to read from disk.
 */
static void
ipoolflush(IPool *pool)
{
	u32int i;
	
	for(i=0; i<pool->nmbuf; i++)
		while(pool->mlist[i])
			ipoolflush0(pool, i);
	assert(pool->nfree == pool->nentry);
}

/*
 * Third pass.  Pick up each minibuffer from disk into
 * memory and then write out the buckets.
 */

/*
 * Compare two packed index entries.  
 * Usual ordering except break ties by putting higher
 * index addresses first (assumes have duplicates
 * due to corruption in the lower addresses).
 */
static int
ientrycmpaddr(const void *va, const void *vb)
{
	int i;
	uchar *a, *b;
	
	a = (uchar*)va;
	b = (uchar*)vb;
	i = ientrycmp(a, b);
	if(i)
		return i;
	return -memcmp(a+IEntryAddrOff, b+IEntryAddrOff, 8);
}

static void
zerorange(Part *p, u64int o, u64int e)
{
	static uchar zero[MaxIoSize];
	u32int n;
	
	for(; o<e; o+=n){
		n = sizeof zero;
		if(o+n > e)
			n = e-o;
		if(writepart(p, o, zero, n) < 0)
			fprint(2, "writepart %s: %r\n", p->name);
	}
}

/*
 * Load a minibuffer into memory and write out the 
 * corresponding buckets.
 */
static void
sortminibuffer(ISect *is, Minibuf *mb, uchar *buf, u32int nbuf, u32int bufsize)
{
	uchar *buckdata, *p, *q, *ep;
	u32int b, lastb, memsize, n;
	u64int o;
	IBucket ib;
	Part *part;
	
	part = is->part;
	buckdata = emalloc(is->blocksize);
	
	if(mb->nwentry == 0)
		return;

	/*
	 * read entire buffer.
	 */
	assert(mb->nwentry*IEntrySize <= mb->woffset-mb->boffset);
	assert(mb->woffset-mb->boffset <= nbuf);
	if(readpart(part, mb->boffset, buf, mb->woffset-mb->boffset) < 0){
		fprint(2, "readpart %s: %r\n", part->name);
		errors = 1;
		return;
	}
	assert(*(uint*)buf != 0xa5a5a5a5);
	
	/*
	 * remove fragmentation due to IEntrySize
	 * not evenly dividing Bufsize
	 */
	memsize = (bufsize/IEntrySize)*IEntrySize;
	for(o=mb->boffset, p=q=buf; o<mb->woffset; o+=bufsize){
		memmove(p, q, memsize);
		p += memsize;
		q += bufsize;
	}
	ep = buf + mb->nwentry*IEntrySize;
	assert(ep <= buf+nbuf);

	/* 
	 * sort entries
	 */
	qsort(buf, mb->nwentry, IEntrySize, ientrycmpaddr);

	/*
	 * write buckets out
	 */
	n = 0;
	lastb = offset2bucket(is, mb->boffset);
	for(p=buf; p<ep; p=q){
		b = score2bucket(is, p);
		for(q=p; q<ep && score2bucket(is, q)==b; q+=IEntrySize)
			;
		if(lastb+1 < b && zero)
			zerorange(part, bucket2offset(is, lastb+1), bucket2offset(is, b));
		if(IBucketSize+(q-p) > is->blocksize)
			sysfatal("bucket overflow - make index bigger");
		memmove(buckdata+IBucketSize, p, q-p);
		ib.n = (q-p)/IEntrySize;
		n += ib.n;
		packibucket(&ib, buckdata, is->bucketmagic);
		if(writepart(part, bucket2offset(is, b), buckdata, is->blocksize) < 0)
			fprint(2, "write %s: %r\n", part->name);
		lastb = b;
	}
	if(lastb+1 < is->stop-is->start && zero)
		zerorange(part, bucket2offset(is, lastb+1), bucket2offset(is, is->stop - is->start));

	if(n != mb->nwentry)
		fprint(2, "sortminibuffer bug: n=%ud nwentry=%ud have=%ld\n", n, mb->nwentry, (ep-buf)/IEntrySize);

	free(buckdata);
}

static void
isectproc(void *v)
{
	u32int buck, bufbuckets, bufsize, epbuf, i, j;
	u32int mbufbuckets, n, nbucket, nn, space;
	u32int nbuf, nminibuf, xminiclump, prod;
	u64int blocksize, offset, xclump;
	uchar *data, *p;
	Buf *buf;
	IEntry ie;
	IPool *ipool;
	ISect *is;
	Minibuf *mbuf, *mb;
	
	is = v;
	blocksize = is->blocksize;
	nbucket = is->stop - is->start;

	/*
	 * Three passes:
	 *	pass 1 - write index entries from arenas into 
	 *		large sequential sections on index disk.
	 *		requires nbuf * bufsize memory.
	 *
	 *	pass 2 - split each section into minibufs.
	 *		requires nminibuf * bufsize memory.
	 *
	 *	pass 3 - read each minibuf into memory and
	 *		write buckets out. 
	 *		requires entries/minibuf * IEntrySize memory.
	 * 
	 * The larger we set bufsize the less seeking hurts us.
	 * 
	 * The fewer sections and minibufs we have, the less
	 * seeking hurts us.
	 * 
	 * The fewer sections and minibufs we have, the 
	 * more entries we end up with in each minibuf
	 * at the end.  
	 *
	 * Shoot for using half our memory to hold each
	 * minibuf.  The chance of a random distribution 
	 * getting off by 2x is quite low.  
	 *
	 * Once that is decided, figure out the smallest 
	 * nminibuf and nsection/biggest bufsize we can use
	 * and still fit in the memory constraints.
	 */
	
	/* expected number of clump index entries we'll see */
	xclump = nbucket * (double)totalclumps/totalbuckets;
	
	/* number of clumps we want to see in a minibuf */
	xminiclump = isectmem/2/IEntrySize;
	
	/* total number of minibufs we need */
	prod = (xclump+xminiclump-1) / xminiclump;
	
	/* if possible, skip second pass */
	if(!dumb && prod*MinBufSize < isectmem){
		nbuf = prod;
		nminibuf = 1;
	}else{
		/* otherwise use nsection = sqrt(nmini) */
		for(nbuf=1; nbuf*nbuf<prod; nbuf++)
			;
		if(nbuf*MinBufSize > isectmem)
			sysfatal("not enough memory");
		nminibuf = nbuf;
	}
	/* size buffer to use extra memory */
	bufsize = MinBufSize;
	while(bufsize*2*nbuf <= isectmem && bufsize < MaxBufSize)
		bufsize *= 2;
	data = emalloc(nbuf*bufsize);
	epbuf = bufsize/IEntrySize;
	fprint(2, "%T %s: %,ud buckets, %,ud groups, %,ud minigroups, %,ud buffer\n",
		is->part->name, nbucket, nbuf, nminibuf, bufsize);
	/*
	 * Accept index entries from arena procs.
	 */
	buf = MKNZ(Buf, nbuf);
	p = data;
	offset = is->blockbase;
	bufbuckets = (nbucket+nbuf-1)/nbuf;
	for(i=0; i<nbuf; i++){
		buf[i].part = is->part;
		buf[i].bp = p;
		buf[i].wp = p;
		p += bufsize;
		buf[i].ep = p;
		buf[i].boffset = offset;
		buf[i].woffset = offset;
		if(i < nbuf-1){
			offset += bufbuckets*blocksize;
			buf[i].eoffset = offset;
		}else{
			offset = is->blockbase + nbucket*blocksize;
			buf[i].eoffset = offset;
		}
	}
	assert(p == data+nbuf*bufsize);

	n = 0;
	while(recv(is->writechan, &ie) == 1){
		if(ie.ia.addr == 0)
			break;
		buck = score2bucket(is, ie.score);
		i = buck/bufbuckets;
		assert(i < nbuf);
		bwrite(&buf[i], &ie);
		n++;
	}
	add(&indexentries, n);
	
	nn = 0;
	for(i=0; i<nbuf; i++){
		bflush(&buf[i]);
		buf[i].bp = nil;
		buf[i].ep = nil;
		buf[i].wp = nil;
		nn += buf[i].nentry;
	}
	if(n != nn)
		fprint(2, "isectproc bug: n=%ud nn=%ud\n", n, nn);
		
	free(data);

	fprint(2, "%T %s: reordering\n", is->part->name);
	
	/*
	 * Rearrange entries into minibuffers and then
	 * split each minibuffer into buckets.
	 * The minibuffer must be sized so that it is 
	 * a multiple of blocksize -- ipoolloadblock assumes
	 * that each minibuf starts aligned on a blocksize
	 * boundary.
	 */
	mbuf = MKN(Minibuf, nminibuf);
	mbufbuckets = (bufbuckets+nminibuf-1)/nminibuf;
	while(mbufbuckets*blocksize % bufsize)
		mbufbuckets++;
	for(i=0; i<nbuf; i++){
		/*
		 * Set up descriptors.
		 */
		n = buf[i].nentry;
		nn = 0;
		offset = buf[i].boffset;
		memset(mbuf, 0, nminibuf*sizeof(mbuf[0]));
		for(j=0; j<nminibuf; j++){
			mb = &mbuf[j];
			mb->boffset = offset;
			offset += mbufbuckets*blocksize;
			if(offset > buf[i].eoffset)
				offset = buf[i].eoffset;
			mb->eoffset = offset;
			mb->roffset = mb->boffset;
			mb->woffset = mb->boffset;
			mb->nentry = epbuf * (mb->eoffset - mb->boffset)/bufsize;
			if(mb->nentry > buf[i].nentry)
				mb->nentry = buf[i].nentry;
			buf[i].nentry -= mb->nentry;
			nn += mb->nentry;
		}
		if(n != nn)
			fprint(2, "isectproc bug2: n=%ud nn=%ud (i=%d)\n", n, nn, i);;
		/*
		 * Rearrange.
		 */
		if(!dumb && nminibuf == 1){
			mbuf[0].nwentry = mbuf[0].nentry;
			mbuf[0].woffset = buf[i].woffset;
		}else{
			ipool = mkipool(is, mbuf, nminibuf, mbufbuckets, bufsize);
			ipool->buck0 = bufbuckets*i;
			for(j=0; j<nminibuf; j++){
				mb = &mbuf[j];
				while(mb->nentry > 0){
					if(ipool->nfree < epbuf){
						ipoolflush1(ipool);
						/* ipoolflush1 might change mb->nentry */	
						continue;
					}
					assert(ipool->nfree >= epbuf);
					ipoolloadblock(ipool, mb);
				}
			}
			ipoolflush(ipool);
			nn = 0;
			for(j=0; j<nminibuf; j++)
				nn += mbuf[j].nwentry;
			if(n != nn)
				fprint(2, "isectproc bug3: n=%ud nn=%ud (i=%d)\n", n, nn, i);
			free(ipool);
		}

		/*
		 * Make buckets.
		 */
		space = 0;
		for(j=0; j<nminibuf; j++)
			if(space < mbuf[j].woffset - mbuf[j].boffset)
				space = mbuf[j].woffset - mbuf[j].boffset;

		data = emalloc(space);
		for(j=0; j<nminibuf; j++){
			mb = &mbuf[j];
			sortminibuffer(is, mb, data, space, bufsize);
		}
		free(data);
	}
		
	sendp(isectdonechan, is);
}




Bell Labs OSI certified Powered by Plan 9

(Return to Plan 9 Home Page)

Copyright © 2021 Plan 9 Foundation. All Rights Reserved.
Comments to webmaster@9p.io.