Thursday, January 10, 2019

Big data: from compressed text (e.g. CSV) to compressed binary format -- or why Nippy (Clojure) and java.io.DataOutputStream are awesome

Say you have massive amounts of historical market data in a common, gzip'ed CSV format or similar, and you have these data types, which represent instances of the data in your system:

(defrecord OFlow ;; Order flow; true trade and volume data!
    [^double trade ;; Positive = buy, negative = sell.
     ^double price ;; Average fill price.
     ^Keyword tick-direction ;; :plus | :zero-plus | :minus | :zero-minus
     ^long timestamp ;; We assume this is the ts for when the order executed in full.

     ^IMarketEvent memeta])

(defrecord MEMeta
    [^Keyword exchange-id
     ^String symbol
     ^long local-timestamp])
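
For the snippets below to compile cleanly (and without reflection), the namespace needs a few imports. A minimal ns sketch -- the namespace name here is hypothetical, and IMarketEvent is assumed to be an interface defined elsewhere in the system:

(ns qa.oflow-storage ;; Hypothetical namespace name.
  (:require [taoensso.nippy :as nippy])
  (:import (clojure.lang Keyword)
           (java.io DataInput DataOutput)))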


A good way to store and access this is a binary format combined with a modern, fast compression algorithm. The key issue is fast decompression, and LZ4HC is the best here as far as I'm aware -- apparently reaching the limits of what's possible with regard to RAM speed. To do this we'll use https://github.com/ptaoussanis/nippy, which exposes the DataOutputStream class nicely and enables us to express a simple binary protocol for reading and writing our data types, like this:

(nippy/extend-freeze OFlow :QA/OFlow [^OFlow oflow ^DataOutput output]
                     (.writeDouble output (.trade oflow))
                     (.writeDouble output (.price oflow))
                     ;; NB: dashes in record field names are munged to underscores
                     ;; in the generated class, hence .tick_direction etc. below.
                     (.writeByte output (case (.tick_direction oflow)
                                          :plus 0, :zero-plus 1, :minus 2, :zero-minus 3))
                     (.writeLong output (.timestamp oflow))
                     ;; MEMeta
                     (.writeUTF output (name (.exchange_id ^MEMeta (.memeta oflow))))
                     (.writeUTF output (.symbol ^MEMeta (.memeta oflow)))
                     (.writeLong output (.local_timestamp ^MEMeta (.memeta oflow))))

(nippy/extend-thaw :QA/OFlow [^DataInput input]
                   (->OFlow (.readDouble input)
                            (.readDouble input)
                            (case (.readByte input)
                              0 :plus, 1 :zero-plus, 2 :minus, 3 :zero-minus)
                            (.readLong input)
                            (->MEMeta (keyword (.readUTF input))
                                      (.readUTF input)
                                      (.readLong input))))
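
A quick way to sanity-check the protocol is a freeze/thaw round trip at the REPL (the sample values here are made up):

(let [oflow (->OFlow 1.5 3942.0 :plus 1547078400000
                     (->MEMeta :bitmex "XBTUSD" 1547078400123))]
  (= oflow (nippy/thaw (nippy/freeze oflow))))
;; => true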


...to write the binary data out to a file, you'd do something like this (oflow-vector is a vector containing OFlow instances):

(nippy/freeze-to-file "data.dat" oflow-vector                                   
                      {:compressor nippy/lz4hc-compressor, :encryptor nil, :no-header? true})
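
(Note: with :no-header? true Nippy skips writing its self-describing header, so the exact same :compressor and :encryptor options must be supplied again when thawing, as below.)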


...and to read it back in, getting a vector of OFlow instances as the result, you'd do something like this:

(nippy/thaw-from-file "data.dat"
                      {:compressor nippy/lz4hc-compressor, :encryptor nil, :no-header? true})


...it's so simple, and the result is very, very good in terms of both speed and space savings [I'll add some numbers here later]. Of course you'd still want something like PostgreSQL for indexed views of or access to the data, but this is very nice for fast access to massive amounts of sequential, high-resolution data. I've split things up so that each file contains one day's worth of data; this makes it possible to quickly request any range of the data without doing long, linear searches. 👍
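
A sketch of what range access across the day-partitioned files could look like (day->file, the path scheme, and the date handling here are hypothetical, not from the actual system):

(import '(java.time LocalDate))

(defn day->file ;; Hypothetical path scheme: one file per day.
  [^LocalDate day]
  (str "data/" day ".dat")) ;; E.g. "data/2019-01-10.dat".

(defn read-range ;; Thaw and concatenate all daily files from `from-day` to `to-day`, inclusive.
  [^LocalDate from-day ^LocalDate to-day]
  (into []
        (mapcat (fn [^LocalDate day]
                  (nippy/thaw-from-file (day->file day)
                                        {:compressor nippy/lz4hc-compressor
                                         :encryptor nil, :no-header? true})))
        (take-while (fn [^LocalDate day] (not (.isAfter day to-day)))
                    (iterate (fn [^LocalDate day] (.plusDays day 1)) from-day))))

E.g. (read-range (LocalDate/parse "2019-01-01") (LocalDate/parse "2019-01-07")) would return one week of OFlow instances as a single vector.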
