1 /**
2  This module provides utility classes to iterate over data.
3 
4  It is not mandatory to use them when using vectorflow, but you might find them
5  useful and slightly more intuitive to use than the built-in range mechanism if
6  you're a beginner with D.
7 
 When creating a dataset for vectorflow, make sure the data sharding is
 thread-safe if you plan to train on multiple cores, since data parallelism
 with Hogwild is the main strategy used.
11  By default, vectorflow will try to shard the forward range provided with
12  std.range.evenChunks, which might or might not work depending on your
13  specific reader. To explicitly shard the data, just specify an `evenChunks`
14  function in your reader implementation (see MultiFilesReader for an example).
15 
16  Copyright: 2017 Netflix, Inc.
17  License: $(LINK2 http://www.apache.org/licenses/LICENSE-2.0, Apache License Version 2.0)
18  */
19 module vectorflow.dataset;
20 
21 private
22 {
23 import std.conv : to;
24 import std.stdio;
25 import std.traits : isBuiltinType;
26 import std.range : _evenChunks = evenChunks;
27 }
28 
/**
 Base class for datasets backed by a single file on disk.

 Subclasses provide `read_next` (decode the next record) and `save`; this
 class owns the `std.stdio.File` handle and knows how to (re)open it.
 */
class DataFileReader(T) : DataReader!T
{
    protected File _f;
    protected string _path;
    protected bool _binary;

    /// Path of the file this reader was constructed with.
    public @property path() { return _path; }

    /**
     Params:
        path   = file to read from
        binary = when true the file is opened with mode "rb", otherwise "r"

     The file is opened immediately (via `rewind`).
     */
    this(string path, bool binary)
    {
        _path = path;
        _binary = binary;
        rewind();
    }

    protected override abstract bool read_next();
    override abstract @property DataFileReader!T save();

    /// Reports the end-of-file flag of the underlying `File` handle.
    override @property bool empty()
    {
        return _f.eof;
    }

    ~this()
    {
        _f.close();
    }

    /// Close any currently open handle and reopen the file from its start.
    override void rewind()
    {
        _f.close();
        _f.open(_path, _binary ? "rb" : "r");
    }

    /// Copy the shared state of `e` (see `DataReader.share_save_params`) and
    /// move this reader's file cursor to the position of `e`'s cursor.
    /// `e` must actually be a `DataFileReader!T`; the conversion throws otherwise.
    protected override void share_save_params(DataReader!T e)
    {
        super.share_save_params(e);
        auto source = e.to!(DataFileReader!T);
        _f.seek(source._f.tell);
    }
}
71 
72 
/**
 Chains several `DataFileReader`s so they behave as a single dataset, visiting
 the files in array order.

 Sharding for multi-core training is done on whole files through
 `evenChunks`, so each shard gets its own subset of the physical files.
 */
class MultiFilesReader(T)
{
    DataFileReader!(T)[] readers;
    protected size_t _currInd;
    /// Index in `readers` of the file the range interface is currently on.
    public @property ulong currentFileIndex(){return _currInd;}

    private bool _cached;

    this(DataFileReader!(T)[] readers_)
    {
        readers = readers_;
        _currInd = 0;
        _cached = false;
    }

    /**
     Iterate over every observation of every file, in file order.

     Each reader's own `opApply` decides whether its observations come from
     its in-memory cache or from disk, so files are handled correctly even
     when only some of them have been cached so far.
     Iteration stops as soon as `dg` returns non-zero (e.g. `break` in the
     caller's loop) and that value is returned, as the opApply protocol
     requires. All readers are rewound before returning.
     */
    int opApply(scope int delegate(ref T) dg)
    {
        int result = 0;
        foreach(r; readers)
        {
            foreach(ref T obs; r)
            {
                result = dg(obs);
                if (result)
                    break;
            }
            // Propagate the early exit past the outer loop as well; otherwise
            // `dg` would keep being called for the remaining files.
            if (result)
                break;
        }
        rewind();
        return result;
    }

    /// True when there is no file at all, or when the range is on the last
    /// file and that file reports empty.
    @property bool empty()
    {
        // A reader with no files (e.g. a surplus chunk from `evenChunks`) is
        // empty; also avoids underflow of `readers.length - 1`.
        if (readers.length == 0)
            return true;
        return _currInd == readers.length - 1 && readers[_currInd].empty;
    }

    @property ref T front()
    {
        return readers[_currInd].front;
    }

    /// Advance to the next observation, moving to the next file once the
    /// current one reports empty.
    void popFront()
    {
        if(readers[_currInd].empty)
            _currInd++;
        readers[_currInd].popFront();
    }

    /// Rewind every file and move the range back to the first file.
    void rewind()
    {
        foreach(r; readers)
            r.rewind();
        _currInd = 0;
    }

    /// Return an independent copy of this reader at the same position.
    /// The copy is also marked as cached if this reader is.
    @property MultiFilesReader!(T) save()
    {
        DataFileReader!(T)[] cps;
        foreach(r; readers)
            cps ~= r.save();
        auto cp = new MultiFilesReader!T(cps);
        // The per-file copies keep their positions, so the copy must also be
        // on the same file as this range to be at the same overall position.
        cp._currInd = _currInd;
        if(_cached)
            cp.cache();

        return cp;
    }

    /// Total number of observations: the sum of each file reader's `length`.
    @property ulong length()
    {
        ulong s = 0;
        foreach(r; readers)
            s += r.length;
        return s;
    }

    /// Split reader into num_chunks readers based on even number of
    /// physical files. When `num_chunks` exceeds the number of files, some
    /// returned readers may contain no file at all (and are then empty).
    MultiFilesReader!(T)[] evenChunks(uint num_chunks)
    {
        MultiFilesReader!(T)[] res;
        DataFileReader!(T)[] cps;
        foreach(r; readers)
            cps ~= r.save();
        auto files_chunks = _evenChunks(cps, num_chunks);
        foreach(c; files_chunks)
        {
            auto r = new MultiFilesReader!(T)(c);
            if(_cached)
                r.cache();
            res ~= r;
        }
        return res;
    }

    /// Request in-memory caching on every underlying file reader.
    MultiFilesReader!T cache()
    {
        _cached = true;
        foreach(r; readers)
            r.cache();
        return this;
    }
}
190 
191 
/**
 Abstract base class for datasets.

 Subclasses implement `read_next`, which is expected to load the next
 observation into `_obs` and return false once the data is exhausted. They
 also implement `empty`, `rewind` and `save`.

 The data can be consumed with `foreach` (see `opApply`) or through the
 `front`/`popFront`/`empty` range primitives. After `cache()` has been called,
 the next complete `foreach` pass keeps a copy of every observation in
 `_cache`, and later passes are served from that array.
 */
class DataReader(T)
{
    protected T _obs;
    /// Number of observations; `size_t.max` (i.e. `-1`) while unknown.
    protected size_t _length;

    T[] _cache;
    protected bool _start_cache;

    /// sum in bytes of the size of the dataset elements in memory
    protected ulong _memory_size;

    this()
    {
        _length = size_t.max; // unknown until counted or cached
        _start_cache = false;
        _memory_size = 0;
    }

    protected abstract bool read_next();

    /**
     Iterate over every observation, then `rewind()`.

     - If caching was requested and the cache is still empty, observations are
       read with `read_next` and copied into `_cache` (using `.dup` when
       available). The cache is only kept if the pass reaches the end of the
       data; if `dg` stops the loop early, the partial cache is discarded and
       caching is retried on the next pass.
     - If the cache is populated, the observations are served from `_cache`.
     - Otherwise the observations are read with `read_next`.

     Returns the first non-zero value returned by `dg`, or 0.
     */
    int opApply(scope int delegate(ref T) dg)
    {
        int result = 0;
        static if(__traits(compiles, _obs.dup) || isBuiltinType!T)
        {
            if(_start_cache && _cache.length == 0)
            {
                bool complete = true;
                while(read_next())
                {
                    static if(__traits(compiles, _obs.dup))
                        _cache ~= _obs.dup;
                    else
                        _cache ~= _obs;
                    result = dg(_obs);
                    if (result)
                    {
                        complete = false;
                        break;
                    }
                }
                if (complete)
                {
                    _length = _cache.length;
                    _start_cache = false;
                }
                else
                {
                    // Only a prefix of the dataset was cached. Keeping it would
                    // make every later pass (and `length`) silently see a
                    // truncated dataset, so drop it and cache again next time.
                    _cache = null;
                }
            }
            else if(_cache.length != 0)
            {
                foreach(ref T obs; _cache)
                {
                    result = dg(obs);
                    if(result)
                        break;
                }
            }
            else
            {
                while(read_next())
                {
                    result = dg(_obs);
                    if (result)
                        break;
                }
            }
        }
        else
        {
            while(read_next())
            {
                result = dg(_obs);
                if (result)
                    break;
            }
        }
        rewind();
        return result;
    }

    abstract @property bool empty();

    @property ref T front()
    {
        return _obs;
    }

    void popFront()
    {
        read_next();
    }

    abstract void rewind();

    /**
     Number of observations in the dataset.

     When unknown, it is computed by rewinding and reading the whole dataset.
     The reader is then rewound again so that a following iteration starts
     from the beginning instead of at the end of the data.
     */
    @property size_t length()
    {
        if(_length == size_t.max)
        {
            rewind();
            size_t count = 0;
            while(read_next())
                count++;
            _length = count;
            rewind();
        }
        return _length;
    }

    abstract @property DataReader!T save();

    /// Share the known length and the cache of `e` with this reader.
    protected void share_save_params(DataReader!T e)
    {
        _length = e._length;
        _cache = e._cache;
    }

    /**
     Request in-memory caching: the next complete `foreach` pass fills
     `_cache`. Has no effect if the cache is already populated.

     Throws: Exception if `T` is neither a built-in type nor has a `.dup`.
     */
    DataReader!T cache()
    {
        static if(__traits(compiles, _obs.dup) || isBuiltinType!T)
        {
            if(_cache.length == 0)
                _start_cache = true;
            return this;
        }
        else
        {
            throw new Exception(
                "Support of dataset caching requires a `.dup` " ~
                "function on the elements of your dataset in order to " ~
                "be able to store an immutable copy of them in memory");
        }
    }

    /// `_memory_size` expressed in the given unit (binary multiples of 1024).
    /// Throws: Exception if the subclass never populated `_memory_size`.
    @property float memory_size(string unit)()
        if(unit == "B" || unit == "KB" || unit == "MB" || unit == "GB")
    {
        if(_memory_size == 0)
            throw new Exception("Dataset didn't populate memory size field.");
        auto bytes = _memory_size.to!double;
        static if(unit == "B")
            return bytes;
        else static if(unit == "KB")
            return bytes / 1_024;
        else static if(unit == "MB")
            return bytes / (1_024 * 1_024);
        else static if(unit == "GB")
            return bytes / (1_024 * 1_024 * 1_024);
    }
}