Tuesday, January 15, 2019

Clojure for fast processing of streams of data via LAZY-SEQ and SEQUE

UPDATE: After some back and forth, I now think clojure.core.async, with its buffers at the source, the computation/transformation stage and the sink, is a better fit for what I'm doing!

LAZY-SEQ and SEQUE are useful in many contexts where you need to speed up processing of a stream of data (a seq) coming from, e.g., another computation, a filesystem, or a web or database server -- anything, really.
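To make the LAZY-SEQ half concrete, here is a small sketch (the names `realized` and `counted-naturals` are hypothetical, made up for illustration) that counts with an atom how many elements a LAZY-SEQ generator has actually produced. Only the elements that are consumed get computed:

```clojure
;; Hypothetical counter: how many elements has the generator realized so far?
(def realized (atom 0))

(defn counted-naturals
  ([] (counted-naturals 0))
  ([i]
   (lazy-seq
     (swap! realized inc)                    ; runs only when this step is forced
     (cons i (counted-naturals (inc i))))))

(def xs (counted-naturals)) ; nothing computed yet, @realized is still 0
(doall (take 3 xs))         ; forces exactly three steps
@realized                   ; => 3
```

Realized elements are cached, so walking the first three elements of `xs` again does not re-run the body.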

The key idea is that SEQUE continuously fetches the next few elements of the stream in advance, in a background thread, while the code that consumes the data keeps working on the current (already fetched) element(s) in the foreground.
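A minimal sketch of that run-ahead behaviour, using the same counting trick (the names `produced`, `counted` and `queued` are hypothetical): after the consumer takes a single element and then pauses, the background thread behind SEQUE has already realized several more.

```clojure
;; Hypothetical counter of how many elements the producer has realized.
(def produced (atom 0))

(defn counted
  ([] (counted 0))
  ([i]
   (lazy-seq
     (swap! produced inc)
     (cons i (counted (inc i))))))

;; Let SEQUE buffer up to 3 elements ahead of the consumer.
(def queued (seque 3 (counted)))

(first queued)      ; the consumer asks for one element...
(Thread/sleep 200)  ; ...and then "works" on it for a while
@produced           ; typically more than 1: production kept going in the background
```

SEQUE does its background work through an agent (`send-off`), so when running this as a script rather than at the REPL, call `(shutdown-agents)` at the end to let the JVM exit promptly.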

A quick video to demonstrate the effect:


By default SEQUE uses a LinkedBlockingQueue behind the scenes (with a capacity of 100, or of n if you pass a number as the first argument), but you can instead pass it any instance of java.util.concurrent.BlockingQueue.
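A small sketch of the three ways to call SEQUE; the capacity of 10 and the input `(range 25)` are arbitrary choices for illustration:

```clojure
(import '(java.util.concurrent ArrayBlockingQueue))

;; Explicit queue: any java.util.concurrent.BlockingQueue instance works here.
(def via-queue (seque (ArrayBlockingQueue. 10) (range 25)))

;; Buffer size: SEQUE builds a LinkedBlockingQueue of this capacity itself.
(def via-size (seque 10 (range 25)))

;; Seq only: equivalent to passing a buffer size of 100.
(def via-default (seque (range 25)))

;; All three yield the same elements, in the same order.
(= (range 25) via-queue via-size via-default) ; => true
```

Give each SEQUE its own fresh queue instance; sharing one queue between several SEQUEs would mix their elements.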

Clojure makes this simple and fun, and all of this might be pretty basic for many, but a small "trick" is needed to set it up correctly -- like this:

;; www.Quanto.ga
;;
;; * You have a stream of data where generating each new element for the stream takes some time.
;; * You consume and do some calculations on each element from the stream; this too takes some time.
;; * When the consumer is done with its calculations for an element (or chunk of elements!), you do
;;   not want it to wait around for the code that fetches new elements to finish fetching the next one
;;   -- because that could have been done in the background while you were doing the calculations.
;;
;; One way to deal with this is to use SEQUE, which keeps production about N steps ahead of consumption
;; via a background thread that works on Clojure seqs.
;; The starting scenario, without using SEQUE:
(defn blah1
  ([] (blah1 0))
  ([^long i]
   (lazy-seq
     (Thread/sleep 250)           ; simulate slow production of element i
     (println "i:" i)
     (cons i (blah1 (inc i))))))  ; the rest is only produced when someone asks for it
quantataraxia.core> (time (run! #(do (Thread/sleep 1000) (println "output:" %))
                                (take 5 (blah1))))
i: 0
output: 0
i: 1
output: 1
i: 2
output: 2
i: 3
output: 3
i: 4
output: 4
"Elapsed time: 6259.611617 msecs"
;; We cannot simply wrap the BLAH1 body in a SEQUE form here: BLAH1 calls itself, so every
;; recursive step would spawn yet another background worker. Instead we set up an inner
;; helper Fn that does the recursion, and wrap the seq it returns in a single call to
;; SEQUE, like this:
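(defn blah2
  ([] (blah2 0))
  ([^long i]
   (seque 2                      ; buffer up to 2 elements, produced in a background thread
     ((fn inner [^long i]        ; self-recursive helper, so SEQUE wraps the seq only once
        (lazy-seq
          (Thread/sleep 250)     ; simulate slow production of element i
          (println "i:" i)
          (cons i (inner (inc i)))))
      i))))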
quantataraxia.core> (time (run! #(do (Thread/sleep 1000) (println "output:" %))
                                (take 5 (blah2))))
i: 0
i: 1
i: 2
i: 3
i: 4
output: 0
i: 5
output: 1
i: 6
output: 2
i: 7
output: 3
i: 8
output: 4
"Elapsed time: 5509.658327 msecs"
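The two timings can be roughly predicted with back-of-the-envelope arithmetic, taking a production time p = 250 ms, a consumption time c = 1000 ms and n = 5 elements. Without SEQUE every step pays for both; with SEQUE, once the first element is ready, production overlaps consumption and the slower of the two dominates:

```latex
T_{\text{blah1}} \approx n\,(p + c) = 5 \times (250 + 1000)\ \text{ms} = 6250\ \text{ms}

T_{\text{blah2}} \approx p + n\,\max(p, c) = (250 + 5 \times 1000)\ \text{ms} = 5250\ \text{ms}
```

The first estimate matches the measured 6259 ms closely; this simple model does not account for the extra ~260 ms seen in the SEQUE run. Since the consumer is the bottleneck here (c > p), the best possible saving is (n - 1) p, about 1 second. The output also shows the flip side of running ahead: elements 5 to 8 were produced even though only five elements were consumed.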
