Merge branch 'bugfix_thr_sigusr1'
[bitcoin:eloipool.git] / eloipool.py
1 #!/usr/bin/python3
2 # Eloipool - Python Bitcoin pool server
3 # Copyright (C) 2011-2012  Luke Dashjr <luke-jr+eloipool@utopios.org>
4 #
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License as
7 # published by the Free Software Foundation, either version 3 of the
8 # License, or (at your option) any later version.
9 #
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 # GNU Affero General Public License for more details.
14 #
15 # You should have received a copy of the GNU Affero General Public License
16 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18 import config
19
# Fall back to a generic name when the configuration does not provide one
config.ServerName = getattr(config, 'ServerName', 'Unnamed Eloipool')
22
23
24 import logging
25
# Only install a default configuration when nothing has set up the root logger yet
if not logging.root.handlers:
        logging.basicConfig(
                level=logging.DEBUG,
                format='%(asctime)s\t%(name)s\t%(levelname)s\t%(message)s',
        )
        # These loggers are capped at INFO while everything else stays at DEBUG
        for infoOnly in (
                'checkShare',
                'JSONRPCHandler',
                'merkleMaker',
                'Waker for JSONRPCServer',
                'JSONRPCServer',
        ):
                logging.getLogger(infoOnly).setLevel(logging.INFO)
33
def RaiseRedFlags(reason):
        """Log reason at CRITICAL level on the 'redflag' logger and return it unchanged."""
        redflagLogger = logging.getLogger('redflag')
        redflagLogger.critical(reason)
        return reason
37
38
from bitcoin.node import BitcoinLink, BitcoinNode
# Node object for the network identified by config.UpstreamNetworkId
bcnode = BitcoinNode(config.UpstreamNetworkId)
# Append an Eloipool tag to the node's userAgent bytes
bcnode.userAgent += b'Eloipool:0.1/'

import jsonrpc
# JSON-RPC proxy pointed at config.UpstreamURI; used by blockSubmissionThread()
UpstreamBitcoindJSONRPC = jsonrpc.ServiceProxy(config.UpstreamURI)
45
46
47 from bitcoin.script import BitcoinScript
48 from bitcoin.txn import Txn
49 from base58 import b58decode
50 from struct import pack
51 import subprocess
52 from time import time
53
def makeCoinbaseTxn(coinbaseValue, useCoinbaser = True):
        """Build a new coinbase transaction paying out coinbaseValue.

        When useCoinbaser is true and config.CoinbaserCmd is set, that command is
        run through the shell with every '%d' replaced by coinbaseValue. Its
        stdout is read as: one line holding the number of outputs, then for each
        output one line with the amount and one line with the address. Each pair
        becomes an output of the transaction.

        If the command fails in any way, or the amounts it hands out reach or
        exceed coinbaseValue, all coinbaser outputs are discarded. Whatever value
        remains is paid to config.TrackerAddr as the final output.

        Returns the Txn object (not yet assembled).
        """
        txn = Txn.new()
        
        if useCoinbaser and hasattr(config, 'CoinbaserCmd') and config.CoinbaserCmd:
                logger = logging.getLogger('makeCoinbaseTxn')
                coinbased = 0
                try:
                        cmd = config.CoinbaserCmd
                        cmd = cmd.replace('%d', str(coinbaseValue))
                        p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE)
                        try:
                                nout = int(p.stdout.readline())
                                for i in range(nout):
                                        amount = int(p.stdout.readline())
                                        addr = p.stdout.readline().rstrip(b'\n').decode('utf8')
                                        pkScript = BitcoinScript.toAddress(addr)
                                        txn.addOutput(amount, pkScript)
                                        coinbased += amount
                        finally:
                                # Release the pipe deterministically instead of waiting for garbage collection
                                p.stdout.close()
                except Exception:
                        # Record why the coinbaser failed; the generic error below says nothing about the cause
                        logger.warning('Coinbaser raised an exception', exc_info=True)
                        # Force the failure branch below
                        coinbased = coinbaseValue + 1
                if coinbased >= coinbaseValue:
                        logger.error('Coinbaser failed!')
                        txn.outputs = []
                else:
                        coinbaseValue -= coinbased
        
        pkScript = BitcoinScript.toAddress(config.TrackerAddr)
        txn.addOutput(coinbaseValue, pkScript)
        
        # TODO: red flag on dupe coinbase
        return txn
84
85
86 import jsonrpc_getwork
87 from util import Bits2Target
88
# username -> {work id -> (work data, issue time)}; filled by getBlockHeader()/getBlockTemplate()
workLog = {}
# Set by blockChanged() from Bits2Target(MM.currentBlock[1]); None until then
networkTarget = None
# Share headers already seen by checkShare() (only the keys matter; values are None)
DupeShareHACK = {}

# Assigned a JSONRPCServer instance when this file runs as a script
server = None
def updateBlocks():
        """Wake the server's longpolls; does nothing while no server has been set."""
        if not server:
                return
        server.wakeLongpoll()
97
def blockChanged():
        """Reset per-block state after the current block changes.

        Clears both duplicate-share tables, recomputes networkTarget from the
        bits of MM.currentBlock, empties workLog and finally wakes longpolls.
        """
        global DupeShareHACK, networkTarget
        
        # Forget every share seen so far, here and in the getwork module
        DupeShareHACK = {}
        jsonrpc_getwork._CheckForDupesHACK = {}
        
        currentBits = MM.currentBlock[1]
        networkTarget = Bits2Target(currentBits)
        
        # Work issued so far is dropped
        workLog.clear()
        updateBlocks()
106
107
108 from merklemaker import merkleMaker
# Create the merkle maker and copy every config setting onto it as an attribute
MM = merkleMaker()
vars(MM).update(vars(config))
# Coinbase transaction built without running the coinbaser
MM.clearCoinbaseTxn = makeCoinbaseTxn(5000000000, useCoinbaser=False)  # FIXME
MM.clearCoinbaseTxn.assemble()
# Callbacks the merkle maker uses to reach back into this module
MM.makeCoinbaseTxn = makeCoinbaseTxn
MM.onBlockChange = blockChanged
MM.onBlockUpdate = updateBlocks
MM.start()
117
118
119 from binascii import b2a_hex
120 from copy import deepcopy
121 from struct import pack, unpack
122 import threading
123 from time import time
124 from util import RejectedShare, dblsha, hash2int, swap32
125 import jsonrpc
126 import traceback
127
# Optional gotwork service proxy; stays None unless config.GotWorkURI is defined
gotwork = jsonrpc.ServiceProxy(config.GotWorkURI) if hasattr(config, 'GotWorkURI') else None
131
def submitGotwork(info):
        """Send info to the gotwork service; any failure is logged as a warning, never raised."""
        try:
                gotwork.gotwork(info)
        except:
                failureReport = 'Failed to submit gotwork\n' + traceback.format_exc()
                checkShare.logger.warning(failureReport)
137
def getBlockHeader(username):
        """Issue new header-only work to username and remember it in workLog.

        Fetches a work tuple from MM.getMRD(), builds a block header from it
        (version 1, previous block, merkle root, current time, bits and the fixed
        trailer b'iolE'), and records (MRD, issue time) in workLog under the
        username and merkle root.

        Returns (header bytes, (MRD, issue time)).
        """
        MRD = MM.getMRD()
        (merkleRoot, merkleTree, coinbase, prevBlock, bits, rollPrevBlk) = MRD
        timestamp = pack('<L', int(time()))
        hdr = b'\1\0\0\0' + prevBlock + merkleRoot + timestamp + bits + b'iolE'
        issued = (MRD, time())
        workLog.setdefault(username, {})[merkleRoot] = issued
        # Return the tuple just stored rather than reading it back from workLog:
        # blockChanged() clears workLog, and a lookup after that would raise KeyError
        return (hdr, issued)
145
def getBlockTemplate(username):
        """Issue template work to username and remember it in workLog.

        The work tuple comes from MM.getMC(). The first byte of its coinbase is
        a length, and that many following bytes form the key under which
        (MC, issue time) is stored for the user. Returns the MC tuple.
        """
        MC = MM.getMC()
        # Unpack all five fields so a tuple of the wrong size still fails here
        _dummy, _merkleTree, coinbase, _prevBlock, _bits = MC
        idLength = coinbase[0]
        workId = coinbase[1:idLength + 1]
        issued = (MC, time())
        workLog.setdefault(username, {})[workId] = issued
        return MC
153
# Callables invoked with each share dict by receiveShare()
loggersShare = []

# Appended to by checkShare() whenever a share meets the network target:
# RBDs gets a deep copy of (header, txlist[, blkdata]), RBPs the submitted payload
RBDs = []
RBPs = []
158
159 from bitcoin.varlen import varlenEncode, varlenDecode
160 import bitcoin.txn
def assembleBlock(blkhdr, txlist):
        """Return blkhdr followed by the varlen-encoded transaction count and each transaction's data."""
        txnCount = varlenEncode(len(txlist))
        txnData = b''.join(tx.data for tx in txlist)
        return blkhdr + txnCount + txnData
167
def blockSubmissionThread(payload):
        """Submit a block payload upstream via getmemorypool, retrying until a call succeeds.

        payload is the raw block as bytes; it is sent hex-encoded. The call is
        repeated for as long as it raises. Only the first failure is logged, so
        a long outage does not flood the log.
        """
        # Encode once: the payload never changes between attempts, and an encoding
        # error should end this thread rather than be retried forever
        blkhex = b2a_hex(payload).decode('ascii')
        failures = 0
        while True:
                try:
                        UpstreamBitcoindJSONRPC.getmemorypool(blkhex)
                        break
                except Exception:
                        failures += 1
                        if failures == 1:
                                logging.getLogger('blockSubmissionThread').warning('Upstream block submission failed; retrying', exc_info=True)
175
def checkShare(share):
        """Validate a submitted share; raise RejectedShare if it must be refused.

        share is a dict that must hold 'data' (only its first 80 bytes, the block
        header, are examined) and 'username'. When 'blkdata' is present the share
        is treated in 'MC' mode, where the miner supplied its own coinbase
        transaction. Otherwise it is treated in 'MRD' mode, where the header's
        merkle root identifies the issued work.

        Side effects:
        - share['time'] is set to the current time, and share['MRD'] or
          share['MC'] to the matching work data from workLog.
        - The header is recorded in DupeShareHACK.
        - If the hash meets networkTarget, the block is submitted upstream
          before the staleness and timestamp checks further down, and
          share['upstreamResult'] is set to True.
        - If gotwork is configured and the hash meets config.GotWorkTarget, a
          gotwork notification is sent from a daemon thread.

        Raises RejectedShare with a short reason string; returns None otherwise.
        """
        shareTime = share['time'] = time()
        
        data = share['data']
        data = data[:80]
        (prevBlock, bits) = MM.currentBlock
        sharePrevBlock = data[4:36]
        if sharePrevBlock != prevBlock:
                if sharePrevBlock == MM.lastBlock[0]:
                        raise RejectedShare('stale-prevblk')
                raise RejectedShare('bad-prevblk')
        
        # TODO: use userid
        username = share['username']
        if username not in workLog:
                raise RejectedShare('unknown-user')
        
        if data[72:76] != bits:
                raise RejectedShare('bad-diffbits')
        if data[:4] != b'\1\0\0\0':
                raise RejectedShare('bad-version')
        
        shareMerkleRoot = data[36:68]
        if 'blkdata' in share:
                # MC mode: the work id is the length-prefixed string at the start of the miner's coinbase
                pl = share['blkdata']
                (txncount, pl) = varlenDecode(pl)
                cbtxn = bitcoin.txn.Txn(pl)
                cbtxn.disassemble(retExtra=True)
                coinbase = cbtxn.getCoinbase()
                wliLen = coinbase[0]
                wli = coinbase[1:wliLen+1]
                mode = 'MC'
                moden = 1
        else:
                # MRD mode: the merkle root itself identifies the work
                wli = shareMerkleRoot
                mode = 'MRD'
                moden = 0
        
        MWL = workLog[username]
        if wli not in MWL:
                raise RejectedShare('unknown-work')
        (wld, issueT) = MWL[wli]
        share[mode] = wld
        
        if data in DupeShareHACK:
                raise RejectedShare('duplicate')
        DupeShareHACK[data] = None
        
        blkhash = dblsha(data)
        if blkhash[28:] != b'\0\0\0\0':
                raise RejectedShare('H-not-zero')
        blkhashn = hash2int(blkhash)
        
        # Hashes that meet the network target are logged at INFO, all others at DEBUG
        logfunc = getattr(checkShare.logger, 'info' if blkhashn <= networkTarget else 'debug')
        logfunc('BLKHASH: %64x' % (blkhashn,))
        logfunc(' TARGET: %64x' % (networkTarget,))
        
        workMerkleTree = wld[1]
        workCoinbase = wld[2]
        
        # NOTE: this isn't actually needed for MC mode, but we're abusing it for a trivial share check...
        txlist = workMerkleTree.data
        # Copy only the coinbase transaction, since it is modified below
        txlist = [deepcopy(txlist[0]),] + txlist[1:]
        cbtxn = txlist[0]
        cbtxn.setCoinbase(workCoinbase)
        cbtxn.assemble()
        
        if blkhashn <= networkTarget:
                logfunc("Submitting upstream")
                if not moden:
                        RBDs.append( deepcopy( (data, txlist) ) )
                        payload = assembleBlock(data, txlist)
                else:
                        RBDs.append( deepcopy( (data, txlist, share['blkdata']) ) )
                        payload = share['data'] + share['blkdata']
                logfunc('Real block payload: %s' % (payload,))
                RBPs.append(payload)
                threading.Thread(target=blockSubmissionThread, args=(payload,)).start()
                bcnode.submitBlock(payload)
                share['upstreamResult'] = True
                MM.updateBlock(blkhash)
        
        # Gotwork hack...
        if gotwork and blkhashn <= config.GotWorkTarget:
                try:
                        coinbaseMrkl = cbtxn.data
                        coinbaseMrkl += blkhash
                        steps = workMerkleTree._steps
                        coinbaseMrkl += pack('B', len(steps))
                        for step in steps:
                                coinbaseMrkl += step
                        coinbaseMrkl += b"\0\0\0\0"
                        info = {}
                        info['hash'] = b2a_hex(blkhash).decode('ascii')
                        info['header'] = b2a_hex(data).decode('ascii')
                        info['coinbaseMrkl'] = b2a_hex(coinbaseMrkl).decode('ascii')
                        thr = threading.Thread(target=submitGotwork, args=(info,))
                        thr.daemon = True
                        thr.start()
                except:
                        checkShare.logger.warning('Failed to build gotwork request')
        
        shareTimestamp = unpack('<L', data[68:72])[0]
        if shareTime < issueT - 120:
                raise RejectedShare('stale-work')
        if shareTimestamp < shareTime - 300:
                raise RejectedShare('time-too-old')
        if shareTimestamp > shareTime + 7200:
                raise RejectedShare('time-too-new')
        
        if moden:
                # The miner's coinbase must start with the coinbase that was issued
                cbpre = cbtxn.getCoinbase()
                cbpreLen = len(cbpre)
                if coinbase[:cbpreLen] != cbpre:
                        raise RejectedShare('bad-cb-prefix')
                
                # Filter out known "I support" flags, to prevent exploits
                for ff in (b'/P2SH/', b'NOP2SH', b'p2sh/CHV', b'p2sh/NOCHV'):
                        # find() returns -1 when the flag is absent. Clamp the threshold at -1 so
                        # that a prefix shorter than the flag cannot make "absent" look like a hit
                        if coinbase.find(ff) > max(-1, cbpreLen - len(ff)):
                                raise RejectedShare('bad-cb-flag')
                
                if len(coinbase) > 100:
                        raise RejectedShare('bad-cb-length')
                
                cbtxn.setCoinbase(coinbase)
                cbtxn.assemble()
                if shareMerkleRoot != workMerkleTree.withFirst(cbtxn):
                        raise RejectedShare('bad-txnmrklroot')
                
                # The submitted block must be exactly the header plus the issued transactions
                allowed = assembleBlock(data, txlist)
                if allowed != share['data'] + share['blkdata']:
                        raise RejectedShare('bad-txns')
checkShare.logger = logging.getLogger('checkShare')
310
def receiveShare(share):
        """Check a share and pass it to every share logger, whether accepted or not.

        A RejectedShare raised by checkShare() is recorded in
        share['rejectReason'] and then re-raised. In all cases share['solution']
        is filled in and each callable in loggersShare receives the share.
        """
        # TODO: username => userid
        try:
                checkShare(share)
        except RejectedShare as rejection:
                share['rejectReason'] = str(rejection)
                raise
        finally:
                # Prefer the data exactly as submitted when it was kept
                share['solution'] = (
                        share['_origdata']
                        if '_origdata' in share
                        else b2a_hex(swap32(share['data'])).decode('utf8')
                )
                for logShare in loggersShare:
                        logShare(share)
325
def newBlockNotification():
        """Log the notification and ask the merkle maker to update its merkle tree."""
        notifyLogger = logging.getLogger('newBlockNotification')
        notifyLogger.info('Received new block notification')
        MM.updateMerkleTree()
        # TODO: Force RESPOND TO LONGPOLLS?
331
def newBlockNotificationSIGNAL(signum, frame):
        """Signal handler that runs newBlockNotification() in a daemon thread."""
        # Use a new thread, in case the signal handler is called with locks held
        threadName = 'newBlockNotification via signal %s' % (signum,)
        worker = threading.Thread(target=newBlockNotification, name=threadName)
        worker.daemon = True
        worker.start()
337
# NOTE: the module-level "import signal" further down rebinds the name `signal`
# to the module, so the function imported here is only usable up to that point
from signal import signal, SIGUSR1
# Run newBlockNotificationSIGNAL whenever the process receives SIGUSR1
signal(SIGUSR1, newBlockNotificationSIGNAL)
340
341
342 import os
343 import os.path
344 import pickle
345 import signal
346 import sys
347 from time import sleep
348 import traceback
349
# File written by saveState() and read back by restoreState()
SAVE_STATE_FILENAME = 'eloipool.worklog'
351
def stopServers():
        """Ask bcnode and server to stop, wait briefly for them, then close their descriptors.

        Both objects get keepgoing = False and a wakeup() call. Their running
        flag is then polled up to 0x100 times, sleeping 0.01 seconds between
        polls; if some are still running after that, an error is logged and the
        wait is abandoned. Finally every fd in each object's _fd is closed.
        """
        log = logging.getLogger('stopServers')
        log.info('Stopping servers...')
        
        servers = (bcnode, server)
        for srv in servers:
                srv.keepgoing = False
        for srv in servers:
                srv.wakeup()
        
        polls = 0
        while True:
                stillRunning = [srv.__class__.__name__ for srv in servers if srv.running]
                if not stillRunning:
                        break
                polls += 1
                if polls >= 0x100:
                        log.error('Servers taking too long to stop (%s), giving up' % (', '.join(stillRunning)))
                        break
                sleep(0.01)
        
        for srv in servers:
                for fd in srv._fd.keys():
                        os.close(fd)
379
def saveState(t = None):
        """Pickle t, DupeShareHACK and workLog (in that order) into SAVE_STATE_FILENAME.

        t is the timestamp stored alongside the state; restoreState() uses it to
        decide whether the saved workLog is still fresh.

        A failed write is retried up to 0x10000 times. After that the error is
        logged, the (possibly partial) file is removed on a best-effort basis,
        and the function returns without raising.
        """
        logger = logging.getLogger('saveState')
        
        # Then, save data needed to resume work
        logger.info('Saving work state to \'%s\'...' % (SAVE_STATE_FILENAME,))
        i = 0
        while True:
                try:
                        with open(SAVE_STATE_FILENAME, 'wb') as f:
                                pickle.dump(t, f)
                                pickle.dump(DupeShareHACK, f)
                                pickle.dump(workLog, f)
                        break
                except Exception:
                        i += 1
                        if i >= 0x10000:
                                logger.error('Failed to save work\n' + traceback.format_exc())
                                try:
                                        os.unlink(SAVE_STATE_FILENAME)
                                except Exception:
                                        logger.error(('Failed to unlink \'%s\'; resume may have trouble\n' % (SAVE_STATE_FILENAME,)) + traceback.format_exc())
                                # Give up here; without this the loop would keep retrying and logging forever
                                return
401
def exit():
        """Stop the servers, save state, then send this process SIGTERM and exit with status 0."""
        shutdownTime = time()
        stopServers()
        saveState(shutdownTime)
        logging.getLogger('exit').info('Goodbye...')
        ownPid = os.getpid()
        os.kill(ownPid, signal.SIGTERM)
        sys.exit(0)
409
def restart():
        """Stop the servers, save state, then re-exec this program with its original argv.

        A failed exec is logged with a traceback and the function returns.
        """
        shutdownTime = time()
        stopServers()
        saveState(shutdownTime)
        restartLogger = logging.getLogger('restart')
        restartLogger.info('Restarting...')
        try:
                os.execv(sys.argv[0], sys.argv)
        except:
                logging.getLogger('restart').error('Failed to exec\n' + traceback.format_exc())
419
def restoreState():
        """Reload workLog and DupeShareHACK from SAVE_STATE_FILENAME, if that file exists.

        Three on-disk layouts are recognised by the type of the first pickled
        object:
        - a tuple: (workLog, DupeShareHACK, ...), restored unconditionally;
        - a dict: DupeShareHACK with no timestamp; the workLog is not restored
          because its age cannot be determined;
        - anything else: the save timestamp, followed by DupeShareHACK and then
          workLog. The workLog is only restored when the timestamp is at most
          120 seconds old.

        Any exception is logged and the function returns; globals assigned
        before the failure keep their new values.
        """
        if not os.path.exists(SAVE_STATE_FILENAME):
                return
        
        global workLog, DupeShareHACK
        
        logger = logging.getLogger('restoreState')
        s = os.stat(SAVE_STATE_FILENAME)
        logger.info('Restoring saved state from \'%s\' (%d bytes)' % (SAVE_STATE_FILENAME, s.st_size))
        try:
                with open(SAVE_STATE_FILENAME, 'rb') as f:
                        # SECURITY NOTE: unpickling can execute arbitrary code; the state
                        # file must only ever be writable by the account running the pool
                        t = pickle.load(f)
                        if isinstance(t, tuple):
                                if len(t) > 2:
                                        # Future formats, not supported here
                                        # NOTE(review): t[3] raises IndexError for a 3-tuple, which the
                                        # handler below reports as a failed restore - confirm intent
                                        ver = t[3]
                                        # TODO
                                
                                # Old format, from 2012-02-02 to 2012-02-03
                                workLog = t[0]
                                DupeShareHACK = t[1]
                                t = None
                        else:
                                if isinstance(t, dict):
                                        # Old format, from 2012-02-03 to 2012-02-03
                                        DupeShareHACK = t
                                        t = None
                                else:
                                        # Current format, from 2012-02-03 onward
                                        DupeShareHACK = pickle.load(f)
                                
                                if t is None:
                                        # No timestamp in this format, so freshness cannot be checked
                                        logger.debug('Skipping restore of workLog with unknown age')
                                elif t + 120 >= time():
                                        workLog = pickle.load(f)
                                else:
                                        logger.debug('Skipping restore of expired workLog')
        except Exception:
                logger.error('Failed to restore state\n' + traceback.format_exc())
                return
        logger.info('State restored successfully')
        if t:
                logger.info('Total downtime: %g seconds' % (time() - t,))
461
462
463 from jsonrpcserver import JSONRPCListener, JSONRPCServer
464 import interactivemode
465 from networkserver import NetworkListener
466 import threading
467 import sharelogging
468 import imp
469
if __name__ == "__main__":
        # --- Share logging: normalise the configuration, then load each logger ---
        if not hasattr(config, 'ShareLogging'):
                config.ShareLogging = ()
        if hasattr(config, 'DbOptions'):
                # Legacy setting: translate it into an equivalent 'sql' share logger entry
                logging.getLogger('backwardCompatibility').warning('DbOptions configuration variable is deprecated; upgrade to ShareLogging var before 2013-03-05')
                config.ShareLogging = list(config.ShareLogging)
                config.ShareLogging.append( {
                        'type': 'sql',
                        'engine': 'postgres',
                        'dbopts': config.DbOptions,
                        'statement': "insert into shares (rem_host, username, our_result, upstream_result, reason, solution) values ({Q(remoteHost)}, {username}, {YN(not(rejectReason))}, {YN(upstreamResult)}, {rejectReason}, decode({solution}, 'hex'))",
                } )
        for i in config.ShareLogging:
                if not hasattr(i, 'keys'):
                        # Legacy (name, parameters) pair: convert it to the dict form
                        name, parameters = i
                        logging.getLogger('backwardCompatibility').warning('Using short-term backward compatibility for ShareLogging[\'%s\']; be sure to update config before 2012-04-04' % (name,))
                        if name == 'postgres':
                                name = 'sql'
                                i = {
                                        'engine': 'postgres',
                                        'dbopts': parameters,
                                }
                        elif name == 'logfile':
                                i = {}
                                i['thropts'] = parameters
                                if 'filename' in parameters:
                                        i['filename'] = parameters['filename']
                                        # Copy before deleting so the caller's dict is left untouched
                                        i['thropts'] = dict(i['thropts'])
                                        del i['thropts']['filename']
                        else:
                                i = parameters
                        i['type'] = name
                
                name = i['type']
                parameters = i
                try:
                        # The module and the class inside it share the logger's type name
                        fp, pathname, description = imp.find_module(name, sharelogging.__path__)
                        try:
                                m = imp.load_module(name, fp, pathname, description)
                        finally:
                                # imp.find_module leaves closing the file to the caller
                                if fp:
                                        fp.close()
                        lo = getattr(m, name)(**parameters)
                        loggersShare.append(lo.logShare)
                except Exception:
                        logging.getLogger('sharelogging').error("Error setting up share logger %s: %s", name,  sys.exc_info())

        # --- Bitcoin node listeners and upstream link ---
        LSbc = []
        if not hasattr(config, 'BitcoinNodeAddresses'):
                config.BitcoinNodeAddresses = ()
        for a in config.BitcoinNodeAddresses:
                LSbc.append(NetworkListener(bcnode, a))
        
        if hasattr(config, 'UpstreamBitcoindNode') and config.UpstreamBitcoindNode:
                BitcoinLink(bcnode, dest=config.UpstreamBitcoindNode)
        
        import jsonrpc_getmemorypool
        import jsonrpc_getwork
        import jsonrpc_setworkaux
        
        # --- JSON-RPC server ---
        server = JSONRPCServer()
        if hasattr(config, 'JSONRPCAddress'):
                logging.getLogger('backwardCompatibility').warning('JSONRPCAddress configuration variable is deprecated; upgrade to JSONRPCAddresses list before 2013-03-05')
                # Copy into a list so insert() also works when the setting is a tuple or is missing
                config.JSONRPCAddresses = list(getattr(config, 'JSONRPCAddresses', ()))
                config.JSONRPCAddresses.insert(0, config.JSONRPCAddress)
        LS = []
        for a in config.JSONRPCAddresses:
                LS.append(JSONRPCListener(server, a))
        if hasattr(config, 'SecretUser'):
                server.SecretUser = config.SecretUser
        server.aux = MM.CoinbaseAux
        server.getBlockHeader = getBlockHeader
        server.getBlockTemplate = getBlockTemplate
        server.receiveShare = receiveShare
        server.RaiseRedFlags = RaiseRedFlags
        
        server.TrustedForwarders = ()
        if hasattr(config, 'TrustedForwarders'):
                server.TrustedForwarders = config.TrustedForwarders
        server.ServerName = config.ServerName
        
        restoreState()
        
        # The bitcoin node runs in a daemon thread; the JSON-RPC server takes over the main thread
        bcnode_thr = threading.Thread(target=bcnode.serve_forever)
        bcnode_thr.daemon = True
        bcnode_thr.start()
        
        server.serve_forever()
553         
554         server.serve_forever()