WorkLog pruner thread, to clean up old work even when blocks aren't being found
[bitcoin:eloipool.git] / eloipool.py
1 #!/usr/bin/python3
2 # Eloipool - Python Bitcoin pool server
3 # Copyright (C) 2011-2012  Luke Dashjr <luke-jr+eloipool@utopios.org>
4 #
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License as
7 # published by the Free Software Foundation, either version 3 of the
8 # License, or (at your option) any later version.
9 #
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 # GNU Affero General Public License for more details.
14 #
15 # You should have received a copy of the GNU Affero General Public License
16 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18 import config
19
20
21 import logging
22
# Default every logger to DEBUG, then cap the loggers named below at INFO.
logging.basicConfig(level=logging.DEBUG)
_INFO_ONLY_LOGGERS = ('checkShare', 'JSONRPCHandler', 'merkleMaker')
for infoOnly in _INFO_ONLY_LOGGERS:
        logging.getLogger(infoOnly).setLevel(logging.INFO)
26
def RaiseRedFlags(reason):
        """Report *reason* at CRITICAL level on the 'redflag' logger.

        Returns *reason* unchanged so the call can be used inline.
        """
        redflagLogger = logging.getLogger('redflag')
        redflagLogger.critical(reason)
        return reason
30
31
32 from bitcoin.node import BitcoinLink, BitcoinNode
33 bcnode = BitcoinNode(config.UpstreamNetworkId)
34 bcnode.userAgent += b'Eloipool:0.1/'
35
36 import jsonrpc
37 UpstreamBitcoindJSONRPC = jsonrpc.ServiceProxy(config.UpstreamURI)
38
39
40 from bitcoin.script import BitcoinScript
41 from bitcoin.txn import Txn
42 from base58 import b58decode
43 from struct import pack
44 import subprocess
45 from time import time
46
def makeCoinbaseTxn(coinbaseValue, useCoinbaser = True):
        """Build the coinbase transaction distributing *coinbaseValue*.

        When *useCoinbaser* is true and config.CoinbaserCmd is set, the command
        is run (with '%d' replaced by coinbaseValue) and its stdout is parsed
        as: a line with the number of outputs, then for each output a line
        with the amount followed by a line with the address.  Whatever is not
        handed out by the coinbaser goes to config.TrackerAddr.

        If the coinbaser fails, or hands out coinbaseValue or more, all of its
        outputs are discarded and the full value goes to config.TrackerAddr.

        Returns the new Txn (not yet assembled).
        """
        logger = logging.getLogger('makeCoinbaseTxn')
        t = Txn.new()
        
        if useCoinbaser and getattr(config, 'CoinbaserCmd', None):
                coinbased = 0
                try:
                        # NOTE: the command runs through the shell; it comes from the
                        # operator's own config, and the only substitution is an integer.
                        cmd = config.CoinbaserCmd.replace('%d', str(coinbaseValue))
                        p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE)
                        try:
                                nout = int(p.stdout.readline())
                                for i in range(nout):
                                        amount = int(p.stdout.readline())
                                        addr = p.stdout.readline().rstrip(b'\n').decode('utf8')
                                        pkScript = BitcoinScript.toAddress(addr)
                                        t.addOutput(amount, pkScript)
                                        coinbased += amount
                        finally:
                                # Always release the pipe and reap the child; otherwise
                                # every call leaks a descriptor and leaves a zombie.
                                p.stdout.close()
                                p.wait()
                except Exception:
                        logger.warning('Coinbaser command raised an exception', exc_info=True)
                        # Force the failure branch below
                        coinbased = coinbaseValue + 1
                if coinbased >= coinbaseValue:
                        logger.error('Coinbaser failed!')
                        # Drop any outputs that were added before the failure
                        t.outputs = []
                else:
                        coinbaseValue -= coinbased
        
        pkScript = BitcoinScript.toAddress(config.TrackerAddr)
        t.addOutput(coinbaseValue, pkScript)
        
        # TODO
        # TODO: red flag on dupe coinbase
        return t
77
78
79 from util import Bits2Target
80
# Work handed out and not yet expired: username -> {merkle root: (MRD, issue time)}
# (populated by getBlockHeader).
workLog = dict()
# Target that share hashes are compared against in checkShare; set by blockChanged().
networkTarget = None
# Share headers already accepted since the last block change (keys only, values unused).
DupeShareHACK = dict()

# The JSON-RPC server instance; stays None until the main section creates it.
server = None
def updateBlocks():
        """Wake the server's long-poll waiters; a no-op while no server exists."""
        if not server:
                return
        server.wakeLongpoll()
89
def blockChanged():
        """Reset per-block state after the current block has changed.

        Forgets the duplicate-share set, recomputes networkTarget from the
        bits of MM.currentBlock, empties the work log and wakes long-pollers.
        """
        global DupeShareHACK, networkTarget
        DupeShareHACK = {}
        currentBits = MM.currentBlock[1]
        networkTarget = Bits2Target(currentBits)
        # Cleared in place: other holders of this dict (e.g. the pruner thread) keep
        # seeing the same object.
        workLog.clear()
        updateBlocks()
97
98
99 from time import sleep, time
100 import traceback
101
def _WorkLogPruner_I(wl):
        """Run one pruning pass over work log *wl*.

        *wl* maps username -> {work id: (work data, issue time)}.  Every entry
        issued more than 120 seconds ago is deleted, and the number of deleted
        entries is logged at debug level.
        """
        now = time()
        pruned = 0
        # Other threads add users to (and clear) wl while this pass runs, so
        # iterate over a snapshot of the usernames instead of the live dict;
        # iterating the live dict fails with "dictionary changed size during
        # iteration".
        for username in tuple(wl):
                userwork = wl.get(username)
                if userwork is None:
                        # User vanished (work log cleared) since the snapshot was taken
                        continue
                for wli in tuple(userwork.keys()):
                        if now > userwork[wli][1] + 120:
                                del userwork[wli]
                                pruned += 1
        WorkLogPruner.logger.debug('Pruned %d jobs' % (pruned,))

def WorkLogPruner(wl):
        """Thread body: prune *wl* once a minute, forever.

        Errors from a pass are logged and do not stop the loop.
        """
        while True:
                try:
                        sleep(60)
                        _WorkLogPruner_I(wl)
                except Exception:
                        WorkLogPruner.logger.error(traceback.format_exc())
WorkLogPruner.logger = logging.getLogger('WorkLogPruner')
121
122
123 from merklemaker import merkleMaker
# Create the merkle maker and copy every config setting onto it as an attribute.
MM = merkleMaker()
vars(MM).update(vars(config))
# Coinbase built without the coinbaser, for a hard-coded value
# (presumably 50 BTC in base units -- hence the FIXME).
MM.clearCoinbaseTxn = makeCoinbaseTxn(50 * 100000000, False)  # FIXME
MM.clearCoinbaseTxn.assemble()
# Hooks the merkle maker calls back into this module through.
MM.makeCoinbaseTxn = makeCoinbaseTxn
MM.onBlockChange = blockChanged
MM.onBlockUpdate = updateBlocks
MM.start()
132
133
134 from binascii import b2a_hex
135 from copy import deepcopy
136 from struct import pack, unpack
137 from time import time
138 from util import RejectedShare, dblsha, hash2int
139 import jsonrpc
140 import threading
141 import traceback
142
# JSON-RPC proxy for gotwork submissions; None when config has no GotWorkURI.
gotwork = jsonrpc.ServiceProxy(config.GotWorkURI) if hasattr(config, 'GotWorkURI') else None
146
def submitGotwork(info):
        """Send *info* to the gotwork service; failures are logged, never raised."""
        try:
                gotwork.gotwork(info)
        except:
                failure = traceback.format_exc()
                checkShare.logger.warning('Failed to submit gotwork\n%s' % (failure,))
152
# Share-logging database connection; None when config has no DbOptions.
if hasattr(config, 'DbOptions'):
        import psycopg2
        db = psycopg2.connect(**config.DbOptions)
else:
        db = None
157
def getBlockHeader(username):
        """Produce a block header for *username* to work on.

        Fetches fresh merkle root data (MRD) from the merkle maker, builds the
        header from it and records the MRD with its issue time in workLog under
        the user's name and the merkle root, so checkShare can find it again.
        """
        MRD = MM.getMRD()
        (merkleRoot, _merkleTree, _coinbase, prevBlock, bits, _rollPrevBlk) = MRD
        hdr = b''.join((
                b'\1\0\0\0',
                prevBlock,
                merkleRoot,
                pack('<L', int(time())),
                bits,
                b'iolE',  # fills the final four header bytes (presumably the nonce placeholder)
        ))
        userWork = workLog.setdefault(username, {})
        userWork[merkleRoot] = (MRD, time())
        return hdr
165
def YN(b):
        """Render a tri-state flag: None stays None, truthy -> 'Y', falsy -> 'N'."""
        if b is not None:
                return 'Y' if b else 'N'
        return None
170
def logShare(share):
        """Record *share* in the shares table; a no-op when no database is configured.

        Reads 'username' and '_origdata' (required) and 'remoteHost',
        'rejectReason' and 'upstreamResult' (optional) from *share*.  The
        solution is passed through SQL decode(..., 'hex'), so '_origdata' is
        presumably hex text already.

        Database errors propagate to the caller, but the transaction is rolled
        back first: otherwise the connection stays in an aborted transaction
        and every later insert fails as well.
        """
        if db is None:
                return
        rem_host = share.get('remoteHost', '?')
        username = share['username']
        reason = share.get('rejectReason', None)
        upstreamResult = share.get('upstreamResult', None)
        solution = share['_origdata']
        stmt = "insert into shares (rem_host, username, our_result, upstream_result, reason, solution) values (%s, %s, %s, %s, %s, decode(%s, 'hex'))"
        params = (rem_host, username, YN(not reason), YN(upstreamResult), reason, solution)
        # Cursor is opened only once the share has been read successfully
        dbc = db.cursor()
        try:
                dbc.execute(stmt, params)
                db.commit()
        except Exception:
                try:
                        db.rollback()
                except Exception:
                        logging.getLogger('logShare').warning('Rollback after failed share insert also failed', exc_info=True)
                raise
        finally:
                dbc.close()
185
186 RBDs = []
187 RBPs = []
188
189 from bitcoin.varlen import varlenEncode
def assembleBlock(blkhdr, txlist):
        """Serialise a block: header, varlen-encoded txn count, then each txn's data."""
        serialized = blkhdr
        serialized += varlenEncode(len(txlist))
        serialized += b''.join(txn.data for txn in txlist)
        return serialized
196
def blockSubmissionThread(payload):
        """Thread body: submit block *payload* upstream via getmemorypool until it goes through.

        The first attempt is immediate.  Failed attempts are retried forever,
        but with a short, growing pause (capped at one second) so a dead
        upstream does not turn this thread into a busy loop.  The first failure
        is logged with its traceback, and eventual success after retries is
        logged too.
        """
        logger = logging.getLogger('blockSubmission')
        # Encode once; the payload does not change between attempts
        hexPayload = b2a_hex(payload).decode('ascii')
        failures = 0
        while True:
                try:
                        UpstreamBitcoindJSONRPC.getmemorypool(hexPayload)
                        break
                except Exception:
                        failures += 1
                        if failures == 1:
                                logger.warning('Upstream block submission failed; retrying', exc_info=True)
                        sleep(min(1.0, 0.05 * failures))
        if failures:
                logger.info('Upstream block submission succeeded after %d failed attempts' % (failures,))
204
def checkShare(share):
        """Validate a submitted share, submitting it upstream if it solves a block.

        *share* must provide 'data' (only the first 80 bytes, the block header,
        are looked at) and 'username'.  On the way through, 'MRD' and 'time'
        are stored into *share*, plus 'upstreamResult' when a block was found.

        Raises RejectedShare with a short reason code when the share is not
        acceptable.  Accepted shares are written to the share log before
        returning.
        """
        hdr = share['data'][:80]
        (prevBlock, bits) = MM.currentBlock
        sharePrevBlock = hdr[4:36]
        if sharePrevBlock != prevBlock:
                # Built on the block we just moved away from => stale, anything else => bad
                reason = 'stale-prevblk' if sharePrevBlock == MM.lastBlock[0] else 'bad-prevblk'
                raise RejectedShare(reason)
        
        shareMerkleRoot = hdr[36:68]
        # TODO: use userid
        username = share['username']
        if username not in workLog:
                raise RejectedShare('unknown-user')
        
        if hdr[72:76] != bits:
                raise RejectedShare('bad-diffbits')
        if hdr[:4] != b'\1\0\0\0':
                raise RejectedShare('bad-version')
        
        userWork = workLog[username]
        if shareMerkleRoot not in userWork:
                raise RejectedShare('unknown-work')
        (MRD, issuedAt) = userWork[shareMerkleRoot]
        share['MRD'] = MRD
        
        if hdr in DupeShareHACK:
                raise RejectedShare('duplicate')
        DupeShareHACK[hdr] = None
        
        (shareTimestamp,) = unpack('<L', hdr[68:72])
        shareTime = time()
        share['time'] = shareTime
        # NOTE(review): this only fires when "now" is over 120s *before* the issue
        # time; the pruner expires work 120s *after* issue.  Possibly meant to be
        # shareTime > issuedAt + 120 -- confirm before changing.
        if shareTime < issuedAt - 120:
                raise RejectedShare('stale-work')
        if shareTimestamp < shareTime - 300:
                raise RejectedShare('time-too-old')
        if shareTimestamp > shareTime + 7200:
                raise RejectedShare('time-too-new')
        
        blkhash = dblsha(hdr)
        if blkhash[28:] != b'\0\0\0\0':
                raise RejectedShare('H-not-zero')
        blkhashn = hash2int(blkhash)
        
        # Hashes meeting the network target are logged at info, the rest at debug
        logfunc = checkShare.logger.info if blkhashn <= networkTarget else checkShare.logger.debug
        logfunc('BLKHASH: %64x' % (blkhashn,))
        logfunc(' TARGET: %64x' % (networkTarget,))
        
        # Put this work's coinbase into the first transaction and re-assemble it
        txlist = MRD[1].data
        cbtxn = txlist[0]
        cbtxn.setCoinbase(MRD[2])
        cbtxn.assemble()
        
        if blkhashn <= networkTarget:
                logfunc("Submitting upstream")
                RBDs.append( deepcopy( (hdr, txlist) ) )
                payload = assembleBlock(hdr, txlist)
                logfunc('Real block payload: %s' % (payload,))
                RBPs.append(payload)
                # Submit both over JSON-RPC (in the background) and through bcnode
                threading.Thread(target=blockSubmissionThread, args=(payload,)).start()
                bcnode.submitBlock(payload)
                share['upstreamResult'] = True
                MM.updateBlock(blkhash)
        
        # Gotwork hack...
        if gotwork and blkhashn <= config.GotWorkTarget:
                try:
                        # coinbase txn, block hash, merkle step count, the steps, four zero bytes
                        parts = [cbtxn.data, blkhash]
                        steps = MRD[1]._steps
                        parts.append(pack('B', len(steps)))
                        parts.extend(steps)
                        parts.append(b"\0\0\0\0")
                        coinbaseMrkl = b''.join(parts)
                        info = {
                                'hash': b2a_hex(blkhash).decode('ascii'),
                                'header': b2a_hex(hdr).decode('ascii'),
                                'coinbaseMrkl': b2a_hex(coinbaseMrkl).decode('ascii'),
                        }
                        thr = threading.Thread(target=submitGotwork, args=(info,))
                        thr.daemon = True
                        thr.start()
                except:
                        checkShare.logger.warning('Failed to build gotwork request')
        
        logShare(share)
checkShare.logger = logging.getLogger('checkShare')
293
def receiveShare(share):
        """Check *share*; a rejected share is logged with its reason, then re-raised.

        The rejection reason is stored in share['rejectReason'] before logging.
        """
        # TODO: username => userid
        try:
                checkShare(share)
        except RejectedShare as rejection:
                share['rejectReason'] = str(rejection)
                logShare(share)
                raise
        # TODO
303
def newBlockNotification(signum, frame):
        """Signal handler: log the notification and have the merkle maker refresh its tree."""
        notifyLogger = logging.getLogger('newBlockNotification')
        notifyLogger.info('Received new block notification')
        MM.updateMerkleTree()
        # TODO: Force RESPOND TO LONGPOLLS?
309
310 from signal import signal, SIGUSR1
311 signal(SIGUSR1, newBlockNotification)
312
313
314 import os
315 import os.path
316 import pickle
317 import signal
318 import sys
319 from time import sleep
320 import traceback
321
322 SAVE_STATE_FILENAME = 'eloipool.worklog'
323
def stopServers():
        """Ask bcnode and the JSON-RPC server to stop, wait briefly, then close their fds.

        Each server is told to stop (keepgoing = False) and woken up; their
        'running' flags are then polled up to 0x100 times, 0.01s apart, before
        giving up.  Finally every descriptor in each server's _fd map is
        closed.  Closing is best-effort: a descriptor that cannot be closed is
        logged and skipped, so shutdown (and the state save that follows it)
        still proceeds.
        """
        logger = logging.getLogger('stopServers')
        
        logger.info('Stopping servers...')
        # server is still None if we are called before the main section created it
        servers = tuple(s for s in (bcnode, server) if s is not None)
        for s in servers:
                s.keepgoing = False
        for s in servers:
                s.wakeup()
        i = 0
        while True:
                sl = [s.__class__.__name__ for s in servers if s.running]
                if not sl:
                        break
                i += 1
                if i >= 0x100:
                        logger.error('Servers taking too long to stop (%s), giving up' % (', '.join(sl)))
                        break
                sleep(0.01)
        
        for s in servers:
                # Snapshot the keys: a server that failed to stop may still be
                # changing its fd map underneath us.
                for fd in tuple(s._fd.keys()):
                        try:
                                os.close(fd)
                        except OSError:
                                logger.warning('Failed to close fd %r' % (fd,), exc_info=True)
351
def saveState():
        """Pickle (workLog, DupeShareHACK) to SAVE_STATE_FILENAME so work can be resumed.

        The dump is attempted up to 0x10000 times (presumably to ride out
        transient failures, such as the dicts changing while being pickled).
        If every attempt fails, the last error is logged, the possibly
        incomplete file is removed, and the function returns -- it must not
        keep looping, since callers go on to exit or re-exec the process.
        """
        logger = logging.getLogger('saveState')
        
        # Then, save data needed to resume work
        logger.info('Saving work state to \'%s\'...' % (SAVE_STATE_FILENAME,))
        failure = None
        for attempt in range(0x10000):
                try:
                        with open(SAVE_STATE_FILENAME, 'wb') as f:
                                pickle.dump( (workLog, DupeShareHACK), f )
                        return
                except Exception:
                        failure = traceback.format_exc()
        
        logger.error('Failed to save work\n' + failure)
        try:
                os.unlink(SAVE_STATE_FILENAME)
        except Exception:
                logger.error(('Failed to unlink \'%s\'; resume may have trouble\n' % (SAVE_STATE_FILENAME,)) + traceback.format_exc())
371
def exit():
        """Shut down: stop the servers, save state, then terminate this process.

        The process sends itself SIGTERM and then calls sys.exit(0).
        """
        stopServers()
        saveState()
        goodbyeLogger = logging.getLogger('exit')
        goodbyeLogger.info('Goodbye...')
        ownPid = os.getpid()
        os.kill(ownPid, signal.SIGTERM)
        sys.exit(0)
378
def restart():
        """Stop the servers, save state and re-exec this program with its original argv.

        If the exec fails, the error is logged and the function returns.
        """
        stopServers()
        saveState()
        restartLogger = logging.getLogger('restart')
        restartLogger.info('Restarting...')
        program = sys.argv[0]
        try:
                os.execv(program, sys.argv)
        except:
                restartLogger.error('Failed to exec\n' + traceback.format_exc())
387
def restoreState():
        """Reload workLog and DupeShareHACK from SAVE_STATE_FILENAME, if that file exists.

        Both module globals are rebound to the loaded objects, so anything that
        needs the restored work log must take its reference after this call.
        A failure to load is logged and otherwise ignored.
        """
        global workLog, DupeShareHACK
        
        if not os.path.exists(SAVE_STATE_FILENAME):
                return
        
        logger = logging.getLogger('restoreState')
        stateSize = os.stat(SAVE_STATE_FILENAME).st_size
        logger.info('Restoring saved state from \'%s\' (%d bytes)' % (SAVE_STATE_FILENAME, stateSize))
        try:
                # NOTE: pickle executes whatever the file tells it to; this is only
                # safe because the state file is written by this program itself.
                with open(SAVE_STATE_FILENAME, 'rb') as f:
                        data = pickle.load(f)
                workLog = data[0]
                DupeShareHACK = data[1]
        except:
                logger.error('Failed to restore state\n' + traceback.format_exc())
                return
        logger.info('State restored successfully')
405
406
407 from jsonrpcserver import JSONRPCListener, JSONRPCServer
408 import interactivemode
409 from networkserver import NetworkListener
410 import threading
411
if __name__ == "__main__":
        # Listeners for incoming Bitcoin P2P connections (none unless configured)
        LSbc = []
        if not hasattr(config, 'BitcoinNodeAddresses'):
                config.BitcoinNodeAddresses = ()
        for a in config.BitcoinNodeAddresses:
                LSbc.append(NetworkListener(bcnode, a))
        
        # Optional outgoing link to the upstream bitcoind node
        if getattr(config, 'UpstreamBitcoindNode', None):
                BitcoinLink(bcnode, dest=config.UpstreamBitcoindNode)
        
        server = JSONRPCServer()
        # Normalise the JSON-RPC listen addresses: JSONRPCAddresses may be absent
        # or a tuple, and the single JSONRPCAddress (if set) always goes first.
        rpcAddresses = list(getattr(config, 'JSONRPCAddresses', ()))
        if hasattr(config, 'JSONRPCAddress'):
                rpcAddresses.insert(0, config.JSONRPCAddress)
        config.JSONRPCAddresses = rpcAddresses
        if not rpcAddresses:
                logging.getLogger('main').warning('No JSON-RPC listen addresses configured')
        LS = []
        for a in config.JSONRPCAddresses:
                LS.append(JSONRPCListener(server, a))
        if hasattr(config, 'SecretUser'):
                server.SecretUser = config.SecretUser
        # Hand the server the hooks it calls back into this module through
        server.aux = MM.CoinbaseAux
        server.getBlockHeader = getBlockHeader
        server.receiveShare = receiveShare
        server.RaiseRedFlags = RaiseRedFlags
        
        # Must run before the pruner thread is created: restoreState() may rebind
        # the global workLog, and the pruner keeps whichever object it is given.
        restoreState()
        
        prune_thr = threading.Thread(target=WorkLogPruner, args=(workLog,))
        prune_thr.daemon = True
        prune_thr.start()
        
        bcnode_thr = threading.Thread(target=bcnode.serve_forever)
        bcnode_thr.daemon = True
        bcnode_thr.start()
        
        # The JSON-RPC server runs in the main thread
        server.serve_forever()