Merge branch 'bugfix_gmpp_fwd'
[bitcoin:eloipool.git] / eloipool.py
1 #!/usr/bin/python3
2 # Eloipool - Python Bitcoin pool server
3 # Copyright (C) 2011-2012  Luke Dashjr <luke-jr+eloipool@utopios.org>
4 #
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License as
7 # published by the Free Software Foundation, either version 3 of the
8 # License, or (at your option) any later version.
9 #
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 # GNU Affero General Public License for more details.
14 #
15 # You should have received a copy of the GNU Affero General Public License
16 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
import config

# Fall back to a generic name when the local config doesn't set one;
# ServerName is attached to the JSONRPCServer in __main__ below.
if not hasattr(config, 'ServerName'):
	config.ServerName = 'Unnamed Eloipool'


import logging

# Only install our logging setup if nothing (e.g. the config module)
# has configured handlers already.
if len(logging.root.handlers) == 0:
	logging.basicConfig(
		format='%(asctime)s\t%(name)s\t%(levelname)s\t%(message)s',
		level=logging.DEBUG,
	)
	# These loggers are too chatty at DEBUG; cap them at INFO by default.
	for infoOnly in ('checkShare', 'JSONRPCHandler', 'merkleMaker', 'Waker for JSONRPCServer', 'JSONRPCServer'):
		logging.getLogger(infoOnly).setLevel(logging.INFO)
33
def RaiseRedFlags(reason):
	"""Log *reason* as a critical 'redflag' event and hand it back to the caller."""
	redflag_logger = logging.getLogger('redflag')
	redflag_logger.critical(reason)
	return reason
37
38
from bitcoin.node import BitcoinLink, BitcoinNode
# Peer-to-peer Bitcoin node, used to submit solved blocks directly to the
# network (see bcnode.submitBlock in checkShare) alongside the RPC path.
bcnode = BitcoinNode(config.UpstreamNetworkId)
bcnode.userAgent += b'Eloipool:0.1/'

import jsonrpc
# JSON-RPC proxy to the upstream bitcoind (getmemorypool / block submission).
UpstreamBitcoindJSONRPC = jsonrpc.ServiceProxy(config.UpstreamURI)
45
46
47 from bitcoin.script import BitcoinScript
48 from bitcoin.txn import Txn
49 from base58 import b58decode
50 from struct import pack
51 import subprocess
52 from time import time
53
def makeCoinbaseTxn(coinbaseValue, useCoinbaser = True):
	"""Build the coinbase transaction for a new block template.

	coinbaseValue: total reward (in base units) available to distribute.
	useCoinbaser: when True and config.CoinbaserCmd is set, run the external
	coinbaser command to split part of the reward into extra outputs.

	Whatever the coinbaser does not claim (or the full amount, if it fails,
	overspends, or is disabled) is paid to config.TrackerAddr.
	Returns the unassembled Txn.
	"""
	txn = Txn.new()
	
	if useCoinbaser and hasattr(config, 'CoinbaserCmd') and config.CoinbaserCmd:
		coinbased = 0
		try:
			cmd = config.CoinbaserCmd
			cmd = cmd.replace('%d', str(coinbaseValue))
			p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE)
			# Coinbaser protocol: first line is the output count, then for
			# each output one line with the amount and one with the address.
			nout = int(p.stdout.readline())
			for i in range(nout):
				amount = int(p.stdout.readline())
				addr = p.stdout.readline().rstrip(b'\n').decode('utf8')
				pkScript = BitcoinScript.toAddress(addr)
				txn.addOutput(amount, pkScript)
				coinbased += amount
		except Exception:
			# Narrowed from a bare except (which also swallowed
			# KeyboardInterrupt/SystemExit) and the cause is now logged
			# instead of discarded.  The sentinel value forces the
			# failure path below.
			logging.getLogger('makeCoinbaseTxn').debug('Coinbaser error', exc_info=True)
			coinbased = coinbaseValue + 1
		if coinbased >= coinbaseValue:
			# Coinbaser claimed the entire reward or errored out: discard its
			# outputs and pay everything to the tracker address instead.
			logging.getLogger('makeCoinbaseTxn').error('Coinbaser failed!')
			txn.outputs = []
		else:
			coinbaseValue -= coinbased
	
	pkScript = BitcoinScript.toAddress(config.TrackerAddr)
	txn.addOutput(coinbaseValue, pkScript)
	
	# TODO
	# TODO: red flag on dupe coinbase
	return txn
84
85
86 import jsonrpc_getwork
87 from util import Bits2Target
88
# username -> { work-identifier -> (MRD-or-MC work tuple, issue time) };
# filled by getBlockHeader/getBlockTemplate, consumed by checkShare.
workLog = {}
# Integer target derived from the current block's nBits; set in blockChanged().
networkTarget = None
# Header bytes already seen this block, for duplicate-share rejection.
DupeShareHACK = {}

# Replaced in __main__ with the JSONRPCServer instance.
server = None
def updateBlocks():
	# Wake hanging longpoll connections so miners request fresh work.
	server.wakeLongpoll()
96
def blockChanged():
	# Invoked (via MM.onBlockChange) when the best block changes: reset the
	# per-block duplicate caches and outstanding work log, recompute the
	# network target from the new block's nBits, and wake longpollers.
	global DupeShareHACK
	DupeShareHACK = {}
	jsonrpc_getwork._CheckForDupesHACK = {}
	global MM, networkTarget, server
	networkTarget = Bits2Target(MM.currentBlock[1])
	workLog.clear()
	updateBlocks()
105
106
from merklemaker import merkleMaker
MM = merkleMaker()
# Copy the entire config module namespace onto the merkleMaker instance.
MM.__dict__.update(config.__dict__)
# FIXME (original): the 50 BTC subsidy (5000000000 base units) is hard-coded.
# NOTE(review): presumably this txn backs "clear"/initial work — confirm in merklemaker.
MM.clearCoinbaseTxn = makeCoinbaseTxn(5000000000, False)  # FIXME
MM.clearCoinbaseTxn.assemble()
MM.makeCoinbaseTxn = makeCoinbaseTxn
MM.onBlockChange = blockChanged
MM.onBlockUpdate = updateBlocks
115
116
117 from binascii import b2a_hex
118 from copy import deepcopy
119 from struct import pack, unpack
120 import threading
121 from time import time
122 from util import RejectedShare, dblsha, hash2int, swap32
123 import jsonrpc
124 import traceback
125
# Optional "gotwork" reporting: when config.GotWorkURI is set, shares at or
# below config.GotWorkTarget are forwarded to this service (see checkShare).
gotwork = None
if hasattr(config, 'GotWorkURI'):
	gotwork = jsonrpc.ServiceProxy(config.GotWorkURI)
129
def submitGotwork(info):
	"""Best-effort submission of *info* to the configured gotwork service.

	Failures are logged and swallowed: gotwork reporting must never break
	share processing (this runs in a daemon thread spawned by checkShare).
	"""
	try:
		gotwork.gotwork(info)
	except Exception:
		# Narrowed from a bare except so SystemExit/KeyboardInterrupt
		# still propagate.
		checkShare.logger.warning('Failed to submit gotwork\n' + traceback.format_exc())
135
def getBlockHeader(username):
	"""Issue getwork-style work: an 80-byte block header for *username*.

	Records the issued work in workLog keyed by merkle root, and returns
	(header bytes, worklog entry).
	"""
	MRD = MM.getMRD()
	merkleRoot = MRD[0]
	prevBlock = MRD[3]
	bits = MRD[4]
	now = pack('<L', int(time()))
	# Version 1, prevblock, merkleroot, time, bits, then b'iolE' ('Eloi'
	# reversed) filling the nonce field for the miner to overwrite.
	hdr = b''.join((b'\1\0\0\0', prevBlock, merkleRoot, now, bits, b'iolE'))
	entry = (MRD, time())
	workLog.setdefault(username, {})[merkleRoot] = entry
	return (hdr, entry)
143
def getBlockTemplate(username):
	"""Issue getmemorypool-style work and log it under *username*.

	The worklog key is a length-prefixed identifier at the start of the
	coinbase script; checkShare extracts the same bytes from submitted
	blocks to locate this entry again.
	"""
	MC = MM.getMC()
	coinbase = MC[2]
	identLen = coinbase[0]
	wli = coinbase[1:1 + identLen]
	userWork = workLog.setdefault(username, {})
	userWork[wli] = (MC, time())
	return MC
151
# Callables invoked with every processed share (populated from
# config.ShareLogging in __main__).
loggersShare = []

# Debug history of real (network-target) block solutions: deep-copied inputs
# (RBDs) and the exact serialized payloads submitted upstream (RBPs).
RBDs = []
RBPs = []
156
157 from bitcoin.varlen import varlenEncode, varlenDecode
158 import bitcoin.txn
def assembleBlock(blkhdr, txlist):
	"""Serialize a full block: 80-byte header, txn-count varint, then each txn."""
	parts = [blkhdr, varlenEncode(len(txlist))]
	parts.extend(tx.data for tx in txlist)
	return b''.join(parts)
165
def blockSubmissionThread(payload):
	"""Submit a solved block to the upstream bitcoind, retrying until it answers.

	Runs in its own thread (spawned by checkShare).  A falsy reply from
	upstream raises a red flag.
	"""
	from time import sleep
	while True:
		try:
			rv = UpstreamBitcoindJSONRPC.getmemorypool(b2a_hex(payload).decode('ascii'))
			break
		except Exception:
			# Was a bare `except: pass`, which busy-spun at full speed on a
			# persistent RPC failure and hid the error entirely.  Log the
			# failure and back off briefly before retrying.
			logging.getLogger('blockSubmission').warning('Error submitting block upstream; retrying\n' + traceback.format_exc())
			sleep(1)
	if not rv:
		RaiseRedFlags('Upstream rejected block!')
175
def checkShare(share):
	"""Validate a submitted share (getwork or getmemorypool style).

	Raises RejectedShare with a short reason string on any problem.  On the
	success path it may also submit a full block upstream and/or forward a
	"gotwork" report.  Annotates *share* in place ('time', 'MRD'/'MC',
	'upstreamResult').
	"""
	shareTime = share['time'] = time()
	
	data = share['data']
	data = data[:80]  # block header only
	(prevBlock, bits) = MM.currentBlock
	sharePrevBlock = data[4:36]
	if sharePrevBlock != prevBlock:
		# Work built on the immediately previous block is merely stale;
		# anything else is outright wrong.
		if sharePrevBlock == MM.lastBlock[0]:
			raise RejectedShare('stale-prevblk')
		raise RejectedShare('bad-prevblk')
	
	# TODO: use userid
	username = share['username']
	if username not in workLog:
		raise RejectedShare('unknown-user')
	
	if data[72:76] != bits:
		raise RejectedShare('bad-diffbits')
	if data[:4] != b'\1\0\0\0':
		raise RejectedShare('bad-version')
	
	shareMerkleRoot = data[36:68]
	if 'blkdata' in share:
		# getmemorypool-style: full block data follows the header.  The
		# worklog identifier (wli) is the length-prefixed blob at the start
		# of the coinbase script (see getBlockTemplate).
		pl = share['blkdata']
		(txncount, pl) = varlenDecode(pl)
		cbtxn = bitcoin.txn.Txn(pl)
		cbtxn.disassemble(retExtra=True)
		coinbase = cbtxn.getCoinbase()
		wliLen = coinbase[0]
		wli = coinbase[1:wliLen+1]
		mode = 'MC'
		moden = 1
	else:
		# getwork-style: the work is identified by the merkle root itself.
		wli = shareMerkleRoot
		mode = 'MRD'
		moden = 0
	
	MWL = workLog[username]
	if wli not in MWL:
		raise RejectedShare('unknown-work')
	(wld, issueT) = MWL[wli]
	share[mode] = wld
	
	if data in DupeShareHACK:
		raise RejectedShare('duplicate')
	DupeShareHACK[data] = None
	
	blkhash = dblsha(data)
	if blkhash[28:] != b'\0\0\0\0':
		# Minimal share difficulty check: the last 4 bytes of the double-SHA
		# must be zero.
		raise RejectedShare('H-not-zero')
	blkhashn = hash2int(blkhash)
	
	global networkTarget
	logfunc = getattr(checkShare.logger, 'info' if blkhashn <= networkTarget else 'debug')
	logfunc('BLKHASH: %64x' % (blkhashn,))
	logfunc(' TARGET: %64x' % (networkTarget,))
	
	workMerkleTree = wld[1]
	workCoinbase = wld[2]
	
	# NOTE: this isn't actually needed for MC mode, but we're abusing it for a trivial share check...
	# Deep-copy the coinbase txn before mutating it, so the shared merkle
	# tree's transaction list is left untouched.
	txlist = workMerkleTree.data
	txlist = [deepcopy(txlist[0]),] + txlist[1:]
	cbtxn = txlist[0]
	cbtxn.setCoinbase(workCoinbase)
	cbtxn.assemble()
	
	if blkhashn <= networkTarget:
		# Meets the real network target: assemble and submit an actual block,
		# both over JSON-RPC (in a thread) and via the p2p node.
		logfunc("Submitting upstream")
		RBDs.append( deepcopy( (data, txlist, share.get('blkdata', None), workMerkleTree) ) )
		if not moden:
			payload = assembleBlock(data, txlist)
		else:
			payload = share['data'] + share['blkdata']
		logfunc('Real block payload: %s' % (payload,))
		RBPs.append(payload)
		threading.Thread(target=blockSubmissionThread, args=(payload,)).start()
		bcnode.submitBlock(payload)
		share['upstreamResult'] = True
		MM.updateBlock(blkhash)
	
	# Gotwork hack...
	if gotwork and blkhashn <= config.GotWorkTarget:
		try:
			# Serialized coinbase txn, block hash, then the merkle branch
			# (count byte followed by each step) and a zero terminator.
			coinbaseMrkl = cbtxn.data
			coinbaseMrkl += blkhash
			steps = workMerkleTree._steps
			coinbaseMrkl += pack('B', len(steps))
			for step in steps:
				coinbaseMrkl += step
			coinbaseMrkl += b"\0\0\0\0"
			info = {}
			info['hash'] = b2a_hex(blkhash).decode('ascii')
			info['header'] = b2a_hex(data).decode('ascii')
			info['coinbaseMrkl'] = b2a_hex(coinbaseMrkl).decode('ascii')
			thr = threading.Thread(target=submitGotwork, args=(info,))
			thr.daemon = True
			thr.start()
		except:
			checkShare.logger.warning('Failed to build gotwork request')
	
	# Reject work issued too long ago, and header timestamps outside a sane
	# window around the server clock (5 minutes back, 2 hours forward).
	shareTimestamp = unpack('<L', data[68:72])[0]
	if shareTime < issueT - 120:
		raise RejectedShare('stale-work')
	if shareTimestamp < shareTime - 300:
		raise RejectedShare('time-too-old')
	if shareTimestamp > shareTime + 7200:
		raise RejectedShare('time-too-new')
	
	if moden:
		# Full-block mode: verify the miner's coinbase and transaction set
		# match the work we actually issued.
		cbpre = cbtxn.getCoinbase()
		cbpreLen = len(cbpre)
		if coinbase[:cbpreLen] != cbpre:
			raise RejectedShare('bad-cb-prefix')
		
		# Filter out known "I support" flags, to prevent exploits
		for ff in (b'/P2SH/', b'NOP2SH', b'p2sh/CHV', b'p2sh/NOCHV'):
			if coinbase.find(ff) > cbpreLen - len(ff):
				raise RejectedShare('bad-cb-flag')
		
		if len(coinbase) > 100:
			raise RejectedShare('bad-cb-length')
		
		cbtxn.setCoinbase(coinbase)
		cbtxn.assemble()
		if shareMerkleRoot != workMerkleTree.withFirst(cbtxn):
			raise RejectedShare('bad-txnmrklroot')
		
		# The submitted block must serialize to exactly what we would build.
		allowed = assembleBlock(data, txlist)
		if allowed != share['data'] + share['blkdata']:
			raise RejectedShare('bad-txns')
checkShare.logger = logging.getLogger('checkShare')
309
def receiveShare(share):
	"""Process one submitted share: validate it, then log it unconditionally.

	A RejectedShare is recorded on the share dict and re-raised; every share
	(accepted or rejected) gets a 'solution' field and is handed to all
	registered share loggers.
	"""
	# TODO: username => userid
	try:
		checkShare(share)
	except RejectedShare as rej:
		share['rejectReason'] = str(rej)
		raise
	finally:
		try:
			share['solution'] = share['_origdata']
		except KeyError:
			share['solution'] = b2a_hex(swap32(share['data'])).decode('utf8')
		for logShare in loggersShare:
			logShare(share)
324
def newBlockNotification():
	"""Handle an external new-block notification by refreshing the merkle tree."""
	logging.getLogger('newBlockNotification').info('Received new block notification')
	MM.updateMerkleTree()
	# TODO: Force RESPOND TO LONGPOLLS?
330
def newBlockNotificationSIGNAL(signum, frame):
	"""Signal-handler wrapper around newBlockNotification.

	Runs the handler in a fresh daemon thread, in case the signal arrived
	while locks were held.
	"""
	worker = threading.Thread(
		target=newBlockNotification,
		name='newBlockNotification via signal %s' % (signum,),
	)
	worker.daemon = True
	worker.start()

# Trigger a merkle-tree refresh whenever the process receives SIGUSR1.
from signal import signal, SIGUSR1
signal(SIGUSR1, newBlockNotificationSIGNAL)
339
340
341 import os
342 import os.path
343 import pickle
344 import signal
345 import sys
346 from time import sleep
347 import traceback
348
# On-disk pickle file written by saveState() and read by restoreState().
SAVE_STATE_FILENAME = 'eloipool.worklog'
350
def stopServers():
	"""Shut down the bitcoin node and JSON-RPC servers and close their sockets.

	Flags each server down, wakes it, then polls (up to ~2.5 seconds) for the
	run loops to exit before closing all remaining file descriptors.
	"""
	logger = logging.getLogger('stopServers')
	
	logger.info('Stopping servers...')
	global bcnode, server
	servers = (bcnode, server)
	# Clear the run flags first, then wake everything, so no server can go
	# back to sleep after its wakeup.
	for srv in servers:
		srv.keepgoing = False
	for srv in servers:
		srv.wakeup()
	attempts = 0
	while True:
		stillRunning = [srv.__class__.__name__ for srv in servers if srv.running]
		if not stillRunning:
			break
		attempts += 1
		if attempts >= 0x100:
			logger.error('Servers taking too long to stop (%s), giving up' % (', '.join(stillRunning)))
			break
		sleep(0.01)
	
	for srv in servers:
		for fd in srv._fd.keys():
			os.close(fd)
378
def saveState(t = None):
	"""Persist resume data (timestamp, dupe-share cache, work log) to disk.

	t: the shutdown timestamp recorded by exit()/restart(); restoreState()
	uses it to decide whether the saved work log is still fresh.
	"""
	logger = logging.getLogger('saveState')
	
	# Then, save data needed to resume work
	logger.info('Saving work state to \'%s\'...' % (SAVE_STATE_FILENAME,))
	i = 0
	while True:
		try:
			with open(SAVE_STATE_FILENAME, 'wb') as f:
				pickle.dump(t, f)
				pickle.dump(DupeShareHACK, f)
				pickle.dump(workLog, f)
			break
		except Exception:
			i += 1
			if i >= 0x10000:
				logger.error('Failed to save work\n' + traceback.format_exc())
				try:
					os.unlink(SAVE_STATE_FILENAME)
				except Exception:
					logger.error(('Failed to unlink \'%s\'; resume may have trouble\n' % (SAVE_STATE_FILENAME,)) + traceback.format_exc())
				# BUGFIX: without this break, a persistent write failure made
				# the retry loop spin (and re-log, and re-unlink) forever
				# instead of giving up after the 0x10000th attempt.
				break
400
def exit():
	"""Cleanly shut down: stop servers, save state, then terminate the process."""
	shutdownTime = time()
	stopServers()
	saveState(shutdownTime)
	logging.getLogger('exit').info('Goodbye...')
	os.kill(os.getpid(), signal.SIGTERM)
	sys.exit(0)
408
def restart():
	"""Stop servers, save state, and re-exec this process in place."""
	shutdownTime = time()
	stopServers()
	saveState(shutdownTime)
	logging.getLogger('restart').info('Restarting...')
	try:
		os.execv(sys.argv[0], sys.argv)
	except:
		logging.getLogger('restart').error('Failed to exec\n' + traceback.format_exc())
418
def restoreState():
	"""Load state saved by saveState(), restoring workLog and DupeShareHACK.

	Understands the current format (timestamp, dupe cache, work log in one
	pickle stream) plus two short-lived 2012-02 legacy layouts.  The work
	log is only restored when the save is less than two minutes old.
	"""
	if not os.path.exists(SAVE_STATE_FILENAME):
		return
	
	global workLog, DupeShareHACK
	
	logger = logging.getLogger('restoreState')
	s = os.stat(SAVE_STATE_FILENAME)
	logger.info('Restoring saved state from \'%s\' (%d bytes)' % (SAVE_STATE_FILENAME, s.st_size))
	try:
		with open(SAVE_STATE_FILENAME, 'rb') as f:
			t = pickle.load(f)
			if type(t) == tuple:
				if len(t) > 2:
					# Future formats, not supported here
					# NOTE(review): t[3] would IndexError on a 3-tuple;
					# this looks like it should be t[2] — confirm before
					# relying on the versioned path.
					ver = t[3]
					# TODO
				
				# Old format, from 2012-02-02 to 2012-02-03
				workLog = t[0]
				DupeShareHACK = t[1]
				t = None
			else:
				if isinstance(t, dict):
					# Old format, from 2012-02-03 to 2012-02-03
					# NOTE(review): t is set to None here, so the `t + 120`
					# check below raises TypeError and this legacy path
					# always exits via the except handler (DupeShareHACK
					# stays restored, workLog does not).
					DupeShareHACK = t
					t = None
				else:
					# Current format, from 2012-02-03 onward
					DupeShareHACK = pickle.load(f)
				
				# Only resurrect the work log if the save is fresh enough
				# for the recorded work to still be valid.
				if t + 120 >= time():
					workLog = pickle.load(f)
				else:
					logger.debug('Skipping restore of expired workLog')
	except:
		logger.error('Failed to restore state\n' + traceback.format_exc())
		return
	logger.info('State restored successfully')
	if t:
		logger.info('Total downtime: %g seconds' % (time() - t,))
460
461
462 from jsonrpcserver import JSONRPCListener, JSONRPCServer
463 import interactivemode
464 from networkserver import NetworkListener
465 import threading
466 import sharelogging
467 import imp
468
if __name__ == "__main__":
	# --- Share logging configuration (with two layers of back-compat) ---
	if not hasattr(config, 'ShareLogging'):
		config.ShareLogging = ()
	if hasattr(config, 'DbOptions'):
		# Legacy DbOptions variable: translate into an equivalent 'sql' share logger.
		logging.getLogger('backwardCompatibility').warn('DbOptions configuration variable is deprecated; upgrade to ShareLogging var before 2013-03-05')
		config.ShareLogging = list(config.ShareLogging)
		config.ShareLogging.append( {
			'type': 'sql',
			'engine': 'postgres',
			'dbopts': config.DbOptions,
			'statement': "insert into shares (rem_host, username, our_result, upstream_result, reason, solution) values ({Q(remoteHost)}, {username}, {YN(not(rejectReason))}, {YN(upstreamResult)}, {rejectReason}, decode({solution}, 'hex'))",
		} )
	for i in config.ShareLogging:
		if not hasattr(i, 'keys'):
			# Legacy (name, parameters) tuple form: convert to a dict entry.
			name, parameters = i
			logging.getLogger('backwardCompatibility').warn('Using short-term backward compatibility for ShareLogging[\'%s\']; be sure to update config before 2012-04-04' % (name,))
			if name == 'postgres':
				name = 'sql'
				i = {
					'engine': 'postgres',
					'dbopts': parameters,
				}
			elif name == 'logfile':
				i = {}
				i['thropts'] = parameters
				if 'filename' in parameters:
					# 'filename' is a direct argument; the remaining options
					# pass through as thread options (copied before mutation).
					i['filename'] = parameters['filename']
					i['thropts'] = dict(i['thropts'])
					del i['thropts']['filename']
			else:
				i = parameters
			i['type'] = name
		
		# Dynamically load sharelogging.<type> and register its logShare hook.
		name = i['type']
		parameters = i
		try:
			fp, pathname, description = imp.find_module(name, sharelogging.__path__)
			m = imp.load_module(name, fp, pathname, description)
			lo = getattr(m, name)(**parameters)
			loggersShare.append(lo.logShare)
		except:
			logging.getLogger('sharelogging').error("Error setting up share logger %s: %s", name,  sys.exc_info())

	# --- Bitcoin p2p listeners and optional upstream node link ---
	LSbc = []
	if not hasattr(config, 'BitcoinNodeAddresses'):
		config.BitcoinNodeAddresses = ()
	for a in config.BitcoinNodeAddresses:
		LSbc.append(NetworkListener(bcnode, a))
	
	if hasattr(config, 'UpstreamBitcoindNode') and config.UpstreamBitcoindNode:
		BitcoinLink(bcnode, dest=config.UpstreamBitcoindNode)
	
	# NOTE(review): these imports appear to register JSON-RPC method handlers
	# as a side effect — confirm in the jsonrpc_* modules.
	import jsonrpc_getmemorypool
	import jsonrpc_getwork
	import jsonrpc_setworkaux
	
	# --- Miner-facing JSON-RPC server setup ---
	server = JSONRPCServer()
	if hasattr(config, 'JSONRPCAddress'):
		# Legacy single-address variable: fold into the addresses list.
		logging.getLogger('backwardCompatibility').warn('JSONRPCAddress configuration variable is deprecated; upgrade to JSONRPCAddresses list before 2013-03-05')
		if not hasattr(config, 'JSONRPCAddresses'):
			config.JSONRPCAddresses = []
		config.JSONRPCAddresses.insert(0, config.JSONRPCAddress)
	LS = []
	for a in config.JSONRPCAddresses:
		LS.append(JSONRPCListener(server, a))
	if hasattr(config, 'SecretUser'):
		server.SecretUser = config.SecretUser
	server.aux = MM.CoinbaseAux
	server.getBlockHeader = getBlockHeader
	server.getBlockTemplate = getBlockTemplate
	server.receiveShare = receiveShare
	server.RaiseRedFlags = RaiseRedFlags
	
	if hasattr(config, 'TrustedForwarders'):
		server.TrustedForwarders = config.TrustedForwarders
	server.ServerName = config.ServerName
	
	# Start work generation, restore any saved state, then serve: the p2p
	# node in a daemon thread, the JSON-RPC server in the main thread.
	MM.start()
	
	restoreState()
	
	bcnode_thr = threading.Thread(target=bcnode.serve_forever)
	bcnode_thr.daemon = True
	bcnode_thr.start()
	
	server.serve_forever()