Wait until JSONRPCServer is initialized before starting MerkleMaker, so we can skip...
#!/usr/bin/python3
# Eloipool - Python Bitcoin pool server
# Copyright (C) 2011-2012  Luke Dashjr <luke-jr+eloipool@utopios.org>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

import config

if not hasattr(config, 'ServerName'):
	config.ServerName = 'Unnamed Eloipool'

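# The pool is configured entirely through a local config.py module. The sketch
# below only illustrates the attributes this file reads (the names come from the
# hasattr()/config.* accesses throughout; the example values are assumptions,
# not defaults shipped with Eloipool):
#
#   ServerName = 'My Eloipool'                       # shown to getwork clients
#   UpstreamNetworkId = ...                          # network id passed to bitcoin.node.BitcoinNode
#   UpstreamURI = 'http://user:pass@127.0.0.1:8332'  # upstream bitcoind JSON-RPC
#   TrackerAddr = '1YourPoolPayoutAddress...'        # address paid by the default coinbase
#   JSONRPCAddresses = (('', 8337),)                 # listen addresses for the getwork server
#   CoinbaserCmd = ...                               # optional; see makeCoinbaseTxn() below
#   ShareLogging = ()                                # optional; see the __main__ section below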
import logging

if len(logging.root.handlers) == 0:
	logging.basicConfig(
		format='%(asctime)s\t%(name)s\t%(levelname)s\t%(message)s',
		level=logging.DEBUG,
	)
	for infoOnly in ('checkShare', 'JSONRPCHandler', 'merkleMaker', 'Waker for JSONRPCServer', 'JSONRPCServer'):
		logging.getLogger(infoOnly).setLevel(logging.INFO)

def RaiseRedFlags(reason):
	logging.getLogger('redflag').critical(reason)
	return reason


from bitcoin.node import BitcoinLink, BitcoinNode
bcnode = BitcoinNode(config.UpstreamNetworkId)
bcnode.userAgent += b'Eloipool:0.1/'

import jsonrpc
UpstreamBitcoindJSONRPC = jsonrpc.ServiceProxy(config.UpstreamURI)


from bitcoin.script import BitcoinScript
from bitcoin.txn import Txn
from base58 import b58decode
from struct import pack
import subprocess
from time import time

def makeCoinbaseTxn(coinbaseValue, useCoinbaser = True):
	txn = Txn.new()
	
	if useCoinbaser and hasattr(config, 'CoinbaserCmd') and config.CoinbaserCmd:
		coinbased = 0
		try:
			cmd = config.CoinbaserCmd
			cmd = cmd.replace('%d', str(coinbaseValue))
			p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE)
			nout = int(p.stdout.readline())
			for i in range(nout):
				amount = int(p.stdout.readline())
				addr = p.stdout.readline().rstrip(b'\n').decode('utf8')
				pkScript = BitcoinScript.toAddress(addr)
				txn.addOutput(amount, pkScript)
				coinbased += amount
		except:
			coinbased = coinbaseValue + 1
		if coinbased >= coinbaseValue:
			logging.getLogger('makeCoinbaseTxn').error('Coinbaser failed!')
			txn.outputs = []
		else:
			coinbaseValue -= coinbased
	
	pkScript = BitcoinScript.toAddress(config.TrackerAddr)
	txn.addOutput(coinbaseValue, pkScript)
	
	# TODO
	# TODO: red flag on dupe coinbase
	return txn

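# CoinbaserCmd protocol (as implied by makeCoinbaseTxn above): the configured
# shell command is run with every '%d' replaced by the total coinbase value in
# satoshis, and is expected to print on stdout the number of extra outputs,
# followed by an amount line and an address line per output, e.g. (illustrative
# values only):
#   2
#   100000000
#   1MinerPayoutAddrExample...
#   50000000
#   1AnotherPayoutAddrExample...
# Whatever the coinbaser does not claim is paid to config.TrackerAddr; if it
# claims the full value or more (or fails), its outputs are discarded and an
# error is logged.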
import jsonrpc_getwork
from util import Bits2Target

workLog = {}
networkTarget = None
DupeShareHACK = {}

server = None
def updateBlocks():
	server.wakeLongpoll()

def blockChanged():
	global DupeShareHACK
	DupeShareHACK = {}
	jsonrpc_getwork._CheckForDupesHACK = {}
	global MM, networkTarget, server
	networkTarget = Bits2Target(MM.currentBlock[1])
	workLog.clear()
	updateBlocks()

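# When the chain tip changes, every outstanding piece of work refers to the old
# previous-block hash, so blockChanged() simply discards the duplicate-share and
# work logs, recomputes the network target from the new block's nBits (used in
# checkShare() to spot shares that solve an actual block), and wakes any
# longpolling getwork clients via updateBlocks().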
from merklemaker import merkleMaker
MM = merkleMaker()
MM.__dict__.update(config.__dict__)
MM.clearCoinbaseTxn = makeCoinbaseTxn(5000000000, False)  # FIXME
MM.clearCoinbaseTxn.assemble()
MM.makeCoinbaseTxn = makeCoinbaseTxn
MM.onBlockChange = blockChanged
MM.onBlockUpdate = updateBlocks

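# merkleMaker owns work generation: it keeps the current block template and
# merkle tree up to date from the upstream bitcoind and calls back into this
# module when the template changes (onBlockUpdate) or the chain tip moves
# (onBlockChange). MM.__dict__.update(config.__dict__) simply copies the pool
# configuration onto the instance so merklemaker can read its own settings.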
from binascii import b2a_hex
from copy import deepcopy
from struct import pack, unpack
import threading
from time import time
from util import RejectedShare, dblsha, hash2int, swap32
import jsonrpc
import traceback

gotwork = None
if hasattr(config, 'GotWorkURI'):
	gotwork = jsonrpc.ServiceProxy(config.GotWorkURI)

def submitGotwork(info):
	try:
		gotwork.gotwork(info)
	except:
		checkShare.logger.warning('Failed to submit gotwork\n' + traceback.format_exc())

def getBlockHeader(username):
	MRD = MM.getMRD()
	(merkleRoot, merkleTree, coinbase, prevBlock, bits, rollPrevBlk) = MRD
	timestamp = pack('<L', int(time()))
	hdr = b'\1\0\0\0' + prevBlock + merkleRoot + timestamp + bits + b'iolE'
	workLog.setdefault(username, {})[merkleRoot] = (MRD, time())
	return (hdr, workLog[username][merkleRoot])

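# getBlockHeader() serves classic getwork-style work as an 80-byte block header:
#   version (4 bytes, little-endian 1) | prevBlock (32) | merkleRoot (32) |
#   timestamp (4, little-endian) | bits (4) | nonce placeholder (4, b'iolE')
# Since the miner only rolls the timestamp and nonce, the merkle root uniquely
# identifies the issued work and doubles as the workLog key for checkShare().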
def getBlockTemplate(username):
	MC = MM.getMC()
	(dummy, merkleTree, coinbase, prevBlock, bits) = MC
	wliLen = coinbase[0]
	wli = coinbase[1:wliLen+1]
	workLog.setdefault(username, {})[wli] = (MC, time())
	return MC

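# getBlockTemplate() serves getmemorypool-style work, where the miner builds its
# own merkle root. The work is therefore keyed by the 'wli' (work log index):
# coinbase[0] is a length byte and coinbase[1:wliLen+1] the identifier, which
# checkShare() later recovers from the coinbase of the submitted block data.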
loggersShare = []

RBDs = []
RBPs = []

from bitcoin.varlen import varlenEncode, varlenDecode
import bitcoin.txn
def assembleBlock(blkhdr, txlist):
	payload = blkhdr
	payload += varlenEncode(len(txlist))
	for tx in txlist:
		payload += tx.data
	return payload

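# A serialized block is just the 80-byte header, a Bitcoin varint transaction
# count, and the raw transactions concatenated in merkle order (coinbase first);
# assembleBlock() builds exactly that for upstream submission.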
def blockSubmissionThread(payload):
	while True:
		try:
			rv = UpstreamBitcoindJSONRPC.getmemorypool(b2a_hex(payload).decode('ascii'))
			break
		except:
			pass
	if not rv:
		RaiseRedFlags('Upstream rejected block!')

def checkShare(share):
	shareTime = share['time'] = time()
	
	data = share['data']
	data = data[:80]
	(prevBlock, bits) = MM.currentBlock
	sharePrevBlock = data[4:36]
	if sharePrevBlock != prevBlock:
		if sharePrevBlock == MM.lastBlock[0]:
			raise RejectedShare('stale-prevblk')
		raise RejectedShare('bad-prevblk')
	
	# TODO: use userid
	username = share['username']
	if username not in workLog:
		raise RejectedShare('unknown-user')
	
	if data[72:76] != bits:
		raise RejectedShare('bad-diffbits')
	if data[:4] != b'\1\0\0\0':
		raise RejectedShare('bad-version')
	
	shareMerkleRoot = data[36:68]
	if 'blkdata' in share:
		pl = share['blkdata']
		(txncount, pl) = varlenDecode(pl)
		cbtxn = bitcoin.txn.Txn(pl)
		cbtxn.disassemble(retExtra=True)
		coinbase = cbtxn.getCoinbase()
		wliLen = coinbase[0]
		wli = coinbase[1:wliLen+1]
		mode = 'MC'
		moden = 1
	else:
		wli = shareMerkleRoot
		mode = 'MRD'
		moden = 0
	
	MWL = workLog[username]
	if wli not in MWL:
		raise RejectedShare('unknown-work')
	(wld, issueT) = MWL[wli]
	share[mode] = wld
	
	if data in DupeShareHACK:
		raise RejectedShare('duplicate')
	DupeShareHACK[data] = None
	
	blkhash = dblsha(data)
	if blkhash[28:] != b'\0\0\0\0':
		raise RejectedShare('H-not-zero')
	blkhashn = hash2int(blkhash)
	
	global networkTarget
	logfunc = getattr(checkShare.logger, 'info' if blkhashn <= networkTarget else 'debug')
	logfunc('BLKHASH: %64x' % (blkhashn,))
	logfunc(' TARGET: %64x' % (networkTarget,))
	
	workMerkleTree = wld[1]
	workCoinbase = wld[2]
	
	# NOTE: this isn't actually needed for MC mode, but we're abusing it for a trivial share check...
	txlist = workMerkleTree.data
	txlist = [deepcopy(txlist[0]),] + txlist[1:]
	cbtxn = txlist[0]
	cbtxn.setCoinbase(workCoinbase)
	cbtxn.assemble()
	
	if blkhashn <= networkTarget:
		logfunc("Submitting upstream")
		if not moden:
			RBDs.append( deepcopy( (data, txlist) ) )
			payload = assembleBlock(data, txlist)
		else:
			RBDs.append( deepcopy( (data, txlist, share['blkdata']) ) )
			payload = share['data'] + share['blkdata']
		logfunc('Real block payload: %s' % (payload,))
		RBPs.append(payload)
		threading.Thread(target=blockSubmissionThread, args=(payload,)).start()
		bcnode.submitBlock(payload)
		share['upstreamResult'] = True
		MM.updateBlock(blkhash)
	
	# Gotwork hack...
	if gotwork and blkhashn <= config.GotWorkTarget:
		try:
			coinbaseMrkl = cbtxn.data
			coinbaseMrkl += blkhash
			steps = workMerkleTree._steps
			coinbaseMrkl += pack('B', len(steps))
			for step in steps:
				coinbaseMrkl += step
			coinbaseMrkl += b"\0\0\0\0"
			info = {}
			info['hash'] = b2a_hex(blkhash).decode('ascii')
			info['header'] = b2a_hex(data).decode('ascii')
			info['coinbaseMrkl'] = b2a_hex(coinbaseMrkl).decode('ascii')
			thr = threading.Thread(target=submitGotwork, args=(info,))
			thr.daemon = True
			thr.start()
		except:
			checkShare.logger.warning('Failed to build gotwork request')
	
	shareTimestamp = unpack('<L', data[68:72])[0]
	if shareTime < issueT - 120:
		raise RejectedShare('stale-work')
	if shareTimestamp < shareTime - 300:
		raise RejectedShare('time-too-old')
	if shareTimestamp > shareTime + 7200:
		raise RejectedShare('time-too-new')
	
	if moden:
		cbpre = cbtxn.getCoinbase()
		cbpreLen = len(cbpre)
		if coinbase[:cbpreLen] != cbpre:
			raise RejectedShare('bad-cb-prefix')
		
		# Filter out known "I support" flags, to prevent exploits
		for ff in (b'/P2SH/', b'NOP2SH', b'p2sh/CHV', b'p2sh/NOCHV'):
			if coinbase.find(ff) > cbpreLen - len(ff):
				raise RejectedShare('bad-cb-flag')
		
		if len(coinbase) > 100:
			raise RejectedShare('bad-cb-length')
		
		cbtxn.setCoinbase(coinbase)
		cbtxn.assemble()
		if shareMerkleRoot != workMerkleTree.withFirst(cbtxn):
			raise RejectedShare('bad-txnmrklroot')
		
		allowed = assembleBlock(data, txlist)
		if allowed != share['data'] + share['blkdata']:
			raise RejectedShare('bad-txns')
checkShare.logger = logging.getLogger('checkShare')

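# Shape of the share dict handled below (a sketch inferred from the accesses in
# checkShare()/receiveShare(); field names are as used above, values are only
# illustrative):
#   share = {
#       'data': <submitted block header bytes; only the first 80 are checked>,
#       'username': <worker name whose issued work is looked up in workLog>,
#       'blkdata': <optional; serialized txn data following the header (getmemorypool submissions)>,
#       '_origdata': <optional; original hex string, echoed back as share['solution']>,
#   }
# checkShare() adds 'time', 'MRD' or 'MC', and possibly 'upstreamResult';
# receiveShare() records 'solution' (and 'rejectReason' on rejection) and hands
# the result to every configured share logger.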
def receiveShare(share):
	# TODO: username => userid
	try:
		checkShare(share)
	except RejectedShare as rej:
		share['rejectReason'] = str(rej)
		raise
	finally:
		if '_origdata' in share:
			share['solution'] = share['_origdata']
		else:
			share['solution'] = b2a_hex(swap32(share['data'])).decode('utf8')
		for i in loggersShare:
			i(share)

def newBlockNotification():
	logging.getLogger('newBlockNotification').info('Received new block notification')
	MM.updateMerkleTree()
	# TODO: Force RESPOND TO LONGPOLLS?
	pass

def newBlockNotificationSIGNAL(signum, frame):
	# Use a new thread, in case the signal handler is called with locks held
	thr = threading.Thread(target=newBlockNotification, name='newBlockNotification via signal %s' % (signum,))
	thr.daemon = True
	thr.start()

from signal import signal, SIGUSR1
signal(SIGUSR1, newBlockNotificationSIGNAL)

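# SIGUSR1 triggers an immediate merkle-tree update, so bitcoind can notify the
# pool of new blocks without waiting for the next poll, e.g. (illustrative;
# Eloipool itself does not write a pid file here, adapt to however you track the
# process id):
#   bitcoind -blocknotify='kill -USR1 $(cat eloipool.pid)'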
import os
import os.path
import pickle
import signal
import sys
from time import sleep
import traceback

SAVE_STATE_FILENAME = 'eloipool.worklog'

def stopServers():
	logger = logging.getLogger('stopServers')
	
	logger.info('Stopping servers...')
	global bcnode, server
	servers = (bcnode, server)
	for s in servers:
		s.keepgoing = False
	for s in servers:
		s.wakeup()
	i = 0
	while True:
		sl = []
		for s in servers:
			if s.running:
				sl.append(s.__class__.__name__)
		if not sl:
			break
		i += 1
		if i >= 0x100:
			logger.error('Servers taking too long to stop (%s), giving up' % (', '.join(sl)))
			break
		sleep(0.01)
	
	for s in servers:
		for fd in s._fd.keys():
			os.close(fd)

def saveState(t = None):
	logger = logging.getLogger('saveState')
	
	# Then, save data needed to resume work
	logger.info('Saving work state to \'%s\'...' % (SAVE_STATE_FILENAME,))
	i = 0
	while True:
		try:
			with open(SAVE_STATE_FILENAME, 'wb') as f:
				pickle.dump(t, f)
				pickle.dump(DupeShareHACK, f)
				pickle.dump(workLog, f)
			break
		except:
			i += 1
			if i >= 0x10000:
				logger.error('Failed to save work\n' + traceback.format_exc())
				try:
					os.unlink(SAVE_STATE_FILENAME)
				except:
					logger.error(('Failed to unlink \'%s\'; resume may have trouble\n' % (SAVE_STATE_FILENAME,)) + traceback.format_exc())
				break  # give up; without this, the loop would keep logging forever

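# On-disk format of eloipool.worklog (current format, as written by saveState
# above): three consecutive pickles -- the shutdown timestamp, DupeShareHACK,
# and workLog. restoreState() below also accepts the two short-lived formats
# used during early February 2012.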
def exit():
	t = time()
	stopServers()
	saveState(t)
	logging.getLogger('exit').info('Goodbye...')
	os.kill(os.getpid(), signal.SIGTERM)
	sys.exit(0)

def restart():
	t = time()
	stopServers()
	saveState(t)
	logging.getLogger('restart').info('Restarting...')
	try:
		os.execv(sys.argv[0], sys.argv)
	except:
		logging.getLogger('restart').error('Failed to exec\n' + traceback.format_exc())

def restoreState():
	if not os.path.exists(SAVE_STATE_FILENAME):
		return
	
	global workLog, DupeShareHACK
	
	logger = logging.getLogger('restoreState')
	s = os.stat(SAVE_STATE_FILENAME)
	logger.info('Restoring saved state from \'%s\' (%d bytes)' % (SAVE_STATE_FILENAME, s.st_size))
	try:
		with open(SAVE_STATE_FILENAME, 'rb') as f:
			t = pickle.load(f)
			if type(t) == tuple:
				if len(t) > 2:
					# Future formats, not supported here
					ver = t[3]
					# TODO
				
				# Old format, from 2012-02-02 to 2012-02-03
				workLog = t[0]
				DupeShareHACK = t[1]
				t = None
			else:
				if isinstance(t, dict):
					# Old format, from 2012-02-03 to 2012-02-03
					DupeShareHACK = t
					t = None
				else:
					# Current format, from 2012-02-03 onward
					DupeShareHACK = pickle.load(f)
				
				# t is None for the old formats above, which have no workLog to restore
				if t and t + 120 >= time():
					workLog = pickle.load(f)
				else:
					logger.debug('Skipping restore of expired workLog')
	except:
		logger.error('Failed to restore state\n' + traceback.format_exc())
		return
	logger.info('State restored successfully')
	if t:
		logger.info('Total downtime: %g seconds' % (time() - t,))

from jsonrpcserver import JSONRPCListener, JSONRPCServer
import interactivemode
from networkserver import NetworkListener
import threading
import sharelogging
import imp

if __name__ == "__main__":
	if not hasattr(config, 'ShareLogging'):
		config.ShareLogging = ()
	if hasattr(config, 'DbOptions'):
		logging.getLogger('backwardCompatibility').warn('DbOptions configuration variable is deprecated; upgrade to ShareLogging var before 2013-03-05')
		config.ShareLogging = list(config.ShareLogging)
		config.ShareLogging.append( {
			'type': 'sql',
			'engine': 'postgres',
			'dbopts': config.DbOptions,
			'statement': "insert into shares (rem_host, username, our_result, upstream_result, reason, solution) values ({Q(remoteHost)}, {username}, {YN(not(rejectReason))}, {YN(upstreamResult)}, {rejectReason}, decode({solution}, 'hex'))",
		} )
	for i in config.ShareLogging:
		if not hasattr(i, 'keys'):
			name, parameters = i
			logging.getLogger('backwardCompatibility').warn('Using short-term backward compatibility for ShareLogging[\'%s\']; be sure to update config before 2012-04-04' % (name,))
			if name == 'postgres':
				name = 'sql'
				i = {
					'engine': 'postgres',
					'dbopts': parameters,
				}
			elif name == 'logfile':
				i = {}
				i['thropts'] = parameters
				if 'filename' in parameters:
					i['filename'] = parameters['filename']
					i['thropts'] = dict(i['thropts'])
					del i['thropts']['filename']
			else:
				i = parameters
			i['type'] = name
		
		name = i['type']
		parameters = i
		try:
			fp, pathname, description = imp.find_module(name, sharelogging.__path__)
			m = imp.load_module(name, fp, pathname, description)
			lo = getattr(m, name)(**parameters)
			loggersShare.append(lo.logShare)
		except:
			logging.getLogger('sharelogging').error("Error setting up share logger %s: %s", name,  sys.exc_info())
	
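	# Each ShareLogging entry is a dict: 'type' names a module under sharelogging/
	# and the remaining keys are passed to it as keyword arguments. An illustrative
	# sketch (the 'sql' keys mirror the DbOptions fallback above; the 'logfile'
	# filename is an assumed example, not a default):
	#   config.ShareLogging = (
	#       {'type': 'logfile', 'filename': '/var/log/eloipool-shares.log'},
	#       {'type': 'sql', 'engine': 'postgres', 'dbopts': ..., 'statement': ...},
	#   )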
	LSbc = []
	if not hasattr(config, 'BitcoinNodeAddresses'):
		config.BitcoinNodeAddresses = ()
	for a in config.BitcoinNodeAddresses:
		LSbc.append(NetworkListener(bcnode, a))
	
	if hasattr(config, 'UpstreamBitcoindNode') and config.UpstreamBitcoindNode:
		BitcoinLink(bcnode, dest=config.UpstreamBitcoindNode)
	
	import jsonrpc_getmemorypool
	import jsonrpc_getwork
	import jsonrpc_setworkaux
	
	server = JSONRPCServer()
	if hasattr(config, 'JSONRPCAddress'):
		logging.getLogger('backwardCompatibility').warn('JSONRPCAddress configuration variable is deprecated; upgrade to JSONRPCAddresses list before 2013-03-05')
		if not hasattr(config, 'JSONRPCAddresses'):
			config.JSONRPCAddresses = []
		config.JSONRPCAddresses.insert(0, config.JSONRPCAddress)
	LS = []
	for a in config.JSONRPCAddresses:
		LS.append(JSONRPCListener(server, a))
	if hasattr(config, 'SecretUser'):
		server.SecretUser = config.SecretUser
	server.aux = MM.CoinbaseAux
	server.getBlockHeader = getBlockHeader
	server.getBlockTemplate = getBlockTemplate
	server.receiveShare = receiveShare
	server.RaiseRedFlags = RaiseRedFlags
	
	server.TrustedForwarders = ()
	if hasattr(config, 'TrustedForwarders'):
		server.TrustedForwarders = config.TrustedForwarders
	server.ServerName = config.ServerName
	
	MM.start()
	
	restoreState()
	
	bcnode_thr = threading.Thread(target=bcnode.serve_forever)
	bcnode_thr.daemon = True
	bcnode_thr.start()
	
	server.serve_forever()