/**
 This module provides utility classes to iterate over data.

 It is not mandatory to use them when using vectorflow, but you might find them
 useful and slightly more intuitive to use than the built-in range mechanism if
 you're a beginner with D.

 When creating a dataset for vectorflow, it is important for the data sharding
 to be thread-safe if learning over multiple cores is considered, as data
 parallelism with Hogwild is the main strategy used.
 By default, vectorflow will try to shard the forward range provided with
 std.range.evenChunks, which might or might not work depending on your
 specific reader. To explicitly shard the data, just specify an `evenChunks`
 function in your reader implementation (see MultiFilesReader for an example).

 Copyright: 2017 Netflix, Inc.
 License: $(LINK2 http://www.apache.org/licenses/LICENSE-2.0, Apache License Version 2.0)
*/
module vectorflow.dataset;

private
{
import std.conv : to;
import std.stdio;
import std.traits : isBuiltinType;
import std.range : _evenChunks = evenChunks;
}

/**
 Base class for readers backed by a single file on disk.

 The constructor stores the path and mode and calls `rewind`, which opens the
 file. Subclasses implement `read_next` (decode one observation into `_obs`)
 and `save`.
*/
class DataFileReader(T) : DataReader!T
{
    protected File _f;
    protected string _path;
    protected bool _binary;

    /// Path of the underlying file, as passed to the constructor.
    public @property path(){return _path;}

    /**
     Params:
        path = path of the file to read from
        binary = if true the file is opened with mode "rb", otherwise "r"
    */
    this(string path, bool binary)
    {
        _path = path;
        _binary = binary;
        rewind();
    }

    protected override abstract bool read_next();
    override abstract @property DataFileReader!T save();

    /// True once the underlying file handle reports end-of-file.
    override @property bool empty()
    {
        return _f.eof;
    }

    ~this()
    {
        _f.close();
    }

    /// Close the file and open it again, in the mode selected by `_binary`.
    override void rewind()
    {
        _f.close();
        _f.open(_path, _binary ? "rb" : "r");
    }

    /// Copy the shared state of `e`, then seek to the file position of `e`.
    protected override void share_save_params(DataReader!T e)
    {
        super.share_save_params(e);
        _f.seek((cast(DataFileReader!T)e)._f.tell);
    }
}


/**
 Presents several `DataFileReader`s as one dataset, visited in array order.
*/
class MultiFilesReader(T)
{
    DataFileReader!(T)[] readers;

    protected ulong _currInd;

    /// Index in `readers` of the reader currently used by the range primitives.
    public @property ulong currentFileIndex(){return _currInd;}

    private bool _cached;

    this(DataFileReader!(T)[] readers_)
    {
        readers = readers_;
        _currInd = 0;
        _cached = false;
    }

    /**
     `foreach` support: visits every observation of every reader in order,
     then rewinds all the readers.

     Returns: the last value returned by `dg`; non-zero means the caller left
     the loop early.
    */
    int opApply(scope int delegate(ref T) dg)
    {
        int result = 0;
        foreach(r; readers)
        {
            // The cache decision is made per reader so that a reader whose
            // cache has not been filled yet is read through its own opApply
            // instead of being silently skipped.
            if(r._cache.length != 0)
            {
                foreach(ref T obs; r._cache)
                {
                    result = dg(obs);
                    if(result)
                        break;
                }
            }
            else
            {
                foreach(ref T obs; r)
                {
                    result = dg(obs);
                    if(result)
                        break;
                }
            }
            // A non-zero result means the caller broke out of its loop:
            // the remaining readers must not be visited.
            if(result)
                break;
        }
        rewind();
        return result;
    }

    /// True when there is no reader, or when the last reader is exhausted.
    @property bool empty()
    {
        // Explicit check: `readers.length - 1` wraps around for an empty array.
        if(readers.length == 0)
            return true;
        return _currInd == readers.length - 1 && readers[_currInd].empty;
    }

    @property ref T front()
    {
        return readers[_currInd].front;
    }

    /// Advance by one observation, moving to the next reader when the
    /// current one is exhausted.
    void popFront()
    {
        if(readers.length == 0)
            return;
        // never step past the last reader
        if(readers[_currInd].empty && _currInd + 1 < readers.length)
            _currInd++;
        readers[_currInd].popFront();
    }

    /// Rewind every reader and restart from the first one.
    void rewind()
    {
        foreach(r; readers)
            r.rewind();
        _currInd = 0;
    }

    /// Copy of this reader built from the `save` of every underlying reader.
    @property MultiFilesReader!(T) save()
    {
        DataFileReader!(T)[] cps;
        foreach(r; readers)
            cps ~= r.save();
        auto cp = new MultiFilesReader!T(cps);
        // keep the copy on the same reader as the original
        cp._currInd = _currInd;
        if(_cached)
            cp.cache();

        return cp;
    }

    /// Sum of the lengths of all the readers.
    @property ulong length()
    {
        ulong s = 0;
        foreach(r; readers)
            s += r.length;
        return s;
    }

    /// Split reader into num_chunks readers based on even number of
    /// physical files
    MultiFilesReader!(T)[] evenChunks(uint num_chunks)
    {
        MultiFilesReader!(T)[] res;
        DataFileReader!(T)[] cps;
        foreach(r; readers)
            cps ~= r.save();
        auto files_chunks = _evenChunks(cps, num_chunks);
        foreach(c; files_chunks)
        {
            // NOTE(review): a chunk may hold no file when num_chunks exceeds
            // the number of readers; confirm against std.range.evenChunks.
            auto r = new MultiFilesReader!(T)(c);
            if(_cached)
                r.cache();
            res ~=r;
        }
        return res;
    }

    /// Request caching on every underlying reader. Returns `this`.
    MultiFilesReader!T cache()
    {
        _cached = true;
        foreach(r; readers)
            r.cache();
        return this;
    }
}


/**
 Abstract base class of the readers.

 Subclasses provide `read_next`, `empty`, `rewind` and `save`. This class adds
 `foreach` support through `opApply`, optional in-memory caching of the
 observations, and a lazily computed `length`.
*/
class DataReader(T)
{
    protected T _obs;

    /// Number of observations; `ulong.max` means "not computed yet".
    protected ulong _length;

    T[] _cache;
    protected bool _start_cache;

    /// sum in bytes of the size of the dataset elements in memory
    protected ulong _memory_size;

    this()
    {
        // same bit pattern as the `-1` sentinel this field historically used
        _length = ulong.max;
        _start_cache = false;
        _memory_size = 0;
    }

    /// Read the next observation into `_obs`. The loops in this class stop
    /// as soon as it returns false.
    protected abstract bool read_next();

    /**
     `foreach` support. Depending on the caching state, observations are read
     from the source while filling the cache, replayed from the cache, or
     read from the source without caching. The reader is rewound afterwards.

     Returns: the last value returned by `dg`; non-zero means the caller left
     the loop early.
    */
    int opApply(scope int delegate(ref T) dg)
    {
        int result = 0;
        bool served = false;
        static if(__traits(compiles, _obs.dup) || isBuiltinType!T)
        {
            if(_start_cache && _cache.length == 0)
            {
                served = true;
                while(read_next())
                {
                    static if(__traits(compiles, _obs.dup))
                        _cache ~= _obs.dup;
                    else
                        _cache ~= _obs;
                    result = dg(_obs);
                    if(result)
                        break;
                }
                if(result)
                {
                    // The pass was interrupted: keeping the partial cache
                    // would truncate every later pass. Drop it and leave
                    // caching armed for the next complete pass.
                    _cache = null;
                }
                else
                {
                    _length = _cache.length;
                    _start_cache = false;
                }
            }
            else if(_cache.length != 0)
            {
                served = true;
                foreach(ref T obs; _cache)
                {
                    result = dg(obs);
                    if(result)
                        break;
                }
            }
        }
        if(!served)
        {
            // no caching involved: read straight from the source
            while(read_next())
            {
                result = dg(_obs);
                if(result)
                    break;
            }
        }
        rewind();
        return result;
    }

    abstract @property bool empty();

    /// The observation most recently stored in `_obs`.
    @property ref T front()
    {
        return _obs;
    }

    void popFront()
    {
        read_next();
    }

    abstract void rewind();

    /**
     Number of observations. When it is not known yet, it is computed by
     rewinding and reading the source until `read_next` returns false; the
     reader is rewound again afterwards so that the next pass starts from the
     first observation.
    */
    @property ulong length()
    {
        if(_length == ulong.max)
        {
            rewind();
            _length = 0;
            while(read_next())
                _length++;
            rewind();
        }
        return _length;
    }

    abstract @property DataReader!T save();

    /// Copy the length and the cache (the array reference, not its elements)
    /// from `e`.
    protected void share_save_params(DataReader!T e)
    {
        _length = e._length;
        _cache = e._cache;
    }

    /**
     Request that the observations be kept in memory; the cache is filled by
     the next complete `foreach` pass.

     Returns: `this`.
     Throws: Exception when `T` is neither a builtin type nor has a `.dup`.
    */
    DataReader!T cache()
    {
        static if(__traits(compiles, _obs.dup) || isBuiltinType!T)
        {
            if(_cache.length == 0)
                _start_cache = true;
            return this;
        }
        else
        {
            throw new Exception(
                "Support of dataset caching requires a `.dup` " ~
                "function on the elements of your dataset in order to " ~
                "be able to store an immutable copy of them in memory");
        }
    }

    /**
     Value of `_memory_size` expressed in `unit` ("B", "KB", "MB" or "GB",
     with 1 KB = 1024 B).

     Throws: Exception when `_memory_size` is 0.
    */
    @property float memory_size(string unit)()
        if(unit == "B" || unit == "KB" || unit == "MB" || unit == "GB")
    {
        if(_memory_size == 0)
            throw new Exception("Dataset didn't populate memory size field.");
        auto bytes = _memory_size.to!double;
        static if(unit == "B")
            return bytes;
        static if(unit == "KB")
            return bytes / 1_024;
        static if(unit == "MB")
            return bytes / (1_024 * 1_024);
        static if(unit == "GB")
            return bytes / (1_024 * 1_024 * 1_024);
    }
}