Bugfix: Import traceback for SQL sharelogger
[bitcoin:eloipool.git] / sharelogging / sql.py
1 # Eloipool - Python Bitcoin pool server
2 # Copyright (C) 2011-2012  Luke Dashjr <luke-jr+eloipool@utopios.org>
3 #
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU Affero General Public License as
6 # published by the Free Software Foundation, either version 3 of the
7 # License, or (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 # GNU Affero General Public License for more details.
13 #
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
16
17 import logging
18 from queue import Queue
19 import threading
20 import traceback
21 from util import shareLogFormatter
22
23 _logger = logging.getLogger('sharelogging.sql')
24
25 class sql:
26         _psf = {
27                 'qmark': '?',
28                 'format': '%s',
29                 'pyformat': '%s',
30         }
31         
32         def __init__(self, **ka):
33                 self.opts = ka
34                 dbe = ka['engine']
35                 if 'statement' not in ka:
36                         _logger.warn('"statement" not specified for sql logger, but default may vary!')
37                 self.threadsafe = False
38                 getattr(self, 'setup_%s' % (dbe,))()
39                 if self.threadsafe:
40                         self._logShareF = self._doInsert
41                         self.stop = self._shutdown
42                         self._connect()
43                 else:
44                         self._queue = Queue()
45                         self._logShareF = self._queue.put
46                         threading.Thread(target=self._thread).start()
47         
48         def _doInsert(self, o):
49                 (stmt, params) = o
50                 dbc = self.db.cursor()
51                 dbc.execute(stmt, params)
52                 self.db.commit()
53         
54         def _thread(self):
55                 self._connect()
56                 while True:
57                         try:
58                                 o = self._queue.get()
59                                 if o is None:
60                                         # Shutdown logger
61                                         break
62                                 self._doInsert(o)
63                         except:
64                                 _logger.critical(traceback.format_exc())
65                 self._shutdown()
66         
67         def setup_mysql(self):
68                 import pymysql
69                 dbopts = self.opts.get('dbopts', {})
70                 if 'passwd' not in dbopts and 'password' in dbopts:
71                         dbopts['passwd'] = dbopts['password']
72                         del dbopts['password']
73                 self.modsetup(pymysql)
74         
75         def setup_postgres(self):
76                 import psycopg2
77                 self.opts.setdefault('statement', "insert into shares (rem_host, username, our_result, upstream_result, reason, solution) values ({Q(remoteHost)}, {username}, {YN(not(rejectReason))}, {YN(upstreamResult)}, {rejectReason}, decode({solution}, 'hex'))")
78                 self.modsetup(psycopg2)
79         
80         def setup_sqlite(self):
81                 import sqlite3
82                 self.modsetup(sqlite3)
83         
84         def modsetup(self, mod):
85                 self._mod = mod
86                 psf = self._psf[mod.paramstyle]
87                 self.opts.setdefault('statement', "insert into shares (remoteHost, username, rejectReason, upstreamResult, solution) values ({remoteHost}, {username}, {rejectReason}, {upstreamResult}, {solution})")
88                 stmt = self.opts['statement']
89                 self.pstmt = shareLogFormatter(stmt, psf)
90         
91         def _connect(self):
92                 self.db = self._mod.connect(**self.opts.get('dbopts', {}))
93         
94         def logShare(self, share):
95                 o = self.pstmt.applyToShare(share)
96                 self._logShareF(o)
97         
98         def stop(self):
99                 # NOTE: this is replaced with _shutdown directly for threadsafe objects
100                 self._queue.put(None)
101         
102         def _shutdown(self):
103                 pass # TODO