I'm performing streaming reads of an S3 object using a BufferedReader.
I need to do two things with this object:
- Pass it to a SuperCSV CSV reader
- Obtain the raw lines and keep them in a (Clojure) lazy sequence
Currently, I am having to use two different BufferedReaders: one as an argument to a SuperCSV CSV reader class and one to initialize the lazy sequence of raw lines. I'm effectively downloading the S3 object twice, which is expensive ($) and slow.
One of my colleagues pointed out that something analogous to the Unix "tee" command is what I'm looking for. A BufferedReader that could somehow be "split", download a chunk of data, and pass a copy to both the lazy sequence and the CSV reader functionality would be useful.
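The tee idea could be sketched roughly as follows. This is only an illustration under assumptions: `tee-reader` is a hypothetical helper (not part of SuperCSV or the JDK), a StringReader stands in for the S3 stream, and a StringWriter stands in for whatever would feed the raw-line sequence.

```clojure
(import '[java.io BufferedReader Reader StringReader StringWriter Writer])

(defn tee-reader
  "Hypothetical helper: returns a Reader that reads from src and writes a
  copy of every character it delivers to sink."
  [^Reader src ^Writer sink]
  (proxy [Reader] []
    (read
      ;; read(): copy a single character, unless end of stream was reached
      ([]
       (let [c (.read src)]
         (when-not (neg? c) (.write sink (int c)))
         c))
      ;; read(char[]): delegate to the three-argument form
      ([cbuf] (.read this cbuf 0 (count cbuf)))
      ;; read(char[], off, len): copy exactly the characters that were read
      ([cbuf off len]
       (let [n (.read src ^chars cbuf (int off) (int len))]
         (when (pos? n) (.write sink ^chars cbuf (int off) (int n)))
         n)))
    (close [] (.close src))))

;; Stand-ins: the StringReader plays the S3 stream, the StringWriter the raw side.
(def tee-sink (StringWriter.))
(def tee-rdr (BufferedReader. (tee-reader (StringReader. "a,b\n1,2\n") tee-sink)))

;; Consuming the reader (as a CSV parser would) fills the sink as a side effect.
(def parsed-side (vec (line-seq tee-rdr)))
(def raw-copy (str tee-sink))
```

Two caveats with this sketch: the StringWriter accumulates the whole stream in memory, so a real sink would need to hand data off incrementally, and because BufferedReader reads ahead in chunks, the raw copy can run ahead of what the parsing side has consumed.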
I'm also currently investigating whether it would be possible to wrap the lazy sequence in a BufferedReader and pass that to SuperCSV. I've had some Java heap space issues when passing very large lazy sequences to multiple consumers, so I'm kind of worried about employing this solution.
Another solution is just downloading the file locally and then opening two streams on this file. This eliminates the original motivation behind streaming: allowing processing of the file to begin as soon as data starts arriving.
The final solution, and one that I'd consider only if nothing else works, is implementing my own CSV reader that returns both parsed CSV and the original unparsed line. If you've used a very solid CSV reader that can return both a Java Hash of parsed CSV data and the original unparsed line, please let me know!
Thanks!
I'd be inclined to go with creating a seq of lines from the network, and then hand that over to however many processes need to work on that seq; persistent data structures are cool that way. In the case of needing to turn a seq of strings into a Reader that you can hand off to the SuperCSV API, this seems to work:
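    (import '[java.io Reader StringReader])

    (defn concat-reader
      "Returns a Reader that reads from a sequence of strings.
      The strings are concatenated as-is, so include line terminators
      in them if the consumer needs line breaks."
      [lines]
      (let [srs (atom (map #(StringReader. %) lines))]
        (proxy [Reader] []
          (read
            ;; read(): one char from the current StringReader, moving on
            ;; to the next reader when the current one is exhausted
            ([]
             (if-let [sr (first @srs)]
               (let [c (.read sr)]
                 (if (neg? c)
                   (do (swap! srs next) (.read this))
                   c))
               -1)) ; no readers left: end of stream
            ;; read(char[]): delegate to the three-argument form
            ([cbuf] (.read this cbuf 0 (count cbuf)))
            ;; read(char[], off, len): same advance-on-exhaustion logic
            ([cbuf off len]
             (if-let [sr (first @srs)]
               (let [actual (.read sr cbuf off len)]
                 (if (neg? actual)
                   (do (swap! srs next) (.read this cbuf off len))
                   actual))
               -1)))
          (close []))))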
E.g.
    user=> (def r (concat-reader ["foo" "bar"]))
    #'user/r
    user=> (def cbuf (char-array 2))
    #'user/cbuf
    user=> (.read r cbuf)
    2
    user=> (seq cbuf)
    (\f \o)
    user=> (char (.read r))
    \o
    user=> (char (.read r))
    \b
The solution was to use a single BufferedReader for all accesses and to call reset() on it each time it is passed to functionality that needs to read from the beginning.
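A minimal sketch of this approach, assuming a StringReader as a stand-in for the S3 stream. The read-ahead limit of 1024 and the var names are illustrative assumptions. reset() only works after mark() has been called, and BufferedReader has to keep everything read since the mark in memory, so rewinding to the very beginning of a large object probably means buffering most or all of it.

```clojure
(import '[java.io BufferedReader StringReader])

;; Stand-in for the S3 stream (assumption for illustration only).
(def s3-rdr (BufferedReader. (StringReader. "a,b\n1,2\n")))

;; Mark the start before any reads. The argument is the read-ahead limit in
;; characters; it must cover everything read before reset is called.
(.mark s3-rdr 1024)

;; First pass: realize the raw lines eagerly, before rewinding.
(def raw-lines (doall (line-seq s3-rdr)))

;; Rewind to the mark so the next consumer (e.g. the CSV reader) starts
;; from the beginning again.
(.reset s3-rdr)
(def first-line-again (.readLine s3-rdr))
```

The first pass has to be fully realized before reset() is called, since a lazy line-seq and a second consumer sharing one reader would otherwise interleave their reads.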