Merge commit '02c7323' into stratum_sidmanager
[bitcoin:eloipool.git] / util.py
1 # Eloipool - Python Bitcoin pool server
2 # Copyright (C) 2011-2012  Luke Dashjr <luke-jr+eloipool@utopios.org>
3 #
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU Affero General Public License as
6 # published by the Free Software Foundation, either version 3 of the
7 # License, or (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 # GNU Affero General Public License for more details.
13 #
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
16
17 from hashlib import sha256
18 from math import log
19 import re
20 import string
21 from struct import unpack
22 import traceback
23
24 def YN(b):
25         if b is None:
26                 return None
27         return 'Y' if b else 'N'
28
29 def _maybe_int(n):
30         n_int = int(n)
31         if n == n_int:
32                 return n_int
33         return n
34
35 def target2pdiff(target):
36         if target is None:
37                 return None
38         pdiff = round(2**(224 - log(target, 2)), 8)
39         return _maybe_int(pdiff)
40
41 bdiff1target = 0x00000000FFFF0000000000000000000000000000000000000000000000000000
42
43 def target2bdiff(target):
44         bdiff = bdiff1target / target
45         return _maybe_int(bdiff)
46
47 class shareLogFormatter:
48         _re_x = re.compile(r'^\s*(\w+)\s*(?:\(\s*(.*?)\s*\))?\s*$')
49         
50         def __init__(self, *a, **ka):
51                 self._p = self.parse(*a, **ka)
52         
53         # NOTE: This only works for psf='%s' (default)
54         def formatShare(self, *a, **ka):
55                 (stmt, params) = self.applyToShare(*a, **ka)
56                 return stmt % params
57         
58         def applyToShare(self, share):
59                 (stmt, stmtf) = self._p
60                 params = []
61                 for f in stmtf:
62                         params.append(f(share))
63                 params = tuple(params)
64                 return (stmt, params)
65         
66         @classmethod
67         def parse(self, stmt, psf = '%s'):
68                 fmt = string.Formatter()
69                 pstmt = tuple(fmt.parse(stmt))
70                 
71                 stmt = ''
72                 fmt = []
73                 for (lit, field, fmtspec, conv) in pstmt:
74                         stmt += lit
75                         if not field:
76                                 continue
77                         f = self.get_field(field)
78                         fmt.append(f)
79                         stmt += psf
80                 fmt = tuple(fmt)
81                 return (stmt, fmt)
82         
83         @classmethod
84         def get_field(self, field):
85                 m = self._re_x.match(field)
86                 if m:
87                         if m.group(2) is None:
88                                 # identifier
89                                 return lambda s: s.get(field, None)
90                         else:
91                                 # function
92                                 fn = m.group(1)
93                                 sf = self.get_field(m.group(2))
94                                 gfm = 'get_field_%s' % (fn,)
95                                 if hasattr(self, gfm):
96                                         return getattr(self, gfm)(sf)
97                                 f = eval(fn)
98                                 return self._get_field_auto(f, sf)
99                 raise ValueError('Failed to parse field: %s' % (field,))
100         
101         @classmethod
102         def _get_field_auto(self, f, subfunc):
103                 return lambda s: f(subfunc(s))
104         
105         @classmethod
106         def get_field_not(self, subfunc):
107                 return lambda s: not subfunc(s)
108         
109         @classmethod
110         def get_field_Q(self, subfunc):
111                 return lambda s: subfunc(s) or '?'
112         
113         @classmethod
114         def get_field_dash(self, subfunc):
115                 return lambda s: subfunc(s) or '-'
116
117 def dblsha(b):
118         return sha256(sha256(b).digest()).digest()
119
120 def swap32(b):
121         o = b''
122         for i in range(0, len(b), 4):
123                 o += b[i + 3:i - 1 if i else None:-1]
124         return o
125
126 def Bits2Target(bits):
127         return unpack('<L', bits[:3] + b'\0')[0] * 2**(8*(bits[3] - 3))
128
129 def LEhash2int(h):
130         n = unpack('<QQQQ', h)
131         n = (n[3] << 192) | (n[2] << 128) | (n[1] << 64) | n[0]
132         return n
133
134 def BEhash2int(h):
135         n = unpack('>QQQQ', h)
136         n = (n[0] << 192) | (n[1] << 128) | (n[2] << 64) | n[3]
137         return n
138
139 def tryErr(func, *a, **kw):
140         IE = kw.pop('IgnoredExceptions', BaseException)
141         logger = kw.pop('Logger', None)
142         emsg = kw.pop('ErrorMsg', None)
143         try:
144                 return func(*a, **kw)
145         except IE:
146                 if logger:
147                         emsg = "%s\n" % (emsg,) if emsg else ""
148                         emsg += traceback.format_exc()
149                         logger.error(emsg)
150                 return None
151
152 class RejectedShare(ValueError):
153         pass
154
155 PendingUpstream = object()
156
157
158 import heapq
159
160 class ScheduleDict:
161         def __init__(self):
162                 self._dict = {}
163                 self._build_heap()
164         
165         def _build_heap(self):
166                 newheap = list((v[0], id(o), o) for o, v in self._dict.items())
167                 heapq.heapify(newheap)
168                 self._heap = newheap
169         
170         def nextTime(self):
171                 while True:
172                         (t, k, o) = self._heap[0]
173                         if o in self._dict:
174                                 break
175                         heapq.heappop(self._heap)
176                 return t
177         
178         def shift(self):
179                 while True:
180                         (t, k, o) = heapq.heappop(self._heap)
181                         if o in self._dict:
182                                 break
183                 del self._dict[o]
184                 return o
185         
186         def __setitem__(self, o, t):
187                 k = id(o)
188                 self._dict[o] = (t, o)
189                 if len(self._heap) / 2 > len(self._dict):
190                         self._build_heap()
191                 else:
192                         heapq.heappush(self._heap, (t, k, o))
193         
194         def __contains__(self, o):
195                 return o in self._dict
196         
197         def __getitem__(self, o):
198                 return self._dict[o][0]
199         
200         def __delitem__(self, o):
201                 del self._dict[o]
202                 if len(self._dict) < 2:
203                         self._build_heap()
204         
205         def __len__(self):
206                 return len(self._dict)
207
208 class WithNoop:
209         def __enter__(self):
210                 pass
211         def __exit__(self, *a):
212                 pass
213 WithNoop = WithNoop()
214
215
216 from collections import deque
217 import threading
218 import time
219
220 class _UniqueSessionIdManager:
221         def __init__(self, size = 4, defaultDelay = 120):
222                 self._NextID = 0
223                 self._NextID_Lock = threading.Lock()
224                 self._FreeIDs = deque()
225                 self._size = size
226                 self._max = (0x100 ** size) - 1
227                 self._defaultDelay = defaultDelay
228                 self._schPut = ScheduleDict()
229                 self._schPut_Lock = threading.Lock()
230         
231         def size(self):
232                 return self._size
233         
234         def put(self, sid, delay = False, now = None):
235                 if not delay:
236                         return self._FreeIDs.append(sid)
237                 
238                 if delay is True:
239                         delay = self._defaultDelay
240                 if now is None:
241                         now = time.time()
242                 with self._schPut_Lock:
243                         self._schPut[sid] = now + delay
244         
245         def get(self, desired = None, now = None):
246                 try:
247                         return self._FreeIDs.popleft()
248                 except IndexError:
249                         pass
250                 
251                 # Check delayed-free for one
252                 if now is None:
253                         now = time.time()
254                 with self._schPut_Lock:
255                         if len(self._schPut) and self._schPut.nextTime() <= now:
256                                 sid = self._schPut.shift()
257                                 while len(self._schPut) and self._schPut.nextTime() <= now:
258                                         self.put(self._schPut.shift())
259                                 return sid
260                 
261                 # If none free, make a new one
262                 with self._NextID_Lock:
263                         sid = self._NextID
264                         self._NextID = sid + 1
265                 if sid <= self._max:
266                         return sid
267                 
268                 # TODO: Maybe steal an about-to-be-freed SID in the worst case scenario?
269                 
270                 raise IndexError('Ran out of session ids')
271         
272         # NOTE: Will steal a pending-free sid
273         def getSpecific(self, desired):
274                 try:
275                         self._FreeIDs.remove(desired)
276                         return desired
277                 except ValueError:
278                         pass
279                 
280                 # FIXME: relies on id(number) == id(number)
281                 with self._schPut_Lock:
282                         if desired in self._schPut:
283                                 del self._schPut[desired]
284                                 return desired
285                 
286                 # NOTE: Generated growth is limited to avoid memory exhaustion exploits
287                 with self._NextID_Lock:
288                         NextID = self._NextID
289                         if desired >= NextID and desired <= min(self._max, NextID + 0x10000 - len(self._FreeIDs)):
290                                 # NOTE: Incrementing _NextID up front in case of exception
291                                 self._NextID = desired + 1
292                                 for i in range(NextID, desired):
293                                         self.put(i)
294                                 return desired
295                 
296                 raise KeyError('Session id %u not available' % (desired,))
297
298 UniqueSessionIdManager = _UniqueSessionIdManager()