Browse code

update scan, readLines, and read.table producers

- ReadLinesInput, ScanInput removed


git-svn-id: file:///home/git/hedgehog.fhcrc.org/bioconductor/trunk/madman/Rpacks/Streamer@68687 bc3139a8-67e5-0310-9ffc-ced21a209358

Martin Morgan authored on 21/08/2012 20:30:18
Showing 7 changed files

... ...
@@ -1,7 +1,7 @@
1 1
 Package: Streamer
2 2
 Type: Package
3 3
 Title: Enabling stream processing of large files
4
-Version: 1.3.3
4
+Version: 1.3.4
5 5
 Date: 2010-10-13
6 6
 Author: Martin Morgan, Nishant Gopalakrishnan
7 7
 Maintainer: Martin Morgan <mtmorgan@fhcrc.org>
... ...
@@ -21,7 +21,7 @@ biocViews: Infrastructure, DataImport
21 21
 Collate: 
22 22
   generics.R OldClass.R
23 23
   Streamer.R Producer.R  Consumer.R Stream.R
24
-  ConnectionProducer.R RawInput.R ValueInput.R Seq.R
24
+  ConnectionProducer.R RawInput.R Seq.R
25 25
   Downsample.R Team.R UserFunction-class.R Utility.R
26 26
   ParallelConnector.R TConnector.R YConnector.R
27 27
   zzz.R
... ...
@@ -1,33 +1,130 @@
1 1
 .ConnectionProducer <- setRefClass("ConnectionProducer",
2 2
     contains = "Producer",
3 3
     fields = list(
4
-        con = "connection",
5
-        reader = "function", parser = "function"
6
-    ))
4
+      con = "connection", conArgs = "list",
5
+      reader = "function", readerArgs = "list"))
6
+
7 7
 .ConnectionProducer$methods(
8
-    initialize = function(con,
9
-        reader = rawReaderFactory(),
10
-        parser = rawParserFactory(), ...)
8
+    initialize = function(
9
+      con, conArgs = list(),
10
+      reader = scan, readerArgs = list(), ...)
11 11
     {
12 12
         "initialize ConnectionProducer"
13
-        callSuper(..., reader=reader, parser=parser)
14
-        if (!missing(con))
13
+        callSuper(..., conArgs = conArgs, reader=reader,
14
+                  readerArgs=readerArgs)
15
+        if (!missing(con)) {
16
+            ## do not use con = file() in constructor, otherwise
17
+            ## defining a derived class opens an unused connection
15 18
             .self$con <- con
19
+            if (!isOpen(con))
20
+                reset()
21
+        }
16 22
         .self
17 23
     },
18 24
     reset = function()
19 25
     {
20 26
         "reset ConnectionProducer: reopen connection"
27
+        if (verbose)
28
+            msg("ConnectionProducer$reset")
21 29
         callSuper()
22 30
         if (is(con, "connection") && isOpen(con)) {
23
-            if (verbose) msg("ConnectionProducer$reset re-open")
24 31
             s <- summary(con)
25
-            class <- s$class
26
-            desc <- s$description
32
+            cls <- s$class
27 33
             close(con)
28
-            .self$con <- do.call(s$class, list(desc, "rb"))
34
+            args <- conArgs
35
+            args[c("description", "open")] <-
36
+                s[c("description", "mode")]
37
+            .self$con <- do.call(cls, args)
29 38
         } else {
30
-            open(con, "rb")
39
+            do.call(base::open, c(list(con), conArgs))
31 40
         }
32 41
         .self
42
+    },
43
+    yield = function()
44
+    {
45
+        "yield ConnectionProducer: read data from an open connection"
46
+        if (verbose)
47
+            msg("ConnectionProducer$reset")
48
+        do.call(reader, c(list(con), readerArgs))
33 49
     })
50
+
51
+close.ConnectionProducer <-
52
+    function(con, ...)
53
+{
54
+    if (isOpen(con$con))
55
+        close(con$con)
56
+}
57
+
58
+## 
59
+## Scan/ReadLines/ReadTableProducer
60
+## 
61
+
62
+
63
+.connectionProducer <-
64
+    function(generator, con, reader, conArgs, readerArgs, dotArgs)
65
+{
66
+    args <- list(con=con, conArgs=conArgs, reader=reader,
67
+                 readerArgs=readerArgs)
68
+    args[names(dotArgs)] <- dotArgs
69
+    do.call(generator$new, args[!sapply(args, is.null)])
70
+    
71
+}
72
+
73
+.ScanProducer <- setRefClass("ScanProducer",
74
+    contains="ConnectionProducer")
75
+
76
+ScanProducer <-
77
+    function(file, ..., fileArgs=list(), scanArgs=list(...))
78
+{
79
+    dotArgs <- NULL
80
+    if (!missing(scanArgs))
81
+        dotArgs <- list(...)
82
+    con <- if (is.character(file)) file(file) else file
83
+    .connectionProducer(.ScanProducer, con, base::scan, fileArgs,
84
+                        scanArgs, dotArgs)
85
+}
86
+
87
+.ReadLinesProducer <- setRefClass("ReadLinesProducer",
88
+    contains = "ConnectionProducer")
89
+
90
+ReadLinesProducer <-
91
+    function(con, ..., conArgs=list(), readLinesArgs=list(...))
92
+{
93
+    dotArgs <- NULL
94
+    if (!missing(readLinesArgs))
95
+        dotArgs <- list(...)
96
+    if (is.character(con))
97
+        con <- file(con)
98
+    .connectionProducer(.ReadLinesProducer, con, base::readLines,
99
+                        conArgs, readLinesArgs, dotArgs)
100
+}
101
+
102
+.ReadTableProducer <- setRefClass("ReadTableProducer",
103
+    fields = list(
104
+      .template = "data.frame"),
105
+    contains = "ConnectionProducer",
106
+    methods = list(
107
+      .yield_error = function(err) {
108
+          if (conditionMessage(err) == "no lines available in input" &&
109
+              ncol(.template) != 0L)
110
+          {
111
+              .template
112
+          } else stop(err)
113
+      },
114
+      yield = function() {
115
+          y <- tryCatch(callSuper(), error=.self$.yield_error)
116
+          if (ncol(.template) == 0L)
117
+              .self$.template <- y[FALSE,,drop=FALSE]
118
+          y
119
+      }))
120
+
121
+ReadTableProducer <-
122
+    function(file, ..., fileArgs=list(), readTableArgs=list(...))
123
+{
124
+    dotArgs <- NULL
125
+    if (!missing(readTableArgs))
126
+        dotArgs <- list(...)
127
+    con <- if (is.character(file)) file(file) else file
128
+    .connectionProducer(.ReadTableProducer, con, utils::read.table,
129
+                        fileArgs, readTableArgs, dotArgs)
130
+}
... ...
@@ -36,7 +36,7 @@ rawParserFactory <-
36 36
 .RawInput <- setRefClass("RawInput",
37 37
     contains="ConnectionProducer",
38 38
     fields = list(
39
-      yieldSize = "integer",
39
+      parser = "function", yieldSize = "integer",
40 40
       .buffer = "raw", .records = "list", .parsedRecords = "integer"
41 41
     ))
42 42
 
43 43
deleted file mode 100644
... ...
@@ -1,101 +0,0 @@
1
-readLinesReaderFactory <-
2
-    function(blockSize=1e6, ...)
3
-{
4
-    if ("n" %in% names(list(...)))
5
-        stop("use 'blockSize' instead of 'n' to specify block size")
6
-    function(con, blockSize)
7
-    {
8
-        readLines(con, blockSize, ...)
9
-    }
10
-}
11
-
12
-
13
-## ScanInput
14
-
15
-scanReaderFactory <-
16
-    function(blockSize=1e6, ...)
17
-{
18
-    if ("nmax" %in% names(list(...)))
19
-        stop("use 'blockSize' instead of 'nmax' to specify block size")
20
-    function(con)
21
-    {
22
-        scan(con, nmax=blockSize, ...)
23
-    }
24
-}
25
-
26
-concatenationParserFactory <-
27
-    function()
28
-{
29
-    function(buf, bin)
30
-    {
31
-        c(buf, bin)
32
-    }
33
-}
34
-
35
-readLinesParserFactory <- concatenationParserFactory
36
-
37
-scanParserFactory <- concatenationParserFactory
38
-
39
-## ReadLinesInput
40
-
41
-.ReadLinesInput <- setRefClass("ReadLinesInput",
42
-    contains = "ConnectionProducer",
43
-    fields = list(
44
-      yieldSize = "integer",
45
-      .records = "character"))
46
-
47
-.ReadLinesInput$methods(
48
-    reset = function()
49
-    {
50
-        "reset ReadLinesInput"
51
-        if (verbose) msg("ReadLinesInput$reset()")
52
-        callSuper()
53
-        .self$.records <- character()
54
-        .self$.parsedRecords <- 0L
55
-        .self
56
-    },
57
-    .add=function(input)
58
-    {
59
-        .self$.records <- .self$parser(.self$.records, input)
60
-        .self
61
-    },
62
-    .fill=function()
63
-    {
64
-        while(length(.records) < yieldSize && 
65
-              0 !=length(input <- .self$reader(con, yieldSize)))
66
-            .add(input)
67
-        .self
68
-    },
69
-    yield = function()
70
-    {
71
-        "current stream, with flush if yieldSize not satisfied"
72
-        if (verbose) msg("ReadLinesInput$yield()")
73
-        .fill()
74
-        idx <- seq_len(min(yieldSize, length(.self$.records)))
75
-        records <- .records[idx]
76
-        .self$.records <- .self$.records[-idx]
77
-        records
78
-    },
79
-    status = function()
80
-    {
81
-        "report status of ReadLinesInput"
82
-        if (verbose) msg("ReadLinesInput$status()")
83
-        c(list(.recordLength = length(.records)),
84
-          callSuper())
85
-    }
86
-)
87
-
88
-ReadLinesInput <- 
89
-    function(con, reader=readLinesReaderFactory(),
90
-             parser=readLinesParserFactory(), 
91
-             yieldSize=1e6, ...)
92
-{
93
-    if (!is(con, "connection"))
94
-        con <- file(con, "r")
95
-    yieldSize <- as.integer(yieldSize)
96
-    .ReadLinesInput$new(con=con, reader=reader, parser=parser,
97
-                        yieldSize=yieldSize, ...)
98
-}
99
-
100
-
101
-
102 0
new file mode 100644
... ...
@@ -0,0 +1,34 @@
1
+test_ConnectionProducer <-
2
+    function()
3
+{
4
+    fl <- system.file(package="Rsamtools", "extdata", "ex1.sam")
5
+
6
+    p <- ScanProducer(file(fl, "r"), what="character", quiet=TRUE)
7
+    checkIdentical(51431L, length(yield(p)))
8
+    checkIdentical(character(0), yield(p))
9
+    close(p)
10
+
11
+    p <- ReadLinesProducer(file(fl, "r"), n = 1000)
12
+    obs <- integer()
13
+    while (length(y <- yield(p)))
14
+        obs <- append(obs, length(y))
15
+    exp <- as.integer(c(1000, 1000, 1000, 307))
16
+    checkIdentical(exp, obs)
17
+    close(p)
18
+
19
+    p <- ReadTableProducer(file(fl, "r"), quote="", fill=TRUE, nrows=1000)
20
+    obs <- integer()
21
+    while (nrow(y <- yield(p)))
22
+        obs <- append(obs, nrow(y))
23
+    exp <- as.integer(c(1000, 1000, 1000, 307))
24
+    checkIdentical(exp, obs)
25
+    checkIdentical(c(0L, 17L), dim(yield(p)))
26
+    close(p)
27
+
28
+    ## reset
29
+    p <- ReadTableProducer(file(fl, "r"), quote="", fill=TRUE, nrows=1000)
30
+    exp <- yield(p)
31
+    reset(p)
32
+    checkIdentical(exp, yield(p))
33
+    close(p)
34
+}
... ...
@@ -2,49 +2,56 @@
2 2
 \Rdversion{1.1}
3 3
 \docType{class}
4 4
 \alias{ConnectionProducer-class}
5
+\alias{close.ConnectionProducer}
6
+\alias{ScanProducer-class}
7
+\alias{ReadLinesProducer-class}
8
+\alias{ReadTableProducer-class}
9
+\alias{ScanProducer}
10
+\alias{ReadLinesProducer}
11
+\alias{ReadTableProducer}
5 12
 
6 13
 \title{Class "ConnectionProducer"}
7 14
 
8 15
 \description{
9 16
 
10
-  A virtual class containing components that are required to create a
11
-  custom \code{Producer}-class to read data from file connections. Users
12
-  can inherit from the \code{ConnectionProducer}-class to create their
13
-  own \code{Producer} classes that interact with files. Users are
14
-  expected to pass in appropriate \code{reader} and \code{parser}
15
-  functions for files when creating instances of classes that inherit
16
-  from \code{ConnectionProducer}-class.
17
+  \code{ConnectionProducer} classes include \code{ScanProducer},
18
+  \code{ReadLinesProducer}, and \code{ReadTableProducer}, providing
19
+  Streamer interfaces to \code{scan}, \code{readLines}, and
20
+  \code{read.table}.
17 21
 
18 22
 }
19 23
 
20
-\section{Fields}{
21
-  \describe{
24
+\usage{
25
+ScanProducer(file, ..., fileArgs=list(), scanArgs=list(...))
26
+ReadLinesProducer(con, ..., conArgs=list(), readLinesArgs=list(...))
27
+ReadTableProducer(file, ..., fileArgs=list(), readTableArgs=list(...))
22 28
 
23
-    The \code{ConnectionProducer} class inherits the fields
24
-    \code{verbose} and \code{inUse} fields from the \code{Streamer}
25
-    class. Please refer to the \code{\link{Streamer}} class for more
26
-    details.
29
+\S3method{close}{ConnectionProducer}(con, ...)
30
+}
31
+
32
+\arguments{
33
+
34
+  \item{file, con}{The file or connection to be used for input. See
35
+    \code{\link{connections}}.}
36
+
37
+  \item{...}{Additional arguments, e.g., \code{nlines}, to \code{scan},
38
+    \code{readLines}, etc.}
39
+
40
+  \item{fileArgs, conArgs}{Arguments, e.g., \code{mode},
41
+    \code{encoding}, to be used when invoking \code{\link{reset}()}.}
42
+
43
+  \item{scanArgs, readLinesArgs, readTableArgs}{Arguments to
44
+    \code{scan}, \code{readLines}, etc., when reading a file or
45
+    connection; provide this argument when \code{...} contains arguments
46
+    (especially \code{verbose=TRUE}) to be used by the class.}
27 47
 
28
-    \item{\code{con}:}{An object of class \code{connection}.}
29
-    \item{\code{reader}:}{A function that reads data from a file
30
-        connection}
31
-    \item{\code{parser}:}{A function that parses  data to records.}
32
-  }
33 48
 }
34 49
 
35
-\section{Class-Based Methods}{
36
-  \describe{
50
+\section{Internal Methods}{
37 51
 
38
-    The \code{ConnectionProducer} class inherits the methods
39
-    \code{initialize}, \code{msg}, \code{reset}, \code{status} and
40
-    \code{yield} from the \code{Streamer} virtual class. Please refer to
41
-    the \code{\link{Streamer}} class for more details.
42
-  
43
-    Derived classes should implement an appropriate \code{yield} method
44
-    to return the contents of the current stream. The default method for
45
-    the base virtual \code{Streamer} class returns a \code{list()}
52
+  Class-based fields and methods are for internal use. See, e.g.,
53
+  \code{Streamer:::.ScanProducer$help()} for documentation.
46 54
 
47
-  }
48 55
 }
49 56
 
50 57
 \author{Martin Morgan \url{mtmorgan@fhcrc.org}}
... ...
@@ -54,6 +61,29 @@
54 61
   \code{\linkS4class{Streamer}-class}.
55 62
 }
56 63
 
57
-\examples{showClass("ConnectionProducer")}
64
+\examples{
65
+fl <- system.file(package="Rsamtools", "extdata", "ex1.sam")
66
+
67
+p <- ReadLinesProducer(fl, n = 1000)  # read 1000 lines at a time
68
+while (length(y <- yield(p)))
69
+    print(length(y))
70
+
71
+p <- ReadTableProducer(fl, quote="", fill=TRUE, nrows=1000)
72
+while (nrow(y <- yield(p)))
73
+    print(dim(y))
74
+
75
+reset(p)
76
+dim(yield(p))
77
+
78
+## connections opened 'under the hood' are closed, with warnings
79
+rm(p); gc() 
80
+
81
+## avoid warnings by managing connections
82
+p <- ScanProducer(file(fl, "r"), verbose=TRUE,
83
+                  scanArgs=list(what=character()))
84
+length(yield(p))
85
+close(p)
86
+rm(p); gc()
87
+}
58 88
 
59 89
 \keyword{classes}
60 90
deleted file mode 100644
... ...
@@ -1,98 +0,0 @@
1
-\name{ReadLinesInput-class}
2
-
3
-\Rdversion{1.1}
4
-\docType{class}
5
-
6
-\alias{ReadLinesInput-class}
7
-\alias{readLinesReaderFactory}
8
-\alias{scanReaderFactory}
9
-\alias{readLinesParserFactory}
10
-\alias{scanParserFactory}
11
-\alias{concatenationParserFactory}
12
-\alias{ReadLinesInput}
13
-
14
-\title{Class "ReadLinesInput"}
15
-
16
-\description{
17
-
18
-  A \code{\linkS4class{Producer}}-class to interpret text files. Users
19
-  interact with this class through the constructor \code{ReadLinesInput}
20
-  and methods \code{\link{yield}}, \code{\link{reset}}, and
21
-  \code{\link{stream}}.
22
-
23
-  This class requires two helper functions; the \sQuote{factory} methods
24
-  defined on this page can be used to supply these.
25
-  \code{readLinesReaderFactory} creates a \sQuote{reader}, whose
26
-  responsibility it is to accept a connection and return a
27
-  \code{character} vector.  \code{readLinesParserFactory} creates a
28
-  \sQuote{parser}, responsible for parsing a buffer and vector of the
29
-  same type as produced by the reader into records.
30
-  
31
-}
32
-
33
-\usage{
34
-ReadLinesInput(con, reader = readLinesReaderFactory(), 
35
-    parser = readLinesParserFactory(), yieldSize = 1e+06, ...)
36
-readLinesReaderFactory(blockSize=1e+06, ...)
37
-scanReaderFactory(blockSize=1e06, ...)
38
-}
39
-
40
-\arguments{
41
-  \item{con}{A character string or connection (opened as \code{"r"}
42
-    mode) from which character data will be retrieved.}
43
-  \item{yieldSize}{The number of records the input parser is to yield.}
44
-  \item{reader}{A function of one argument (\code{con}, an open
45
-    connection positioned at the start of the file, or at the position
46
-    the \code{con} was in at the end of the previous invocation of the
47
-    reader function) that returns a vector of type \code{character}.}
48
-  \item{parser}{A function of two arguments (\code{buf}, \code{bin}),
49
-    parsing the \code{raw} vector \code{c(buf, bin)} into records.}
50
-
51
-  % readLinesReaderFactory
52
-  \item{blockSize}{The number of characters to read at one time.}
53
-
54
-  \item{...}{Additional arugments.}
55
- }
56
-
57
-\section{Fields}{
58
-  \describe{
59
-    \item{\code{con}:}{Object of class \code{connection}. An R
60
-      \code{\link{connection}} opened in \dQuote{r} mode from which
61
-      data will be read.}
62
-    \item{\code{blockSize}:}{Object of class \code{integer}. Size of 
63
-        input during each \code{\link{yield}}.}
64
-    \item{\code{reader}:}{Object of class \code{function}. A function
65
-      used to input \code{blockSize} elements. See
66
-      \code{\link{readLinesReaderFactory}}.}
67
-    \item{\code{parser}:}{Object of class \code{function}. A function
68
-      used to parse character input into records. See
69
-      \code{\link{readLinesParserFactory}}}
70
-    \item{\code{.records}:}{Object of class \code{character}. Records that have
71
-        been read and parsed but not yet yield-ed records.}
72
-    \item{\code{verbose}:}{Object of class \code{logical}. Should
73
-      progress be reported?}
74
-  }
75
-}
76
-
77
-\section{Class-Based Methods}{
78
-  \describe{
79
-    \item{\code{initialize(..)}:}{Called during object creation with values to
80
-      initialize fields.}
81
-    \item{\code{reset()}:}{Remove buffer and current records, reset
82
-      record counter, re-open \code{con}.}
83
-    \item{\code{status()}:}{Summarize status of stream.}
84
-    \item{\code{yield()}:}{Process stream to yield as many complete
85
-      records as are represented in the current \code{blockSize}
86
-      elements.}
87
-  }
88
-}
89
-
90
-\author{Martin Morgan \url{mtmorgan@fhcrc.org}}
91
-
92
-\seealso{\code{\link{stream}}, \code{\link{connect}}}
93
-
94
-\examples{
95
-showClass("ReadLinesInput")
96
-}
97
-
98
-\keyword{classes}