UniqueSessionIdManager: Support for delaying releases of session ids, and picking...
[bitcoin:eloipool.git] / util.py
1 # Eloipool - Python Bitcoin pool server
2 # Copyright (C) 2011-2012  Luke Dashjr <luke-jr+eloipool@utopios.org>
3 #
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU Affero General Public License as
6 # published by the Free Software Foundation, either version 3 of the
7 # License, or (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 # GNU Affero General Public License for more details.
13 #
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
16
17 from hashlib import sha256
18 from math import log
19 import re
20 import string
21 from struct import unpack
22 import traceback
23
def YN(b):
        """Map a tri-state flag to 'Y', 'N' or None.

        None is passed through unchanged; any other value is judged by its
        truthiness.
        """
        if b is not None:
                if b:
                        return 'Y'
                return 'N'
        return None
28
def _maybe_int(n):
        """Return n as an int when the conversion is lossless, else n itself."""
        whole = int(n)
        # Only hand back the int when it compares equal to the input,
        # i.e. there was no fractional part to lose.
        return whole if n == whole else n
34
def target2pdiff(target):
        """Convert a target into its "pdiff" difficulty.

        The value is 2**(224 - log2(target)) computed in floating point and
        rounded to 8 decimal places; whole results come back as int.
        Returns None when target is None.
        """
        if target is not None:
                exponent = 224 - log(target, 2)
                return _maybe_int(round(2 ** exponent, 8))
        return None
40
# Numerator used by target2bdiff(): a target equal to this value has a
# bdiff of exactly 1.
bdiff1target = 0x00000000FFFF0000000000000000000000000000000000000000000000000000
42
def target2bdiff(target):
        """Convert a target into its "bdiff" difficulty.

        The value is bdiff1target / target (true division); whole results
        come back as int.

        Returns None when target is None, matching target2pdiff(), so that a
        share without a target can still be formatted instead of raising
        TypeError.
        """
        if target is None:
                return None
        bdiff = bdiff1target / target
        return _maybe_int(bdiff)
46
class shareLogFormatter:
        """Compile a share-log template into a statement plus value getters.

        Templates use str.format-style replacement fields.  A field is either
        a plain key looked up in the share mapping (``{username}``) or a
        function applied to another field (``{YN(solved)}``); functions may
        be nested and are resolved to the ``get_field_<name>`` classmethods
        of this class.  Format specs and conversions inside a field are
        ignored.
        """

        # "name" or "name(argument)"; whitespace around either part is
        # tolerated and not captured.
        _re_x = re.compile(r'^\s*(\w+)\s*(?:\(\s*(.*?)\s*\))?\s*$')

        def __init__(self, *a, **ka):
                """Parse the template; arguments are those of parse()."""
                self._p = self.parse(*a, **ka)

        # NOTE: This only works for psf='%s' (default)
        def formatShare(self, *a, **ka):
                """Return the statement with the share's values %-interpolated."""
                (stmt, params) = self.applyToShare(*a, **ka)
                return stmt % params

        def applyToShare(self, share):
                """Return (statement, params) with one param per template field.

                share must offer a dict-style ``get(key, default)``.
                """
                (stmt, stmtf) = self._p
                params = tuple(f(share) for f in stmtf)
                return (stmt, params)

        @classmethod
        def parse(cls, stmt, psf = '%s'):
                """Split a template into (statement, getters).

                Every replacement field is substituted by the placeholder psf
                in the returned statement, and contributes one callable
                (share -> value) to the returned tuple, in template order.
                Fields with an empty name are skipped entirely.

                Raises ValueError for a malformed template or field.
                """
                # Materialise first so template syntax errors surface before
                # any field is resolved.
                pstmt = tuple(string.Formatter().parse(stmt))

                pieces = []
                getters = []
                for (lit, field, fmtspec, conv) in pstmt:
                        pieces.append(lit)
                        if not field:
                                continue
                        getters.append(cls.get_field(field))
                        pieces.append(psf)
                return (''.join(pieces), tuple(getters))

        @classmethod
        def get_field(cls, field):
                """Build the getter for one field expression.

                Raises ValueError if the expression does not parse, and
                AttributeError if it names a function this class lacks.
                """
                m = cls._re_x.match(field)
                if not m:
                        raise ValueError('Failed to parse field: %s' % (field,))
                name = m.group(1)
                arg = m.group(2)
                if arg is None:
                        # identifier: look up the captured name, not the raw
                        # field text, so "{ key }" finds "key" rather than
                        # " key ".
                        return lambda s: s.get(name, None)
                # function: resolve the argument first, then wrap it
                sf = cls.get_field(arg)
                return getattr(cls, 'get_field_%s' % (name,))(sf)

        @classmethod
        def get_field_not(cls, subfunc):
                """Logical negation of the wrapped value."""
                return lambda s: not subfunc(s)

        @classmethod
        def get_field_Q(cls, subfunc):
                """Replace any falsy wrapped value with '?'."""
                return lambda s: subfunc(s) or '?'

        @classmethod
        def get_field_dash(cls, subfunc):
                """Replace any falsy wrapped value with '-'."""
                return lambda s: subfunc(s) or '-'

        @classmethod
        def get_field_YN(cls, subfunc):
                """Pass the wrapped value through YN()."""
                return lambda s: YN(subfunc(s))

        @classmethod
        def get_field_target2bdiff(cls, subfunc):
                """Pass the wrapped value through target2bdiff()."""
                return lambda s: target2bdiff(subfunc(s))

        @classmethod
        def get_field_target2pdiff(cls, subfunc):
                """Pass the wrapped value through target2pdiff()."""
                return lambda s: target2pdiff(subfunc(s))
120
def dblsha(b):
        """Return the raw SHA-256 digest of the SHA-256 digest of b."""
        first_pass = sha256(b).digest()
        return sha256(first_pass).digest()
123
def swap32(b):
        """Reverse the byte order within each consecutive 4-byte group of b.

        A trailing group shorter than 4 bytes is reversed as well.  The
        result is always a bytes object.
        """
        return b''.join(b[pos:pos + 4][::-1] for pos in range(0, len(b), 4))
129
def Bits2Target(bits):
        """Expand a 4-byte compact "bits" value into the integer target.

        bits[:3] is read as a little-endian mantissa and bits[3] as a
        base-256 exponent: target = mantissa * 256**(bits[3] - 3).
        """
        # Pad the 3-byte mantissa to 4 bytes so it unpacks as one uint32.
        (mantissa,) = unpack('<L', bits[:3] + b'\0')
        shift_bytes = bits[3] - 3
        return mantissa * 2 ** (8 * shift_bytes)
132
def LEhash2int(h):
        """Interpret a 32-byte hash as a little-endian unsigned integer."""
        words = unpack('<QQQQ', h)
        value = 0
        # The last 64-bit word is the most significant one.
        for word in reversed(words):
                value = (value << 64) | word
        return value
137
def BEhash2int(h):
        """Interpret a 32-byte hash as a big-endian unsigned integer."""
        words = unpack('>QQQQ', h)
        value = 0
        # The first 64-bit word is the most significant one.
        for word in words:
                value = (value << 64) | word
        return value
142
def tryErr(func, *a, **kw):
        """Call func(*a, **kw), turning selected exceptions into a None result.

        Three keyword arguments are consumed here and not forwarded:
          IgnoredExceptions -- exception class (or tuple) to swallow;
                               defaults to BaseException
          Logger            -- if truthy, its error() receives the traceback
          ErrorMsg          -- if truthy, prepended to the logged traceback

        Returns func's result, or None when an ignored exception was raised.
        """
        ignored = kw.pop('IgnoredExceptions', BaseException)
        logger = kw.pop('Logger', None)
        prefix = kw.pop('ErrorMsg', None)
        try:
                return func(*a, **kw)
        except ignored:
                if logger:
                        report = traceback.format_exc()
                        if prefix:
                                report = "%s\n" % (prefix,) + report
                        logger.error(report)
        return None
155
class RejectedShare(ValueError):
        """ValueError subclass with no behaviour of its own.

        NOTE(review): judging by the name, raised when a share is rejected;
        the raise sites are outside this module -- confirm there.
        """
158
# Unique marker object: a bare object() is equal only to itself, so test
# for it with "is".
# NOTE(review): the name suggests it flags something still awaiting an
# upstream result -- confirm against the callers.
PendingUpstream = object()
160
161
162 import heapq
163
class ScheduleDict:
        """Mapping of objects to scheduled times with earliest-first access.

        The authoritative data lives in a dict (object -> (time, object)).
        A heap of (time, key, object) tuples orders it.  Deleting or
        rescheduling an object leaves its old tuple in the heap; such
        outdated tuples are discarded lazily when they reach the top.

        Because heap entries are tuples, objects scheduled for the same time
        are compared with each other and must therefore be orderable.
        """

        def __init__(self):
                self._dict = {}
                self._build_heap()

        def _build_heap(self):
                """Rebuild the heap from the dict, dropping all outdated tuples."""
                newheap = [(v[0], k, v[1]) for k, v in self._dict.items()]
                heapq.heapify(newheap)
                self._heap = newheap

        def _is_current(self, t, k):
                """Tell whether heap entry (t, k, ...) is k's live schedule.

                Checking only that k is still present is not enough: after k
                has been rescheduled (or deleted and added again) the heap
                also holds a tuple with its previous time.
                """
                entry = self._dict.get(k)
                return entry is not None and entry[0] == t

        def nextTime(self):
                """Return the earliest scheduled time without removing anything.

                Raises IndexError when nothing is scheduled.
                """
                while True:
                        (t, k, o) = self._heap[0]
                        if self._is_current(t, k):
                                break
                        heapq.heappop(self._heap)
                return t

        def shift(self):
                """Remove and return the object with the earliest scheduled time.

                Raises IndexError when nothing is scheduled.
                """
                while True:
                        (t, k, o) = heapq.heappop(self._heap)
                        if self._is_current(t, k):
                                break
                del self._dict[k]
                return o

        def __setitem__(self, o, t):
                """Schedule o for time t, replacing any earlier schedule for o."""
                k = o
                self._dict[k] = (t, o)
                # Rebuild once more than half of the heap is outdated,
                # otherwise just push the new tuple.
                if len(self._heap) / 2 > len(self._dict):
                        self._build_heap()
                else:
                        heapq.heappush(self._heap, (t, k, o))

        def __contains__(self, o):
                return o in self._dict

        def __getitem__(self, o):
                """Return the time o is scheduled for (KeyError if it is not)."""
                return self._dict[o][0]

        def __delitem__(self, o):
                """Unschedule o (KeyError if it is not scheduled)."""
                del self._dict[o]
                # With at most one item left a rebuild is cheap and clears
                # out every outdated tuple at once.
                if len(self._dict) < 2:
                        self._build_heap()

        def __len__(self):
                return len(self._dict)
211
class WithNoop:
        """Context manager that does nothing on entry and nothing on exit.

        The name is rebound to a single instance right after the class body,
        so it is used directly: ``with WithNoop: ...``.  An ``as`` target
        receives None, and exceptions raised in the body are not suppressed.
        """
        def __enter__(self):
                return None
        def __exit__(self, *exc_info):
                return None
WithNoop = WithNoop()
218
219
220 from collections import deque
221 import threading
222 import time
223
class _UniqueSessionIdManager:
        """Hands out integer session ids and takes them back for reuse.

        Ids come from three places, tried in this order by get():
          1. _FreeIDs  -- ids returned with no delay
          2. _schPut   -- ids returned with a delay, once their time is up
          3. _NextID   -- a counter of never-issued ids, capped at _max
        """

        def __init__(self, size = 4, defaultDelay = 120):
                """size is the id width in bytes (largest id is 0x100**size - 1);
                defaultDelay is the delay used by put(..., delay=True)."""
                self._size = size
                self._max = (0x100 ** size) - 1
                self._defaultDelay = defaultDelay
                self._NextID = 0
                self._NextID_Lock = threading.Lock()
                self._FreeIDs = deque()
                self._schPut = ScheduleDict()
                self._schPut_Lock = threading.Lock()

        def size(self):
                """Return the size this manager was created with."""
                return self._size

        def put(self, sid, delay = False, now = None):
                """Give sid back.

                With a falsy delay the id is reusable at once.  Otherwise it
                becomes reusable at now + delay, where delay=True selects the
                default delay and now defaults to time.time().
                """
                if delay:
                        wait = self._defaultDelay if delay is True else delay
                        base = time.time() if now is None else now
                        with self._schPut_Lock:
                                self._schPut[sid] = base + wait
                        return None
                self._FreeIDs.append(sid)
                return None

        def get(self, desired = None, now = None):
                """Return an available session id.

                desired is accepted but not used by this method.  now
                (default time.time()) decides which delayed ids are due.

                Raises IndexError when no id is available.
                """
                try:
                        recycled = self._FreeIDs.popleft()
                except IndexError:
                        pass
                else:
                        return recycled

                # Next, look at ids whose delayed release has come due
                when = time.time() if now is None else now
                with self._schPut_Lock:
                        if len(self._schPut) and self._schPut.nextTime() <= when:
                                sid = self._schPut.shift()
                                # Everything else that is due moves to the free list
                                while len(self._schPut) and self._schPut.nextTime() <= when:
                                        self.put(self._schPut.shift())
                                return sid

                # Nothing to recycle: take the next never-issued id
                with self._NextID_Lock:
                        sid = self._NextID
                        self._NextID = sid + 1
                if sid > self._max:
                        # TODO: Maybe steal an about-to-be-freed SID in the worst case scenario?
                        raise IndexError('Ran out of session ids')
                return sid

        # NOTE: Will steal a pending-free sid
        def getSpecific(self, desired):
                """Claim the session id desired and return it.

                Raises KeyError if that id is neither free, nor pending
                release, nor reachable by advancing the fresh-id counter.
                """
                try:
                        self._FreeIDs.remove(desired)
                except ValueError:
                        pass
                else:
                        return desired

                # Pending-release ids are found by ordinary mapping lookup
                with self._schPut_Lock:
                        if desired in self._schPut:
                                del self._schPut[desired]
                                return desired

                # NOTE: Generated growth is limited to avoid memory exhaustion exploits
                with self._NextID_Lock:
                        first_unused = self._NextID
                        if first_unused <= desired <= min(self._max, first_unused + 0x10000 - len(self._FreeIDs)):
                                # NOTE: Counter is advanced before the loop in case of exception
                                self._NextID = desired + 1
                                # The ids jumped over become free right away
                                for skipped in range(first_unused, desired):
                                        self.put(skipped)
                                return desired

                raise KeyError('Session id %u not available' % (desired,))
301
# Module-level instance created with the class defaults (size=4,
# defaultDelay=120).
UniqueSessionIdManager = _UniqueSessionIdManager()