/**
This module provides utility classes to iterate over data.

It is not mandatory to use them when using vectorflow, but you might find them
useful and slightly more intuitive to use than the built-in range mechanism if
you're a beginner with D.

When creating a dataset for vectorflow, it is important for the data sharding
to be thread-safe if learning over multiple cores is considered, as data
parallelism with Hogwild is the main strategy used.
By default, vectorflow will try to shard the forward range provided with
std.range.evenChunks, which might or might not work depending on your
specific reader. To explicitly shard the data, just specify an `evenChunks`
function in your reader implementation (see MultiFilesReader for an example).

Copyright: 2017 Netflix, Inc.
License: $(LINK2 http://www.apache.org/licenses/LICENSE-2.0, Apache License Version 2.0)
*/
module vectorflow.dataset;

private
{
    import std.conv : to;
    import std.stdio;
    import std.traits : isBuiltinType;
    import std.range : _evenChunks = evenChunks;
}

/**
Abstract reader streaming observations of type `T` out of a single file.

Concrete subclasses implement `read_next` (populating `_obs` from `_f`)
and `save` (producing an independent copy sharing the same position).
*/
class DataFileReader(T) : DataReader!T
{
    protected File _f;      // handle on the underlying data file
    protected string _path; // path of the backing file
    protected bool _binary; // true when the file is opened in binary mode
    /// Path of the file backing this reader.
    public @property path(){return _path;}

    /**
    Params:
        path = path of the file to read
        binary = open in binary mode ("rb") when true, text mode ("r") otherwise
    */
    this(string path, bool binary)
    {
        _path = path;
        _binary = binary;
        rewind(); // performs the initial open of the file
    }

    protected override abstract bool read_next();
    override abstract @property DataFileReader!T save();

    /// True once the end of the underlying file has been reached.
    override @property bool empty()
    {
        return _f.eof;
    }

    ~this()
    {
        _f.close();
    }

    /// Closes and reopens the file, restarting the stream from the beginning.
    override void rewind()
    {
        // File.close is safe to call even if the file was never opened
        // (first call from the constructor).
        _f.close();
        if(_binary)
            _f.open(_path, "rb");
        else
            _f.open(_path, "r");
    }

    /// Copies iteration state from `e`, including the current file offset,
    /// so that this reader resumes exactly where `e` currently is.
    protected override void share_save_params(DataReader!T e)
    {
        super.share_save_params(e);
        _f.seek(e.to!(DataFileReader!T)._f.tell);
    }
}


/**
Reader concatenating a set of `DataFileReader`s, iterating the files in order.

Provides an `evenChunks` implementation sharding on file boundaries, which
makes it directly usable for Hogwild-style multi-core training.
*/
class MultiFilesReader(T)
{
    /// Underlying per-file readers, iterated in array order.
    DataFileReader!(T)[] readers;
    protected size_t _currInd; // index of the file currently being read
    /// Index of the file currently being iterated.
    public @property ulong currentFileIndex(){return _currInd;}

    private bool _cached; // whether cache() was requested on this reader

    /**
    Params:
        readers_ = per-file readers to concatenate, in iteration order
    */
    this(DataFileReader!(T)[] readers_)
    {
        readers = readers_;
        _currInd = 0;
        _cached = false;
    }

    /**
    Iterates all observations of all files, serving from the in-memory cache
    when the underlying readers have been cached.

    A non-zero delegate return value (a `break` in the consumer's `foreach`)
    stops iteration over ALL files, per the `opApply` contract.
    */
    int opApply(scope int delegate(ref T) dg)
    {
        int result = 0;
        // Guard against an empty reader set before probing readers[0].
        const bool from_cache =
            readers.length > 0 && readers[0]._cache.length != 0;
        outer: foreach(r; readers)
        {
            if(from_cache)
            {
                foreach(ref T obs; r._cache)
                {
                    result = dg(obs);
                    if(result)
                        break outer; // stop across all files, not just this one
                }
            }
            else
            {
                foreach(ref T obs; r)
                {
                    result = dg(obs);
                    if(result)
                        break outer; // stop across all files, not just this one
                }
            }
        }
        rewind();
        return result;
    }

    /// True when the last file is the current one and it is exhausted.
    @property bool empty()
    {
        return _currInd == readers.length - 1 && readers[_currInd].empty;
    }

    /// Current observation of the current file's reader.
    @property ref T front()
    {
        return readers[_currInd].front;
    }

    /// Advances to the next observation, moving to the next file when the
    /// current one is exhausted.
    void popFront()
    {
        if(readers[_currInd].empty)
            _currInd++;
        readers[_currInd].popFront();
    }

    /// Rewinds every file reader and restarts from the first file.
    void rewind()
    {
        foreach(r; readers)
            r.rewind();
        _currInd = 0;
    }

    /// Independent copy of this reader (copies of all file readers),
    /// preserving the caching policy.
    @property MultiFilesReader!(T) save()
    {
        DataFileReader!(T)[] cps;
        foreach(r; readers)
            cps ~= r.save();
        auto cp = new MultiFilesReader!T(cps);
        if(_cached)
            cp.cache();

        return cp;
    }

    /// Total number of observations across all files.
    @property ulong length()
    {
        ulong s = 0;
        foreach(r; readers)
            s += r.length;
        return s;
    }

    /// Split reader into num_chunks readers based on even number of
    /// physical files
    MultiFilesReader!(T)[] evenChunks(uint num_chunks)
    {
        MultiFilesReader!(T)[] res;
        DataFileReader!(T)[] cps;
        foreach(r; readers)
            cps ~= r.save();
        auto files_chunks = _evenChunks(cps, num_chunks);
        foreach(c; files_chunks)
        {
            auto r = new MultiFilesReader!(T)(c);
            if(_cached)
                r.cache();
            res ~= r;
        }
        return res;
    }

    /// Requests in-memory caching on all underlying file readers.
    /// Returns this reader to allow chaining.
    MultiFilesReader!T cache()
    {
        _cached = true;
        foreach(r; readers)
            r.cache();
        return this;
    }
}


/**
Abstract base class of all data readers, exposing both a forward-range
interface and `opApply`, with optional in-memory caching of the dataset.
*/
class DataReader(T)
{
    protected T _obs;          // observation currently exposed by front()
    protected size_t _length;  // number of observations; size_t.max == unknown

    /// In-memory copy of the dataset, populated on the first full pass
    /// after cache() has been requested.
    T[] _cache;
    protected bool _start_cache; // true when caching was requested but not done yet

    /// sum in bytes of the size of the dataset elements in memory
    protected ulong _memory_size;

    this()
    {
        _length = -1; // sentinel (size_t.max): length not computed yet
        _start_cache = false;
        _memory_size = 0;
    }

    /// Reads the next observation into `_obs`; returns false when exhausted.
    protected abstract bool read_next();

    /**
    Iterates all observations, transparently building and then serving the
    in-memory cache when cache() has been requested and `T` is copyable.

    If the consumer breaks out during the initial caching pass, the partial
    cache is discarded (so a later complete pass can rebuild it) instead of
    being mistaken for the full dataset.
    */
    int opApply(scope int delegate(ref T) dg)
    {
        int result = 0;
        static if(__traits(compiles, _obs.dup) || isBuiltinType!T)
        {
            if(_start_cache && _cache.length == 0)
            {
                // caching pass: stream from the source while copying into _cache
                while(read_next())
                {
                    static if(__traits(compiles, _obs.dup))
                        _cache ~= _obs.dup;
                    else
                        _cache ~= _obs;
                    result = dg(_obs);
                    if (result)
                        break;
                }
                if (result)
                {
                    // interrupted: a partial cache must not masquerade as the
                    // full dataset; retry caching on the next complete pass
                    _cache.length = 0;
                }
                else
                {
                    _length = _cache.length;
                    _start_cache = false;
                }
            }
            else if(_cache.length != 0)
            {
                // serve directly from the completed in-memory cache
                foreach(ref T obs; _cache)
                {
                    result = dg(obs);
                    if(result)
                        break;
                }
            }
            else
            {
                // plain streaming, no caching requested
                while(read_next())
                {
                    result = dg(_obs);
                    if (result)
                        break;
                }
            }
        }
        else
        {
            // T is not copyable: caching unsupported, always stream
            while(read_next())
            {
                result = dg(_obs);
                if (result)
                    break;
            }
        }
        rewind();
        return result;
    }

    abstract @property bool empty();

    /// Current observation.
    @property ref T front()
    {
        return _obs;
    }

    /// Advances to the next observation.
    void popFront()
    {
        read_next();
    }

    /// Restarts the stream from the beginning.
    abstract void rewind();

    /// Number of observations in the dataset, computed lazily by a full
    /// counting pass on first access.
    @property size_t length()
    {
        if(_length == -1) // -1 converts to size_t.max, the "unknown" sentinel
        {
            rewind();
            _length = 0;
            while(read_next())
                _length++;
        }
        return _length;
    }

    /// Independent copy of this reader sharing the same position and cache.
    abstract @property DataReader!T save();

    /// Copies iteration state from `e` into this reader (used by save()).
    protected void share_save_params(DataReader!T e)
    {
        _length = e._length;
        _cache = e._cache;
    }

    /**
    Requests in-memory caching of the dataset: the next complete iteration
    stores copies of all observations, and later passes are served from RAM.

    Returns this reader to allow chaining.
    Throws: Exception when `T` is neither a builtin type nor `.dup`-able.
    */
    DataReader!T cache()
    {
        static if(__traits(compiles, _obs.dup) || isBuiltinType!T)
        {
            if(_cache.length == 0)
                _start_cache = true;
            return this;
        }
        else
        {
            throw new Exception(
                "Support of dataset caching requires a `.dup` " ~
                "function on the elements of your dataset in order to " ~
                "be able to store an immutable copy of them in memory");
        }
    }

    /**
    Size of the dataset in memory, in the requested unit.

    Params:
        unit = one of "B", "KB", "MB", "GB" (compile-time)
    Throws: Exception when the reader never populated `_memory_size`.
    */
    @property float memory_size(string unit)()
        if(unit == "B" || unit == "KB" || unit == "MB" || unit == "GB")
    {
        if(_memory_size == 0)
            throw new Exception("Dataset didn't populate memory size field.");
        auto bytes = _memory_size.to!double;
        static if(unit == "B")
            return bytes;
        static if(unit == "KB")
            return bytes / 1_024;
        static if(unit == "MB")
            return bytes / (1_024 * 1_024);
        static if(unit == "GB")
            return bytes / (1_024 * 1_024 * 1_024);
    }
}