Log SQL INSERT errors in detail
[bitcoin:eloipool.git] / sharelogging / sql.py
1 # Eloipool - Python Bitcoin pool server
2 # Copyright (C) 2011-2012  Luke Dashjr <luke-jr+eloipool@utopios.org>
3 #
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU Affero General Public License as
6 # published by the Free Software Foundation, either version 3 of the
7 # License, or (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 # GNU Affero General Public License for more details.
13 #
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
16
17 import logging
18 from queue import Queue
19 import threading
20 import traceback
21 from util import shareLogFormatter
22
_logger = logging.getLogger('sharelogging.sql')

class sql:
        """Share logger that inserts every share into a SQL database.

        Keyword options (all kept in ``self.opts``):
          engine    -- required; selects the ``setup_<engine>`` method to run
                       (``mysql``, ``postgres`` or ``sqlite``).
          statement -- INSERT template passed to ``shareLogFormatter``; when
                       omitted a warning is logged and a per-engine default
                       is used.
          dbopts    -- keyword arguments for the DB-API module's ``connect()``.

        Unless a ``setup_*`` method sets ``self.threadsafe`` to true, inserts
        are queued and executed by a worker thread that opens and uses the
        connection itself.  Inserts whose ``execute()`` fails are logged and
        recorded in ``self.exceptions`` as ``(stmt, params, exception)``.
        """

        # DB-API ``paramstyle`` -> placeholder text given to shareLogFormatter.
        _psf = {
                'qmark': '?',
                'format': '%s',
                'pyformat': '%s',
        }

        def __init__(self, **ka):
                """Store the options, run the engine setup and start logging.

                Raises KeyError if ``engine`` is missing and AttributeError if
                there is no ``setup_<engine>`` method for it.
                """
                self.opts = ka
                dbe = ka['engine']
                if 'statement' not in ka:
                        # Logger.warn() is a deprecated alias of warning().
                        _logger.warning('"statement" not specified for sql logger, but default may vary!')
                # NOTE: nothing in this class ever trims this list.
                self.exceptions = []
                self.threadsafe = False
                getattr(self, 'setup_%s' % (dbe,))()
                if self.threadsafe:
                        # Insert synchronously from whichever thread calls logShare().
                        self._logShareF = self._doInsert
                        self.stop = self._shutdown
                        self._connect()
                else:
                        # Hand work to a single worker thread through a queue.
                        self._queue = Queue()
                        self._logShareF = self._queue.put
                        threading.Thread(target=self._thread).start()

        def _doInsert(self, o):
                """Execute and commit one ``(stmt, params)`` pair.

                A failing ``execute()`` is logged, recorded in
                ``self.exceptions`` and rolled back; it is not re-raised.
                Errors from ``cursor()`` or ``commit()`` propagate to the caller.
                """
                (stmt, params) = o
                dbc = self.db.cursor()
                try:
                        try:
                                dbc.execute(stmt, params)
                        except BaseException as e:
                                _logger.critical('Error inserting data: %s%s', (stmt, params), traceback.format_exc())
                                self.exceptions.append((stmt, params, e))
                                # Without a rollback the connection may be left inside a
                                # failed transaction, which on some engines makes every
                                # later statement fail as well.
                                self._rollbackFailedInsert()
                                return
                        self.db.commit()
                finally:
                        self._closeCursor(dbc)

        def _rollbackFailedInsert(self):
                """Roll back the current transaction, logging (not raising) any error."""
                try:
                        self.db.rollback()
                except Exception:
                        _logger.critical('Error rolling back failed insert: %s', traceback.format_exc())

        def _closeCursor(self, dbc):
                """Close a cursor, logging (not raising) any error."""
                try:
                        dbc.close()
                except Exception:
                        _logger.debug('Error closing cursor: %s', traceback.format_exc())

        def _thread(self):
                """Worker loop: connect, then insert queued items until ``None`` arrives."""
                self._connect()
                while True:
                        try:
                                o = self._queue.get()
                                if o is None:
                                        # Shutdown logger
                                        break
                                self._doInsert(o)
                        except BaseException:
                                # Deliberately broad: one bad item must not kill the worker.
                                _logger.critical(traceback.format_exc())
                self._shutdown()

        def setup_mysql(self):
                """Set up the pymysql backend.

                A ``password`` entry in ``dbopts`` is renamed (in place) to
                ``passwd`` unless ``passwd`` is already given.
                """
                import pymysql
                dbopts = self.opts.get('dbopts', {})
                if 'passwd' not in dbopts and 'password' in dbopts:
                        dbopts['passwd'] = dbopts.pop('password')
                self.modsetup(pymysql)

        def setup_postgres(self):
                """Set up the psycopg2 backend with its own default statement."""
                import psycopg2
                self.opts.setdefault('statement', "insert into shares (rem_host, username, our_result, upstream_result, reason, solution) values ({Q(remoteHost)}, {username}, {YN(not(rejectReason))}, {YN(upstreamResult)}, {rejectReason}, decode({solution}, 'hex'))")
                self.modsetup(psycopg2)

        def setup_sqlite(self):
                """Set up the sqlite3 backend."""
                import sqlite3
                self.modsetup(sqlite3)

        def modsetup(self, mod):
                """Bind DB-API module ``mod`` and build the statement formatter.

                Raises KeyError if ``mod.paramstyle`` has no entry in ``_psf``.
                """
                self._mod = mod
                psf = self._psf[mod.paramstyle]
                # Only applies when neither the caller nor setup_* chose a statement.
                self.opts.setdefault('statement', "insert into shares (remoteHost, username, rejectReason, upstreamResult, solution) values ({remoteHost}, {username}, {rejectReason}, {upstreamResult}, {solution})")
                stmt = self.opts['statement']
                self.pstmt = shareLogFormatter(stmt, psf)

        def _connect(self):
                """Open the database connection using the ``dbopts`` option."""
                self.db = self._mod.connect(**self.opts.get('dbopts', {}))

        def logShare(self, share):
                """Format ``share`` into a statement and insert it, directly or via the queue."""
                o = self.pstmt.applyToShare(share)
                self._logShareF(o)

        def stop(self):
                """Ask the worker thread to finish; items already queued are processed first."""
                # NOTE: this is replaced with _shutdown directly for threadsafe objects
                self._queue.put(None)

        def _shutdown(self):
                """Close the database connection, if one was opened.

                Errors raised by ``close()`` are logged rather than propagated.
                """
                db = getattr(self, 'db', None)
                if db is None:
                        return
                try:
                        db.close()
                except Exception:
                        _logger.error('Error closing database connection: %s', traceback.format_exc())