1 /**
2  This module provides utility classes to iterate over data.
3  
4  It is not mandatory to use them when using vectorflow, but you might find them
5  useful and slightly more intuitive to use than the built-in range mechanism if
6  you're a beginner with D.
7 
 When creating a dataset for vectorflow, the data sharding must be thread-safe
 if you plan to train on multiple cores, since data parallelism with Hogwild is
 the main strategy used.
 By default, vectorflow tries to shard the forward range you provide with
 std.range.evenChunks, which may or may not work depending on your specific
 reader. To shard the data explicitly, define an `evenChunks` function in your
 reader implementation (see MultiFilesReader for an example).
15  
16  Copyright: 2017 Netflix, Inc.
17  License: $(LINK2 http://www.apache.org/licenses/LICENSE-2.0, Apache License Version 2.0)
18  */
19 module vectorflow.dataset;
20 
21 private
22 {
23 import std.conv : to;
24 import std.stdio;
25 import std.traits : isBuiltinType;
26 import std.range : _evenChunks = evenChunks;
27 }
28 
/**
 Base class for readers whose observations are read from a single file.

 The file at `path` is (re)opened by `rewind`: with mode "rb" when `binary`
 is true, with mode "r" otherwise. Concrete readers provide `read_next` and
 `save`.
 */
class DataFileReader(T) : DataReader!T
{
    protected File _f;
    protected string _path;
    protected bool _binary;

    /// Path of the underlying file, exactly as passed to the constructor.
    public @property string path()
    {
        return _path;
    }

    this(string path, bool binary)
    {
        _binary = binary;
        _path = path;
        // Opens the file so the reader is ready to iterate right away.
        rewind();
    }

    protected override abstract bool read_next();
    override abstract @property DataFileReader!T save();

    /// Reports whether the underlying `File` is at end-of-file (`File.eof`).
    override @property bool empty()
    {
        return _f.eof;
    }

    ~this()
    {
        _f.close();
    }

    /// Closes the current handle, if any, and reopens the file from its start.
    override void rewind()
    {
        immutable mode = _binary ? "rb" : "r";
        _f.close();
        _f.open(_path, mode);
    }

    /// Copies the shared state from `e` and moves this reader's file cursor
    /// to the offset `e`'s file is currently at. `e` is expected to be a
    /// `DataFileReader!T`.
    protected override void share_save_params(DataReader!T e)
    {
        super.share_save_params(e);
        auto source = cast(DataFileReader!T) e;
        immutable offset = source._f.tell;
        _f.seek(offset);
    }
}
71 
72 
/**
 Concatenates several `DataFileReader`s into a single dataset.

 Observations are produced reader by reader, in array order. `evenChunks`
 splits the dataset along file boundaries, and each chunk holds its own
 reader copies obtained through `save`. This gives vectorflow an explicit
 sharding strategy.
 */
class MultiFilesReader(T)
{
    DataFileReader!(T)[] readers;

    protected ulong _currInd;
    public @property ulong currentFileIndex(){return _currInd;}

    private bool _cached;

    this(DataFileReader!(T)[] readers_)
    {
        readers = readers_;
        _currInd = 0;
        _cached = false;
    }

    /**
     `foreach` support: yields every observation of every reader, in order.

     A reader whose in-memory cache is populated is iterated from that cache.
     Any other reader is iterated through its own `opApply`, which also fills
     its cache when caching was requested. A `break` in the caller's loop
     stops the WHOLE iteration: no further reader is visited, and the
     delegate's non-zero code is returned as required by the opApply
     protocol. All readers are rewound afterwards. An empty `readers` array
     yields nothing.
     */
    int opApply(scope int delegate(ref T) dg)
    {
        int result = 0;
        readerLoop:
        foreach(r; readers)
        {
            // Decide per reader: caches are filled per reader, so looking only
            // at readers[0] would skip the data of uncached readers.
            if(r._cache.length != 0)
            {
                foreach(ref T obs; r._cache)
                {
                    result = dg(obs);
                    if(result)
                        break readerLoop;
                }
            }
            else
            {
                foreach(ref T obs; r)
                {
                    result = dg(obs);
                    if(result)
                        break readerLoop;
                }
            }
        }
        rewind();
        return result;
    }

    /// Range primitive. True when there are no readers at all, or when the
    /// last reader is the current one and reports `empty`.
    @property bool empty()
    {
        // Guards against `readers.length - 1` underflowing for an empty set
        // (e.g. an empty chunk produced by `evenChunks`).
        if(readers.length == 0)
            return true;
        return _currInd == readers.length - 1 && readers[_currInd].empty;
    }

    /// Range primitive: current observation of the current reader.
    @property ref T front()
    {
        return readers[_currInd].front;
    }

    /// Range primitive. Skips exhausted readers (never past the last one) and
    /// advances the current reader.
    void popFront()
    {
        assert(!empty, "popFront called on an empty MultiFilesReader");
        while(readers[_currInd].empty && _currInd + 1 < readers.length)
            _currInd++;
        readers[_currInd].popFront();
    }

    /// Rewinds every reader and makes the first one current again.
    void rewind()
    {
        foreach(r; readers)
            r.rewind();
        _currInd = 0;
    }

    /// Returns a new reader built from `save()` copies of every reader,
    /// carrying over the caching request.
    @property MultiFilesReader!(T) save()
    {
        DataFileReader!(T)[] cps;
        foreach(r; readers)
            cps ~= r.save();
        auto cp = new MultiFilesReader!T(cps);
        if(_cached)
            cp.cache();

        return cp;
    }

    /// Total number of observations across all readers.
    @property ulong length()
    {
        ulong s = 0;
        foreach(r; readers)
            s += r.length;
        return s;
    }

    /// Split reader into num_chunks readers based on even number of
    /// physical files. If `num_chunks` exceeds the number of files, some of
    /// the returned readers have no files; they behave as empty datasets.
    MultiFilesReader!(T)[] evenChunks(uint num_chunks)
    {
        MultiFilesReader!(T)[] res;
        DataFileReader!(T)[] cps;
        foreach(r; readers)
            cps ~= r.save();
        auto files_chunks = _evenChunks(cps, num_chunks);
        foreach(c; files_chunks)
        {
            auto r = new MultiFilesReader!(T)(c);
            if(_cached)
                r.cache();
            res ~= r;
        }
        return res;
    }

    /// Requests in-memory caching on every reader; returns `this` for chaining.
    MultiFilesReader!T cache()
    {
        _cached = true;
        foreach(r; readers)
            r.cache();
        return this;
    }
}
191 
192 
/**
 Base class for datasets that produce observations of type `T` one at a time.

 Subclasses implement `read_next`, `empty`, `rewind` and `save`. `read_next`
 is used here as "load the next observation into `_obs`, return false once
 exhausted". This class provides:
 - `foreach` support through `opApply`,
 - a minimal input-range interface (`front` / `popFront`),
 - a lazily computed `length`,
 - optional in-memory caching via `cache()`.
 */
class DataReader(T)
{
    protected T _obs;

    /// Number of observations; `ulong.max` (the value of `-1`) while unknown.
    protected ulong _length;

    /// In-memory copy of the dataset, committed after the first COMPLETE
    /// pass following a call to `cache()`.
    T[] _cache;
    /// Set by `cache()`; cleared once a complete pass has filled `_cache`.
    protected bool _start_cache;

    /// sum in bytes of the size of the dataset elements in memory
    protected ulong _memory_size;

    this()
    {
        _length = ulong.max;
        _start_cache = false;
        _memory_size = 0;
    }

    /// Loads the next observation into `_obs`; returns false when exhausted.
    protected abstract bool read_next();

    /**
     `foreach` support over the dataset.

     Three cases:
     - `cache()` was requested and the cache is still empty: this pass reads
       the source and copies every observation.
     - The cache is populated: observations are yielded from it.
     - Otherwise: observations are read from the source.

     The reader is rewound when the loop ends, including on `break`.
     */
    int opApply(scope int delegate(ref T) dg)
    {
        int result;
        static if(__traits(compiles, _obs.dup) || isBuiltinType!T)
        {
            if(_start_cache && _cache.length == 0)
                result = fill_cache(dg);
            else if(_cache.length != 0)
                result = iterate_cache(dg);
            else
                result = iterate_source(dg);
        }
        else
        {
            result = iterate_source(dg);
        }
        rewind();
        return result;
    }

    /// Yields observations from `read_next` until exhausted or `dg` stops.
    private int iterate_source(scope int delegate(ref T) dg)
    {
        while(read_next())
        {
            if(auto result = dg(_obs))
                return result;
        }
        return 0;
    }

    /// Yields the observations stored in `_cache`.
    private int iterate_cache(scope int delegate(ref T) dg)
    {
        foreach(ref T obs; _cache)
        {
            if(auto result = dg(obs))
                return result;
        }
        return 0;
    }

    /**
     Reads the source once, yielding each observation and keeping a copy.

     The copy is committed to `_cache`, and `_length` is updated, only when the
     pass runs to completion. If `dg` stops early, or throws, the partial copy
     is discarded and `_start_cache` stays set. The next pass then retries the
     caching instead of permanently truncating the dataset to the prefix seen
     so far.
     */
    private int fill_cache(scope int delegate(ref T) dg)
    {
        T[] buffer;
        while(read_next())
        {
            static if(__traits(compiles, _obs.dup))
                buffer ~= _obs.dup;
            else
                buffer ~= _obs;
            if(auto result = dg(_obs))
                return result;
        }
        _cache = buffer;
        _length = _cache.length;
        _start_cache = false;
        return 0;
    }

    abstract @property bool empty();

    /// Range primitive: the observation most recently loaded by `read_next`.
    @property ref T front()
    {
        return _obs;
    }

    /// Range primitive: loads the next observation.
    void popFront()
    {
        read_next();
    }

    abstract void rewind();

    /**
     Number of observations in the dataset.

     On first use (unless already known, e.g. from a completed caching pass)
     the source is rewound and read to the end to count observations. It is
     then rewound again, so a subsequent pass starts from the beginning
     instead of finding the source exhausted.
     */
    @property ulong length()
    {
        if(_length == ulong.max)
        {
            rewind();
            ulong count = 0;
            while(read_next())
                count++;
            _length = count;
            rewind();
        }
        return _length;
    }

    abstract @property DataReader!T save();

    /// Copies the known length and the (shared) cache from `e`.
    protected void share_save_params(DataReader!T e)
    {
        _length = e._length;
        _cache = e._cache;
    }

    /**
     Requests in-memory caching: the next complete `foreach` pass stores a
     copy of every observation, and later passes read from memory.

     Throws: Exception when `T` has no `.dup` and is not a built-in type.
     */
    DataReader!T cache()
    {
        static if(__traits(compiles, _obs.dup) || isBuiltinType!T)
        {
            if(_cache.length == 0)
                _start_cache = true;
            return this;
        }
        else
        {
            throw new Exception(
                "Support of dataset caching requires a `.dup` " ~
                "function on the elements of your dataset in order to " ~
                "be able to store an immutable copy of them in memory");
        }
    }

    /**
     Memory footprint reported by the dataset in `_memory_size`, converted to
     `unit` ("B", "KB", "MB" or "GB", using powers of 1024).

     Throws: Exception when `_memory_size` was never populated (is 0).
     */
    @property float memory_size(string unit)()
        if(unit == "B" || unit == "KB" || unit == "MB" || unit == "GB")
    {
        if(_memory_size == 0)
            throw new Exception("Dataset didn't populate memory size field.");
        auto bytes = _memory_size.to!double;
        static if(unit == "B")
            return bytes;
        else static if(unit == "KB")
            return bytes / 1_024;
        else static if(unit == "MB")
            return bytes / (1_024 * 1_024);
        else
            return bytes / (1_024 * 1_024 * 1_024);
    }
}