Revert "Add option to checkpoint every evaluation"
[openmx:openmx.git] / src / omxState.cpp
1 /*
2  *  Copyright 2007-2014 The OpenMx Project
3  *
4  *  Licensed under the Apache License, Version 2.0 (the "License");
5  *  you may not use this file except in compliance with the License.
6  *  You may obtain a copy of the License at
7  *
8  *       http://www.apache.org/licenses/LICENSE-2.0
9  *
10  *  Unless required by applicable law or agreed to in writing, software
11  *  distributed under the License is distributed on an "AS IS" BASIS,
12  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  *  See the License for the specific language governing permissions and
14  *  limitations under the License.
15  */
16
17 #include <stdarg.h>
18 #include <errno.h>
19
20 #include "omxState.h"
21 #include "Compute.h"
22 #include "omxOpenmpWrap.h"
23
24 struct omxGlobal *Global = NULL;
25
26 FreeVarGroup *omxGlobal::findVarGroup(int id)
27 {
28         size_t numGroups = Global->freeGroup.size();
29         for (size_t vx=0; vx < numGroups; ++vx) {
30                 std::vector<int> &ids = Global->freeGroup[vx]->id;
31                 for (size_t ix=0; ix < ids.size(); ++ix) {
32                         if (ids[ix] == id) return Global->freeGroup[vx];
33                 }
34         }
35         return NULL;
36 }
37
38 FreeVarGroup *omxGlobal::findOrCreateVarGroup(int id)
39 {
40         FreeVarGroup *old = findVarGroup(id);
41         if (old) return old;
42
43         FreeVarGroup *fvg = new FreeVarGroup;
44         fvg->id.push_back(id);
45         Global->freeGroup.push_back(fvg);
46         return fvg;
47 }
48
49 bool FreeVarGroup::hasSameVars(FreeVarGroup *g2)
50 {
51         if (vars.size() != g2->vars.size()) return false;
52
53         for (size_t vx=0; vx < vars.size(); ++vx) {
54                 if (vars[vx] != g2->vars[vx]) return false;
55         }
56         return true;
57 }
58
59 int FreeVarGroup::lookupVar(const char *name)
60 {
61         for (size_t vx=0; vx < vars.size(); ++vx) {
62                 if (strcmp(name, vars[vx]->name) == 0) return vx;
63         }
64         return -1;
65 }
66
67 void FreeVarGroup::cacheDependencies()
68 {
69         omxState *os = globalState;
70         size_t numMats = os->matrixList.size();
71         size_t numAlgs = os->algebraList.size();
72
73         dependencies.assign(numMats + numAlgs, false);
74         locations.assign(numMats, false);
75
76         for (size_t vx = 0; vx < vars.size(); vx++) {
77                 omxFreeVar *fv = vars[vx];
78                 int *deps   = fv->deps;
79                 int numDeps = fv->numDeps;
80                 for (int index = 0; index < numDeps; index++) {
81                         dependencies[deps[index] + numMats] = true;
82                 }
83                 for (size_t lx=0; lx < fv->locations.size(); ++lx) {
84                         locations[fv->locations[lx].matrix] = true;
85                 }
86         }
87
88         // Everything is set up. This is a good place to log.
89         if (OMX_DEBUG) { log(); }
90 }
91
92 void FreeVarGroup::markDirty(omxState *os)
93 {
94         size_t numMats = os->matrixList.size();
95         size_t numAlgs = os->algebraList.size();
96
97         for(size_t i = 0; i < numMats; i++) {
98                 if (!locations[i]) continue;
99                 omxMarkClean(os->matrixList[i]);
100         }
101
102         for(size_t i = 0; i < numMats; i++) {
103                 if (dependencies[i]) {
104                         int offset = ~(i - numMats);
105                         omxMarkDirty(os->matrixList[offset]);
106                 }
107         }
108
109         for(size_t i = 0; i < numAlgs; i++) {
110                 if (dependencies[i + numMats]) {
111                         omxMarkDirty(os->algebraList[i]);
112                 }
113         }
114 }
115
116 void FreeVarGroup::log()
117 {
118         omxState *os = globalState;
119         size_t numMats = os->matrixList.size();
120         size_t numAlgs = os->algebraList.size();
121         std::string str;
122
123         str += string_snprintf("FreeVarGroup(id=%d", id[0]);
124         for (size_t ix=1; ix < id.size(); ++ix) {
125                 str += string_snprintf(",%d", id[ix]);
126         }
127         str += string_snprintf(") with %d variables:", (int) vars.size());
128
129         for (size_t vx=0; vx < vars.size(); ++vx) {
130                 str += " ";
131                 str += vars[vx]->name;
132         }
133         if (vars.size()) str += "\nwill dirty:";
134
135         for(size_t i = 0; i < numMats; i++) {
136                 if (dependencies[i]) {
137                         int offset = ~(i - numMats);
138                         str += " ";
139                         str += os->matrixList[offset]->name;
140                 }
141         }
142
143         for(size_t i = 0; i < numAlgs; i++) {
144                 if (dependencies[i + numMats]) {
145                         str += " ";
146                         str += os->algebraList[i]->name;
147                 }
148         }
149         str += "\n";
150
151         mxLogBig(str);
152 }
153
154 omxGlobal::omxGlobal()
155 {
156         ciMaxIterations = 5;
157         numThreads = 1;
158         analyticGradients = 0;
159         numChildren = 0;
160         llScale = -2.0;
161 }
162
163 void omxGlobal::deduplicateVarGroups()
164 {
165         for (size_t g1=0; g1 < freeGroup.size(); ++g1) {
166                 for (size_t g2=freeGroup.size()-1; g2 > g1; --g2) {
167                         if (freeGroup[g1]->hasSameVars(freeGroup[g2])) {
168                                 freeGroup[g1]->id.insert(freeGroup[g1]->id.end(),
169                                                          freeGroup[g2]->id.begin(), freeGroup[g2]->id.end());
170                                 delete freeGroup[g2];
171                                 freeGroup.erase(freeGroup.begin() + g2);
172                         }
173                 }
174         }
175 }
176
177 /* Initialize and Destroy */
178         void omxInitState(omxState* state) {
179                 state->stale = FALSE;
180                 state->numConstraints = 0;
181                 state->conList = NULL;
182
183                 state->computeCount = 0;
184                 state->currentRow = -1;
185         }
186
187         void omxDuplicateState(omxState* tgt, omxState* src) {
188                 tgt->dataList                   = src->dataList;
189                 
190                 for(size_t mx = 0; mx < src->matrixList.size(); mx++) {
191                         // TODO: Smarter inference for which matrices to duplicate
192                         tgt->matrixList.push_back(omxDuplicateMatrix(src->matrixList[mx], tgt));
193                 }
194
195                 tgt->numConstraints     = src->numConstraints;
196                 tgt->conList                    = (omxConstraint*) R_alloc(tgt->numConstraints, sizeof(omxConstraint));
197                 for(int j = 0; j < tgt->numConstraints; j++) {
198                         tgt->conList[j].size   = src->conList[j].size;
199                         tgt->conList[j].opCode = src->conList[j].opCode;
200                         tgt->conList[j].lbound = src->conList[j].lbound;
201                         tgt->conList[j].ubound = src->conList[j].ubound;
202                         tgt->conList[j].result = omxDuplicateMatrix(src->conList[j].result, tgt);
203                 }
204
205                 for(size_t j = 0; j < src->algebraList.size(); j++) {
206                         // TODO: Smarter inference for which algebras to duplicate
207                         tgt->algebraList.push_back(omxDuplicateMatrix(src->algebraList[j], tgt));
208                 }
209
210                 for(size_t j = 0; j < src->expectationList.size(); j++) {
211                         // TODO: Smarter inference for which expectations to duplicate
212                         tgt->expectationList.push_back(omxDuplicateExpectation(src->expectationList[j], tgt));
213                 }
214
215                 for(size_t j = 0; j < tgt->algebraList.size(); j++) {
216                         omxDuplicateAlgebra(tgt->algebraList[j], src->algebraList[j], tgt);
217                 }
218
219                 for(size_t j = 0; j < src->expectationList.size(); j++) {
220                         // TODO: Smarter inference for which expectations to duplicate
221                         omxCompleteExpectation(tgt->expectationList[j]);
222                 }
223
224                 tgt->computeCount               = src->computeCount;
225                 tgt->currentRow                 = src->currentRow;
226         }
227
228         omxMatrix* omxLookupDuplicateElement(omxState* os, omxMatrix* element) {
229                 if(os == NULL || element == NULL) return NULL;
230
231                 if (element->hasMatrixNumber) {
232                         int matrixNumber = element->matrixNumber;
233                         if (matrixNumber >= 0) {
234                                 return(os->algebraList[matrixNumber]);
235                         } else {
236                                 return(os->matrixList[-matrixNumber - 1]);
237                         }
238                 }
239
240                 omxConstraint* parentConList = globalState->conList;
241
242                 for(int i = 0; i < os->numConstraints; i++) {
243                         if(parentConList[i].result == element) {
244                                 if(os->conList[i].result != NULL) {   // Not sure of proper failure behavior here.
245                 return(os->conList[i].result);
246                                 } else {
247                     omxRaiseError("Initialization Copy Error: Constraint required but not yet processed.");
248             }
249                         }
250                 }
251
252                 return NULL;
253         }
254         
255 void omxFreeChildStates(omxState *state)
256 {
257         if (state->childList.size() == 0) return;
258
259         for(int k = 0; k < Global->numChildren; k++) {
260                 // Data are not modified and not copied. The same memory
261                 // is shared across all instances of state. We only need
262                 // to free the data once, so let the parent do it.
263                 state->childList[k]->dataList.clear();
264
265                 omxFreeState(state->childList[k]);
266         }
267         state->childList.clear();
268         Global->numChildren = 0;
269 }
270
271         void omxFreeState(omxState *state) {
272                 omxFreeChildStates(state);
273
274                 if(OMX_DEBUG) { mxLog("Freeing %d Constraints.", (int) state->numConstraints);}
275                 for(int k = 0; k < state->numConstraints; k++) {
276                         omxFreeMatrix(state->conList[k].result);
277                 }
278
279                 for(size_t ax = 0; ax < state->algebraList.size(); ax++) {
280                         // free argument tree
281                         omxFreeMatrix(state->algebraList[ax]);
282                 }
283
284                 for(size_t ax = 0; ax < state->algebraList.size(); ax++) {
285                         state->algebraList[ax]->hasMatrixNumber = false;
286                         omxFreeMatrix(state->algebraList[ax]);
287                 }
288
289                 if(OMX_DEBUG) { mxLog("Freeing %d Matrices.", (int) state->matrixList.size());}
290                 for(size_t mk = 0; mk < state->matrixList.size(); mk++) {
291                         state->matrixList[mk]->hasMatrixNumber = false;
292                         omxFreeMatrix(state->matrixList[mk]);
293                 }
294                 
295                 if(OMX_DEBUG) { mxLog("Freeing %d Model Expectations.", (int) state->expectationList.size());}
296                 for(size_t ex = 0; ex < state->expectationList.size(); ex++) {
297                         omxFreeExpectationArgs(state->expectationList[ex]);
298                 }
299
300                 if(OMX_DEBUG) { mxLog("Freeing %d Data Sets.", (int) state->dataList.size());}
301                 for(size_t dx = 0; dx < state->dataList.size(); dx++) {
302                         omxFreeData(state->dataList[dx]);
303                 }
304
305                 delete state;
306
307                 if(OMX_DEBUG) { mxLog("State Freed.");}
308         }
309
310 omxGlobal::~omxGlobal()
311 {
312         for (size_t cx=0; cx < computeList.size(); ++cx) {
313                 delete computeList[cx];
314         }
315         for (size_t cx=0; cx < algebraList.size(); ++cx) {
316                 delete algebraList[cx];
317         }
318         for (size_t cx=0; cx < checkpointList.size(); ++cx) {
319                 delete checkpointList[cx];
320         }
321         if (freeGroup.size()) {
322                 std::vector< omxFreeVar* > &vars = freeGroup[0]->vars;  // has all vars
323                 for (size_t vx=0; vx < vars.size(); ++vx) {
324                         delete vars[vx];
325                 }
326         }
327         for (size_t gx=0; gx < freeGroup.size(); ++gx) {
328                 delete freeGroup[gx];
329         }
330 }
331
332 std::string string_vsnprintf(const char *fmt, va_list orig_ap)
333 {
334     int size = 100;
335     std::string str;
336     while (1) {
337         str.resize(size);
338         va_list ap;
339         va_copy(ap, orig_ap);
340         int n = vsnprintf((char *)str.c_str(), size, fmt, ap);
341         va_end(ap);
342         if (n > -1 && n < size) {
343             str.resize(n);
344             return str;
345         }
346         if (n > -1)
347             size = n + 1;
348         else
349             size *= 2;
350     }
351     return str;
352 }
353
354 std::string string_snprintf(const char *fmt, ...)
355 {
356         va_list ap;
357         va_start(ap, fmt);
358         std::string str = string_vsnprintf(fmt, ap);
359         va_end(ap);
360         return str;
361 }
362
363 void mxLogBig(const std::string str)   // thread-safe
364 {
365         ssize_t len = ssize_t(str.size());
366         ssize_t wrote = 0;
367         int maxRetries = 20;
368         ssize_t got;
369 #pragma omp critical(stderp)
370         {
371                 while (--maxRetries > 0) {
372                         got = write(2, str.data() + wrote, len - wrote);
373                         if (got == -EINTR) continue;
374                         if (got <= 0) break;
375                         wrote += got;
376                         if (wrote == len) break;
377                 }
378         }
379         if (got <= 0) Rf_error("mxLogBig failed with errno=%d", got);
380
381 }
382
383 void mxLog(const char* msg, ...)   // thread-safe
384 {
385         const int maxLen = 240;
386         char buf1[maxLen];
387         char buf2[maxLen];
388
389         va_list ap;
390         va_start(ap, msg);
391         vsnprintf(buf1, maxLen, msg, ap);
392         va_end(ap);
393
394         int len = snprintf(buf2, maxLen, "[%d] %s\n", omx_absolute_thread_num(), buf1);
395
396         int maxRetries = 20;
397         ssize_t wrote = 0;
398         ssize_t got;
399 #pragma omp critical(stderp)
400         {
401                 while (--maxRetries > 0) {
402                         got = write(2, buf2 + wrote, len - wrote);
403                         if (got == -EINTR) continue;
404                         if (got <= 0) break;
405                         wrote += got;
406                         if (wrote == len) break;
407                 }
408         }
409         if (got <= 0) Rf_error("mxLog failed with errno=%d", got);
410 }
411
412 void _omxRaiseError()
413 {
414         // keep for debugger breakpoints
415 }
416
417 void omxRaiseErrorf(const char* msg, ...)
418 {
419         va_list ap;
420         va_start(ap, msg);
421         std::string str = string_vsnprintf(msg, ap);
422         va_end(ap);
423         _omxRaiseError();
424
425         if(OMX_DEBUG) {
426                 mxLog("Error raised: %s", str.c_str());
427         }
428
429         bool overflow = false;
430 #pragma omp critical(bads)
431         {
432                 if (Global->bads.size() > 100) {
433                         overflow = true;
434                 } else {
435                         Global->bads.push_back(str);
436                 }
437         }
438
439         // mxLog takes a lock too, so call it outside of critical section
440         if (overflow) mxLog("Too many errors: %s", str.c_str());
441 }
442
443 const char *omxGlobal::getBads()
444 {
445         if (bads.size() == 0) return NULL;
446
447         std::string str;
448         for (size_t mx=0; mx < bads.size(); ++mx) {
449                 if (bads.size() > 1) str += string_snprintf("%d:", (int)mx+1);
450                 str += bads[mx];
451                 if (str.size() > (1<<14)) break;
452                 if (mx < bads.size() - 1) str += "\n";
453         }
454
455         size_t sz = str.size();
456         char *mem = R_alloc(sz+1, 1);  // use R's memory
457         memcpy(mem, str.c_str(), sz);
458         mem[sz] = 0;
459         return mem;
460 }
461
462 void omxRaiseError(const char* msg) { // DEPRECATED
463         omxRaiseErrorf("%s", msg);
464 }
465
466         void omxStateNextRow(omxState *state) {
467                 state->currentRow++;
468         };
469
470         void omxStateNextEvaluation(omxState *state) {
471                 state->currentRow = -1;
472                 state->computeCount++;
473         };
474
475 void omxGlobal::checkpointMessage(FitContext *fc, double *est, const char *fmt, ...)
476 {
477         va_list ap;
478         va_start(ap, fmt);
479         std::string str = string_vsnprintf(fmt, ap);
480         va_end(ap);
481
482         for(size_t i = 0; i < checkpointList.size(); i++) {
483                 checkpointList[i]->message(fc, est, str.c_str());
484         }
485 }
486
487 void omxGlobal::checkpointPrefit(FitContext *fc, double *est, bool force)
488 {
489         for(size_t i = 0; i < checkpointList.size(); i++) {
490                 checkpointList[i]->prefit(fc, est, force);
491         }
492 }
493
494 void omxGlobal::checkpointPostfit(FitContext *fc)
495 {
496         for(size_t i = 0; i < checkpointList.size(); i++) {
497                 checkpointList[i]->postfit(fc);
498         }
499 }
500
501 omxCheckpoint::omxCheckpoint() : wroteHeader(false), lastCheckpoint(0), lastIterations(0), fitPending(false),
502                                  timePerCheckpoint(0), iterPerCheckpoint(0), file(NULL)
503 {}
504
505 omxCheckpoint::~omxCheckpoint()
506 {
507         if (file) fclose(file);
508 }
509
510 /* We need to re-design checkpointing when it is possible to run
511    more than 1 optimization in parallel. */
512 void omxCheckpoint::omxWriteCheckpointHeader()
513 {
514         if (wroteHeader) return;
515         std::vector< omxFreeVar* > &vars = Global->freeGroup[0]->vars;
516         size_t numParam = vars.size();
517
518         fprintf(file, "OpenMxContext\tOpenMxNumFree\titerations\ttimestamp");
519         for(size_t j = 0; j < numParam; j++) {
520                 fprintf(file, "\t\"%s\"", vars[j]->name);
521         }
522         fprintf(file, "\tobjective\n");
523         fflush(file);
524         wroteHeader = true;
525 }
526  
527 void omxCheckpoint::message(FitContext *fc, double *est, const char *msg)
528 {
529         _prefit(fc, est, true, msg);
530         postfit(fc);
531 }
532
533 void omxCheckpoint::_prefit(FitContext *fc, double *est, bool force, const char *context)
534 {
535         const int timeBufSize = 32;
536         char timeBuf[timeBufSize];
537         time_t now = time(NULL); // avoid checking unless we need it
538
539         bool doit = force;
540         if ((timePerCheckpoint && timePerCheckpoint <= now - lastCheckpoint) ||
541             (iterPerCheckpoint && iterPerCheckpoint <= fc->iterations - lastIterations)) {
542                 doit = true;
543         }
544         if (!doit) return;
545
546         omxWriteCheckpointHeader();
547
548         std::vector< omxFreeVar* > &vars = fc->varGroup->vars;
549         struct tm *nowTime = localtime(&now);
550         strftime(timeBuf, timeBufSize, "%b %d %Y %I:%M:%S %p", nowTime);
551         fprintf(file, "%s\t%d\t%d\t%s", context, int(vars.size()), lastIterations, timeBuf);
552
553         size_t lx=0;
554         size_t numParam = Global->freeGroup[0]->vars.size();
555         for (size_t px=0; px < numParam; ++px) {
556                 if (lx < vars.size() && vars[lx]->id == (int)px) {
557                         fprintf(file, "\t%.10g", est[lx]);
558                         ++lx;
559                 } else {
560                         fprintf(file, "\tNA");
561                 }
562         }
563         fflush(file);
564         fitPending = true;
565         lastCheckpoint = now;
566         lastIterations = fc->iterations;
567 }
568
569 void omxCheckpoint::prefit(FitContext *fc, double *est, bool force)
570 {
571         _prefit(fc, est, force, "opt");
572 }
573
574 void omxCheckpoint::postfit(FitContext *fc)
575 {
576         if (!fitPending) return;
577         fprintf(file, "\t%.10g\n", fc->fit);
578         fflush(file);
579         fitPending = false;
580 }
581
582 omxFreeVarLocation *omxFreeVar::getLocation(int matrix)
583 {
584         for (size_t lx=0; lx < locations.size(); lx++) {
585                 omxFreeVarLocation *loc = &locations[lx];
586                 if (~loc->matrix == matrix) return loc;
587         }
588         return NULL;
589 }