setClass("DataManager",
representation(
"thread",
data="threadLock",
threads="threadLock",
notify="threadLock"
)
)
Each entry in the notify table has two elements, expression and thread.
The idea is that each element of expression is
evaluated by the thread in the corresponding element of
thread when the relevant object is added to the
data table. This is a form of broadcasting to
interested
parties rather than having them poll for the existence of an object.
DataManager <- function(what,
notify,
start = T,
priority=Thread.minPriority, attributes=threadAttributes())
{
# create a DataManager object - this takes care of the necessary
# fields such as the threadLocks used for the pending and data table.
dmgr = new("DataManager")
# create the datamanager thread that handles the initialization
# of the objects in the table. This differs from the Java
# implementation.
if(!is.null(notify)) {
if(is.null(names(notify)) && length(notify) == length(what))
names(notify) = names(what)
for(i in names(notify)) {
# this is rather crude, but not the point of this example!
# match() returns NA when there is no match, so test with is.na()
if(is.na(match("expression",names(notify[[i]]))))
names(notify[[i]])[[1]] = "expression"
if(is.na(match("thread",names(notify[[i]]))))
notify[[i]][["thread"]] = self()
assign(i, list(expression=notify[[i]], thread=self()), where=dmgr@notify)
}
}
dmgr@thread = thread(Quote(loadObjects()),
start=start,
data=list(init.objects=what,
notify=notify, default.priority=priority))
return(dmgr)
}
The loadObjects() function locks both the threads and data slots of the manager and creates the threads. Note the order in which the locks are acquired and released: they are released in the reverse order of acquisition, to ensure proper nesting and to avoid potential deadlock and race condition problems.
loadObjects <- function(where=self()) {
succeeded = 0
if(exists("init.objects", frame=0)) {
getLock(data)
getLock(threads)
for(i in names(init.objects)) {
# default.priority is in the thread's frame 0
# we ignore the where argument for addDataSet
# as the default is self() and we are calling this
# from within the DataManager thread.
data.set.thread = addDataSet(init.objects[[i]], default.priority,where=where)
if(!is.null(data.set.thread))
assign(i, data.set.thread, where=threads)
}
succeeded = length(threads)
yieldLock(threads)
yieldLock(data)
}
return(succeeded)
}
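The nesting of the two locks can be sketched outside S. The following is a minimal Python analogue, not the proposed S thread API: `data_lock`, `threads_lock`, `pending` and `load_objects` are hypothetical names chosen for illustration. It only shows the discipline described above, where the locks are taken in one fixed order and given back in the reverse order.

```python
import threading

# Hypothetical stand-ins for the manager's data and threads locks.
data_lock = threading.Lock()
threads_lock = threading.Lock()
pending = {}

def load_objects(init_objects):
    """Register every initial object while holding both locks."""
    # Always acquire in the same order: data first, then threads.
    with data_lock:
        with threads_lock:
            for name, builder in init_objects.items():
                pending[name] = builder
            count = len(pending)
        # threads_lock has been released here, before data_lock.
    return count

n = load_objects({"doc0": lambda: "text", "data0": lambda: [1, 2, 3]})
```

If every piece of code that needs both locks takes them in this same order, two threads can never each hold one lock while waiting for the other.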
updateThreads.dataManager <- function(what,manager, stop=T) {
tmp.thread = NULL
getLock(manager@threads)
# I hold the lock so get, exists and remove must still be able to work
if(exists(what,where=manager@threads)) {
if(stop)
tmp.thread = get(what,where=manager@threads)
remove(what,where=manager@threads)
}
yieldLock(manager@threads)
if(stop && !is.null(tmp.thread)) {
# the thread could finish in between evaluating the condition and
# calling cancel. But the Evaluator Manager should take care of
# this when evaluating the internals of the call to cancel.
cancel(tmp.thread)
}
getLock(manager@threads)
n = length(objects(where=manager@threads)) # the number of threads pending
# release the lock before returning; code after return() is never reached
yieldLock(manager@threads)
return(n)
}
updateThreads.collector <- function()
{
getLock(threads)
for(i in objects(where=threads)) {
val = get(i,where=threads)
if(is.null(val) || !isAlive(val))
remove(i, where=threads)
}
yieldLock(threads)
}
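The collector's sweep over the pending table can be illustrated with a short Python analogue. This is a sketch under assumptions, not the S implementation: `pending`, `threads_lock` and `update_threads` are hypothetical names, and Python's `Thread.is_alive()` stands in for `isAlive()`.

```python
import threading

threads_lock = threading.Lock()
pending = {}  # name -> thread building that object (or None)

def update_threads():
    """Drop every entry whose thread is missing or has finished."""
    with threads_lock:
        # Iterate over a copy of the keys so we can delete while looping.
        for name in list(pending):
            worker = pending[name]
            if worker is None or not worker.is_alive():
                del pending[name]
        return len(pending)

stop = threading.Event()
finished = threading.Thread(target=lambda: None)
running = threading.Thread(target=stop.wait)
finished.start()
finished.join()          # this thread is certainly dead now
running.start()          # this one blocks until stop is set
pending.update({"done": finished, "busy": running, "missing": None})
remaining = update_threads()
stop.set()
running.join()
```

Note that the test is applied to the stored thread object, not to the name used as the key.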
addDataSet <- function(obj, name, manager)
{
assign(name, obj, where=manager@threads)
}
addDataSet <- function(obj, name, manager=NULL, priority = manager.priority)
{
# take the expression and modify it so that we can
# assign the object into the thread
getLock(manager@threads)
tmp.thread = thread(obj, priority=priority, data=list(manager = manager))
if(!is.null(tmp.thread))
assign(name, tmp.thread, where=manager@threads)
yieldLock(manager@threads)
# return the new thread so that callers can join it or change its priority
return(tmp.thread)
}
The manages() function reports whether a named object is in manager@data, using the test on that object provided by exists().
manages <- function(name, manager) {
return(exists(name, where=manager@data))
}
Note that here we use only the implicit locking of the database in the call to exists(). Locking is used explicitly in updateThreads().
isPending <- function(name, manager)
{
updateThreads() # do some garbage collection first
return(exists(name, where=manager@threads))
}
getDataSet <- function(name, manager)
{
ans = NULL
getLock(manager@data)
if(exists(name,where=manager@data))
ans = get(name, where=manager@data)
yieldLock(manager@data)
return(ans)
}
Note that we use a lock-condition where we could also use a call to join(). The latter approach may be better since we don't have to evaluate the condition each time a thread modifies the pending list. We use the lock-condition more as an example.
If thread A knows that a manager thread has already spawned the constructor thread for an object named "obj", it could monitor the manager's data threadLock object directly and block its own execution until that object was in the table. For example,
addDataSet("obj", manager=dmgr, how=Quote(createObject("obj")))
..
..
getLock(dmgr@data, condition = Quote(exists("obj", where=dmgr@data)))
# now it is there and we hold the lock
obj = get("obj", where=dmgr@data)
yieldLock(dmgr@data)
Note that this thread exploits knowledge of the structure of a
DataManager object rather than using
methods to get access to the slots.
In which thread are the conditions in a getLock call
evaluated? What is the name space?
I believe that the expression should be evaluated in the
current state of the thread which calls getLock.
The evaluator manager takes care of evaluating
the expression in that context.
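For comparison, Python's standard `threading.Condition` behaves in the way suggested here: the predicate given to `wait_for()` is evaluated in the waiting thread, with the lock held. The sketch below is an analogue of the lock-condition idiom, not the S API; `data_cond`, `data_table`, `insert_data_set` and `wait_for_data_set` are hypothetical names.

```python
import threading

data_cond = threading.Condition()
data_table = {}

def insert_data_set(name, obj):
    """Add an object to the table and wake every waiting thread."""
    with data_cond:
        data_table[name] = obj
        data_cond.notify_all()

def wait_for_data_set(name, timeout=5.0):
    """Block until name is in the table, then return the object."""
    with data_cond:
        # The predicate runs in this (the calling) thread, lock held.
        present = data_cond.wait_for(lambda: name in data_table,
                                     timeout=timeout)
        if not present:
            return None
        # Still holding the lock here, so the lookup is safe.
        return data_table[name]

builder = threading.Thread(target=insert_data_set, args=("obj", [1, 2, 3]))
builder.start()
value = wait_for_data_set("obj")
builder.join()
```

The waiter re-tests the predicate each time it is woken, so it works whether the builder finishes before or after the wait begins.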
getDataSet <- function(name, manager, how=NULL, priority = manager@priority)
{
obj = getDataSet(name, manager)
if(!is.null(obj))
return(obj)
# so not in the table yet.
# so look in the list of pending threads
pending = F
getLock(manager@threads)
if(exists(name, where=manager@threads)) {
pending = T
if(!missing(priority)) {
tmp.thread = get(name,where=manager@threads)
setPriority(tmp.thread, priority)
}
}
yieldLock(manager@threads)
if(pending) {
# could join on this thread or we can wait for the
# pending thread list to be changed and this thread to
# be removed. Joining is probably better since we don't
# have to evaluate the condition each time a thread modifies
# the pending list. However, if we join, we must do so
# outside of the lock or no other thread can modify the threads list
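# wait until the thread is no longer in the pending list
getLock(manager@threads,condition= Quote(!exists(name,where=manager@threads)))
# the condition held, so we now own the lock and must release it
yieldLock(manager@threads)
return(getDataSet(name,manager))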
}
# so not pending and not in the table, so make it.
getLock(manager@threads)
tmp.thread = addDataSet(how,name, manager)
assign(name, tmp.thread, where=manager@threads)
yieldLock(manager@threads)
join(tmp.thread) # wait for the thread to finish.
sendTask(Quote(updateThreads()), thread=manager@thread)
return(getDataSet(name,manager))
}
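The comment about joining outside the lock is the crux of this function, and a small Python analogue may make it concrete. This is a sketch under assumptions rather than a translation: `lock`, `data_table`, `pending`, `_build` and `get_data_set` are hypothetical names, and a single lock guards both tables for brevity.

```python
import threading

lock = threading.Lock()
data_table = {}   # finished objects
pending = {}      # name -> thread still building the object

def _build(name, how):
    value = how()                  # construct without holding the lock
    with lock:
        data_table[name] = value
        pending.pop(name, None)    # no longer pending

def get_data_set(name, how=None):
    with lock:
        if name in data_table:
            return data_table[name]
        worker = pending.get(name)
        if worker is None:
            if how is None:
                return None
            worker = threading.Thread(target=_build, args=(name, how))
            pending[name] = worker
            worker.start()
    # Join outside the lock: the builder needs the lock to publish
    # its result, so joining while holding it would deadlock.
    worker.join()
    with lock:
        return data_table.get(name)

calls = []
def make():
    calls.append(1)
    return "built"

result = get_data_set("obj", how=make)
again = get_data_set("obj", how=make)
```

The second call finds the object in the table and never runs the constructor again.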
removeDataSet <- function(name, manager)
{
ans = T
# handle the case in which there is a thread already
# running
if(isPending(name, manager)) {
ans = updateThreads(name, manager, stop=T) # have to make updateThreads return a logical
# indicating success or failure
}
# now check the table
getLock(manager@data)
remove(name, where=manager@data)
yieldLock(manager@data)
return(ans)
}
A lock is obtained to ensure other threads don't modify the table. Again, we don't want the locks around the database functions, as these functions take care of locking themselves and making their operations atomic!
length <- function(manager)
{
getLock(manager@data)
n = length(objects(where=manager@data))
yieldLock(manager@data)
return(n)
}
Again, we don't want the locks around the database functions, as these functions take care of locking themselves and making their operations atomic!
names <- function(manager)
{
getLock(manager@data) # lock the table being read, not the manager's thread
nms = objects(where=manager@data)
yieldLock(manager@data)
return(nms)
}
status <- function(manager) {
getLock(manager@data)
getLock(manager@threads)
ans = list(num.objects=length(manager@data), num.threads=length(manager@threads))
yieldLock(manager@threads)
yieldLock(manager@data)
return(ans)
}
Note that the notification performed in this function will come from the thread performing the construction rather than the Manager itself. (Next we need methods for faking messages!;-))
insertDataSet <- function(obj, name, manager)
{
# don't want these locks
# any thread waiting on a condition in this threadLock object (manager@data)
# will have the condition re-evaluated when the lock is yielded
getLock(manager@data)
assign(name, obj, where=manager@data)
yieldLock(manager@data)
# ensure that this thread is removed from list of pending threads
updateThreads(name, manager, stop=F)
# now make sure the asynchronous notifications are handled
getLock(manager@notify)
# match() returns NA when there is no entry for this name
if(!is.na(match(name, names(manager@notify)))) {
destination.threads = manager@notify[[name]]
for(i in destination.threads) {
# how do we put the type of action in - e.g. ADDED_OBJECT, REMOVE_OBJECT
sendTask(i$expression, i$thread, wait=F)
}
}
yieldLock(manager@notify)
return(invisible(T)) # not needed because this function is to be
# called as the last action of a background thread
}
A kill() method is not needed in the S model as we
can send a cancel to that thread quite easily.
We can also use that approach in the
Java implementation.
However, we can do it more gracefully
by sending a
request to the manager's thread to have it terminate itself
with the relevant exit handlers and user supplied expressions invoked.
(Java of course has the finally construct.)
run methodsendTask().
However, this signal has complete semantic information
in that it can be an expression. (Security issues here
and hence we have access! Think CGI scripts and Perl's eval
therein.)
A window is displayed which contains 2 buttons,
an empty list and a text widget.
(In the adjacent figure, we also have a status completion
bar, but we ignore that here.)
The two buttons allow the user to quit the browser
and to determine the status of the manager and provide
a display of the number of objects still being
constructed/pending.
The list is used to display the names of the objects of interest.
As these objects are constructed and become available to the browser,
the names are added to the list.
Double clicking on an entry in the list displays the
object's contents in the text area.
So this behaves like an interactive
print in S.
The sample view shows the contents of the file
doc2.html in the text area.
We gloss over the details of how the window and widgets
are created. (See GUI development
for these details.)
We assume this is done via a call to the function
make.gui().
So our main thread, A say, might look like the following:
1 base = "file:///home/duncan/RESEARCH/GUI/Examples/Java/Data"
2
3 docs = paste(0:4,".html",sep="")
4 data = as.character(0:4)
5
6 docs = paste(paste(base,"doc",sep=""),docs,sep="/")
7 data = paste(paste(base,"data",sep=""),data,sep="/")
8
9 how = vector("list",length(docs)+length(data))
10 ctr = 1
11 for(i in docs) {
12 how[[ctr]] = substitute(readDoc(d),list(d=i))
13 ctr = ctr+1
14 }
15
16 for(i in data) {
17 how[[ctr]] = substitute(readData(d),list(d=i))
18 ctr = ctr+1
19 }
20
21 notify = vector("list",length(how))
22 names(notify) = names(how)
23
24 for(i in names(notify))
25 notify[[i]] = list(expression=
26 substitute(addItem(xlist,what),list(what=i)), thread = self())
27
28 # now create the data manager.
29 dmgr = DataManager(how,notify)
30
31
32 guiTree = make.gui()
33 xlist = getList(guiTree)
34
35
36 show(guiTree)
37
38 # thread sleeps waiting for tasks from
39 # the event loop processing user events
Lines 1 through 26 create the relevant objects
for the Data Manager. Other convenience routines could be created that
would hide these details by using knowledge of the context
and of the objects being created.
The important things to note are:
The expressions in how call readDoc
and readData to read in documents and data sets
respectively. We could also create images, other widget
hierarchies, etc.
The notify object is created so that thread A is
sent the expression
addItem(list, name)
where name is the character string representing the
object just created. This expression is evaluated in thread A
and the result is that the name is added to the list and
available to the user.
If we want more than one thread to be notified, we could
have made the thread field in the
notify element a list and enumerated the target
threads.
The usual S facilities would be used to ensure that the lengths
of the expression and thread lists
matched by replicating elements as needed.
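The notification scheme can be sketched in Python as follows. This is an analogue under assumptions, not the S sendTask() mechanism: a `queue.Queue` stands in for the task inbox of thread A, and `notify`, `main_tasks`, `add_item` and `construct` are hypothetical names. Each notify entry is a list, so several targets could be listed for one object.

```python
import queue
import threading

main_tasks = queue.Queue()   # inbox of the main thread (thread A)
name_list = []               # stands in for the GUI list widget

def add_item(name):
    name_list.append(name)

# One list of {task, inbox} pairs per object name.
notify = {
    "doc0": [{"task": add_item, "inbox": main_tasks}],
    "data0": [{"task": add_item, "inbox": main_tasks}],
}

def construct(name):
    # ... the object itself would be built here ...
    # The constructing thread, not the manager, sends the notifications.
    for entry in notify.get(name, []):
        entry["inbox"].put((entry["task"], name))

workers = [threading.Thread(target=construct, args=(n,)) for n in notify]
for w in workers:
    w.start()
for w in workers:
    w.join()

# Thread A drains its inbox and evaluates each task itself.
while not main_tasks.empty():
    task, arg = main_tasks.get()
    task(arg)
```

Because the tasks are evaluated by the receiving thread, the list is only ever modified from one thread and needs no lock of its own.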
32 guiTree = make.gui()
33 xlist = getList(guiTree)
34
35 thread(substitute(updateList(manager),list(manager=dmgr)),data=list(xlist=xlist))
36 show(guiTree)
The function updateList() looks something like the
following:
updateList <- function(manager) {
# first, insert all the elements already in the data table
# that were created before this thread was started.
getLock(manager@data)
items = names(manager)
for(i in items)
addItem(xlist, i)
yieldLock(manager@data)
while(T) {
# always be notified if anything happens
getLock(manager@data, condition=T)
# now pick out the names added since we last looked
current = names(manager)
new.items = current[match(current,items,nomatch=0)==0]
for(i in new.items)
addItem(xlist, i)
items = current
# release the lock so that other threads can modify the table
yieldLock(manager@data)
}
return(invisible(NULL))
}
One might think that we could just track changes in the length of the data table. However, one thread could add a data set to the table and a second thread remove a different one. We will be notified of both, but we may not obtain the lock when we are woken up. Being notified only registers us in the competition for the lock; it does not guarantee that we win it. So it is possible that the length would be the same when we get to evaluate the expression, but the contents would differ. Such effects never arise in single-threaded sequential code but are common in threaded applications. In general, one has to consider all possible thread interactions to validate an algorithm. This is what makes developing threaded code complicated and time consuming.
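A tiny Python illustration of the point, using made-up table contents: one name is removed and another added between two looks at the table, so the length is unchanged while the set of names is not. The helper `new_names` is hypothetical.

```python
def new_names(previous, current):
    """Return the names in current that were not in previous."""
    seen = set(previous)
    return [n for n in current if n not in seen]

before = ["doc0", "doc1", "data0"]
after = ["doc0", "data0", "data1"]   # doc1 removed, data1 added

same_length = len(before) == len(after)
added = new_names(before, after)
```

Comparing lengths would report no change here, whereas comparing names finds the newly added entry.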
DataManager class is a threadGroup object.
Should use ThreadGroup for the thread slot.