Browse code

DAGTeam class for evaluation on a DAG

git-svn-id: file:///home/git/hedgehog.fhcrc.org/bioconductor/trunk/madman/Rpacks/Streamer@69054 bc3139a8-67e5-0310-9ffc-ced21a209358

Martin Morgan authored on 04/09/2012 21:08:15
Showing 8 changed files

... ...
@@ -1,12 +1,14 @@
1
-setGeneric("stream",
2
-    function(x, ..., verbose=FALSE) standardGeneric("stream"),
3
-    signature="x")
4
-
5 1
 setGeneric("reset", function(x, ...) standardGeneric("reset"))
6 2
 
7 3
 setGeneric("yield", function(x, ...) standardGeneric("yield"))
8 4
 
9 5
 setGeneric("status", function(x, ...) standardGeneric("status"))
10 6
 
7
+setGeneric("Stream",
8
+    function(x, ..., verbose=FALSE) standardGeneric("Stream"),
9
+    signature="x")
10
+
11
+setGeneric("DAGParam", function(x, ...) standardGeneric("DAGParam"))
12
+
11 13
 setGeneric("Team", function(FUN, ..., param) standardGeneric("Team"),
12 14
            signature = "param")
13 15
new file mode 100644
... ...
@@ -0,0 +1,13 @@
1
+.DAGParam <- setRefClass("DAGParam",
2
+    fields = list( dag = "graphNEL" ),
3
+    contains = "Streamer",
4
+    methods = list(
5
+      nodes = function() graph::nodes(dag),
6
+      edges = function() graph::edges(dag),
7
+      inEdges = function(node) graph::inEdges(dag)[[node]],
8
+      show = function() {
9
+          cat("class:", class(.self), "\n")
10
+          cat("DAG nodes:", numNodes(dag), "edges:", numEdges(dag),
11
+              "\n")
12
+          cat("verbose:", verbose, "\n")
13
+      }))
0 14
new file mode 100644
... ...
@@ -0,0 +1,31 @@
1
+setMethod(DAGParam, "missing",
2
+    function(x, ...)
3
+{
4
+    .DAGParam$new(...)
5
+})
6
+
7
+setMethod(DAGParam, "graphNEL",
8
+    function(x, ...)
9
+{
10
+    topology <- suppressWarnings(tsort(x))
11
+    if (nodes(x) > 1L && !length(topology))
12
+        stop("'x' is not a directed acyclic graph")
13
+    .DAGParam$new(dag=x, ...)
14
+})
15
+
16
+setMethod(DAGParam, "matrix",
17
+    function(x, W = NULL, V = NULL, ...)
18
+{
19
+    dag <- ftM2graphNEL(x, W=W, V=V, edgemode="directed")
20
+    DAGParam(dag, ...)
21
+})
22
+
23
+setMethod(DAGParam, "data.frame",
24
+    function(x, ...)
25
+{
26
+    nms <- c("From", "To", "W", "V")
27
+    if (!all(c("From", "To") %in% names(x)))
28
+        stop("'x' must have columns 'From', 'To'")
29
+    ft <- cbind(x$From, x$To)
30
+    DAGParam(ft, W=x$W, V=x$V, ...)
31
+})
0 32
new file mode 100644
... ...
@@ -0,0 +1,41 @@
1
+.DAGTeam <- setRefClass("DAGTeam",
2
+    fields = list(
3
+      tbuf = "list",                    # results, in topological order
4
+      dagParam = "DAGParam",
5
+      teamParam = "ParallelParam"),
6
+    contains = "Consumer",
7
+    methods = list(
8
+      reset = function() {
9
+          callSuper()
10
+          for (elt in tbuf)
11
+              elt$reset()
12
+      },
13
+      yield = function() {
14
+          values <- setNames(vector("list", length(tbuf)), names(tbuf))
15
+          ## input value
16
+          args <- callSuper()
17
+          values[[1]] <- do.call(tbuf[[1]]$FUN, list(args))
18
+          ## subsequent values
19
+          for (nm in names(values)[-1]) {
20
+              args <-values[dagParam$inEdges(nm)]
21
+              values[[nm]] <- do.call(tbuf[[nm]]$FUN, args)
22
+          }
23
+          values[[length(values)]]
24
+      }))
25
+
26
+DAGTeam <-
27
+    function(..., dagParam=DAGParam(), teamParam=MulticoreParam(1L))
28
+{
29
+    consumers <- list(...)
30
+    if (!setequal(names(consumers), dagParam$nodes()))
31
+        stop("'names(...)' and 'dagParam' nodes not equal")
32
+    ok <- sapply(consumers, is, "FunctionConsumer")
33
+    if (!all(ok))
34
+        stop("'...' must be 'FunctionConsumer' instances, failing: '",
35
+             paste(consumers[!ok], collapse="', '"), "'")
36
+
37
+    ## re-order consumers into topological order
38
+    tbuf <- consumers[tsort(dagParam$dag)]
39
+
40
+    .DAGTeam$new(tbuf = tbuf, dagParam=dagParam, teamParam=teamParam)
41
+}
0 42
new file mode 100644
... ...
@@ -0,0 +1,12 @@
1
+plot.DAGParam <-
2
+    function(x, y, ...)
3
+{
4
+    require(Rgraphviz)
5
+    plot(x$dag, ...)
6
+}
7
+
8
+plot.DAGTeam <-
9
+    function(x, y, ...)
10
+{
11
+    plot(x$dagParam, ...)
12
+}
0 13
new file mode 100644
... ...
@@ -0,0 +1,19 @@
1
+test_DAGTeam_serial <- function()
2
+{
3
+    df <- data.frame(From = c("A", "A", "B", "C"),
4
+                     To   = c("B", "C", "D", "D"),
5
+                     stringsAsFactors=FALSE)
6
+    dagParam <- DAGParam(df)
7
+    dteam <- DAGTeam(A=FunctionConsumer(function(y) y),
8
+                     B=FunctionConsumer(function(A) -A),
9
+                     C=FunctionConsumer(function(A) 1 / A),
10
+                     D=FunctionConsumer(function(B, C) B + C),
11
+                     dagParam=dagParam)
12
+    strm <- Stream(Seq(to=10), dteam)
13
+    
14
+    a <- 1:10
15
+    checkIdentical(-a + 1 / a, sapply(strm, c))
16
+    checkIdentical(numeric(0), yield(strm))
17
+    reset(strm)
18
+    checkIdentical(-a + 1 / a, sapply(strm, c))
19
+}
0 20
new file mode 100644
... ...
@@ -0,0 +1,112 @@
1
+\name{DAGTeam-class}
2
+\Rdversion{1.1}
3
+\docType{class}
4
+\alias{DAGTeam-class}
5
+\alias{DAGParam-class}
6
+% *Team-methods
7
+\alias{DAGTeam}
8
+\alias{DAGParam}
9
+\alias{DAGParam,missing-method}
10
+\alias{DAGParam,graphNEL-method}
11
+\alias{DAGParam,matrix-method}
12
+\alias{DAGParam,data.frame-method}
13
+\alias{plot.DAGParam}
14
+\alias{plot.DAGTeam}
15
+
16
+\title{Consumer classes for directed acyclic graph evaluation}
17
+
18
+\description{
19
+
20
+  A \code{\linkS4class{Consumer}} to route incoming tasks through nodes
21
+  connected as a directed acyclic graph.
22
+
23
+}
24
+
25
+\usage{
26
+DAGParam(x, ...)
27
+
28
+DAGTeam(..., dagParam = DAGParam(), teamParam = MulticoreParam(1L))
29
+
30
+\S3method{plot}{DAGTeam}(x, y, ...)
31
+}
32
+
33
+\arguments{
34
+
35
+  \item{x}{A matrix or data.frame with columns \sQuote{From},
36
+    \sQuote{To}, or a \code{graphNEL} object (from the graph package)
37
+    describing a directed acyclic graph.}
38
+
39
+  \item{...}{For \code{DAGTeam}, named \code{\link{FunctionConsumer}}
40
+    instances, one for each node in the graph. The
41
+    \code{FunctionConsumer} corresponding to the first node in the graph
42
+    must accept one argument; remaining \code{FunctionConsumer}
43
+    instances must have as input arguments the names of the nodes from
44
+    which the inputs derive, as in the example below.
45
+
46
+    For \code{DAGParam} when \code{x} is a data.frame or matrix,
47
+    data.frame columns \code{W}, \code{V} or additional arguments
48
+    \code{W}, \code{V} as described in \code{\link{ftM2graphNEL}}.
49
+
50
+  }
51
+
52
+  \item{dagParam}{A \code{DAGParam} instance, with all nodes referenced
53
+    in the graph represented by \code{\link{FunctionConsumer}} instances
54
+    in \code{...}.}
55
+
56
+  \item{teamParam}{A \code{ParallelParam} instance, such as generated by
57
+    \code{MulticoreParam()}. Currently ignored (all calculations are
58
+    performed on a single thread).}
59
+
60
+  \item{y}{Unused.}
61
+
62
+}
63
+
64
+\section{Constructors}{
65
+
66
+  Use \code{DAGParam} and \code{DAGTeam} to construct instances of these
67
+  classes, with \code{ParallelParam} instances created by, e.g.,
68
+  \code{MulticoreParam}.
69
+
70
+}
71
+
72
+\section{Methods}{See \code{\link{Consumer}} Methods.}
73
+
74
+\section{Internal Class Fields and Methods}{
75
+
76
+  Internal fields of this class are are described with, e.g.,
77
+  \code{getRefClass("MulticoreTeam")$fields}.
78
+
79
+  Internal methods of this class are described with
80
+  \code{getRefClass("MulticoreTeam")$methods()} and
81
+  \code{getRefClass("MulticoreTeam")$help()}.
82
+
83
+}
84
+
85
+\author{Martin Morgan \url{mtmorgan@fhcrc.org}}
86
+
87
+\seealso{
88
+
89
+  \code{\link{Team}} applies a single function across multiple threads..
90
+
91
+}
92
+
93
+\examples{
94
+df <- data.frame(From = c("A", "A", "B", "C"),
95
+                 To   = c("B", "C", "D", "D"),
96
+                 stringsAsFactors=FALSE)
97
+dagParam <- DAGParam(df)
98
+dteam <- DAGTeam(A=FunctionConsumer(function(y) y),
99
+                 B=FunctionConsumer(function(A) -A),
100
+                 C=FunctionConsumer(function(A) 1 / A),
101
+                 D=FunctionConsumer(function(B, C) B + C),
102
+                 dagParam=dagParam)
103
+
104
+plot(dteam)
105
+
106
+strm <- Stream(Seq(to=10), dteam)
107
+sapply(strm, c)
108
+reset(strm)
109
+
110
+}
111
+
112
+\keyword{classes}
... ...
@@ -70,8 +70,8 @@ register(param)
70 70
 \seealso{
71 71
 
72 72
   \code{\link{Team}} to apply one function in parallel,
73
-  \code{\link{TConnector}} to apply different functions to all elements
74
-  of a stream.
73
+  \code{\link{DAGTeam}} to evaluate functions whose dependencies are
74
+  represented as directed acyclic graphs.
75 75
 
76 76
 }
77 77