Merge branch 'tests'
[bitcoin:eloipool.git] / eloipool.py
1 #!/usr/bin/python3
2 # Eloipool - Python Bitcoin pool server
3 # Copyright (C) 2011-2012  Luke Dashjr <luke-jr+eloipool@utopios.org>
4 #
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License as
7 # published by the Free Software Foundation, either version 3 of the
8 # License, or (at your option) any later version.
9 #
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 # GNU Affero General Public License for more details.
14 #
15 # You should have received a copy of the GNU Affero General Public License
16 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18 import config
19
20 if not hasattr(config, 'ServerName'):
21         config.ServerName = 'Unnamed Eloipool'
22
23
24 import logging
25
26 if len(logging.root.handlers) == 0:
27         logging.basicConfig(
28                 format='%(asctime)s\t%(name)s\t%(levelname)s\t%(message)s',
29                 level=logging.DEBUG,
30         )
31         for infoOnly in ('checkShare', 'JSONRPCHandler', 'merkleMaker', 'Waker for JSONRPCServer', 'JSONRPCServer'):
32                 logging.getLogger(infoOnly).setLevel(logging.INFO)
33
34 def RaiseRedFlags(reason):
35         logging.getLogger('redflag').critical(reason)
36         return reason
37
38
39 from bitcoin.node import BitcoinLink, BitcoinNode
40 bcnode = BitcoinNode(config.UpstreamNetworkId)
41 bcnode.userAgent += b'Eloipool:0.1/'
42
43 import jsonrpc
44 UpstreamBitcoindJSONRPC = jsonrpc.ServiceProxy(config.UpstreamURI)
45
46
47 from bitcoin.script import BitcoinScript
48 from bitcoin.txn import Txn
49 from base58 import b58decode
50 from struct import pack
51 import subprocess
52 from time import time
53
54 def makeCoinbaseTxn(coinbaseValue, useCoinbaser = True):
55         txn = Txn.new()
56         
57         if useCoinbaser and hasattr(config, 'CoinbaserCmd') and config.CoinbaserCmd:
58                 coinbased = 0
59                 try:
60                         cmd = config.CoinbaserCmd
61                         cmd = cmd.replace('%d', str(coinbaseValue))
62                         p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE)
63                         nout = int(p.stdout.readline())
64                         for i in range(nout):
65                                 amount = int(p.stdout.readline())
66                                 addr = p.stdout.readline().rstrip(b'\n').decode('utf8')
67                                 pkScript = BitcoinScript.toAddress(addr)
68                                 txn.addOutput(amount, pkScript)
69                                 coinbased += amount
70                 except:
71                         coinbased = coinbaseValue + 1
72                 if coinbased >= coinbaseValue:
73                         logging.getLogger('makeCoinbaseTxn').error('Coinbaser failed!')
74                         txn.outputs = []
75                 else:
76                         coinbaseValue -= coinbased
77         
78         pkScript = BitcoinScript.toAddress(config.TrackerAddr)
79         txn.addOutput(coinbaseValue, pkScript)
80         
81         # TODO
82         # TODO: red flag on dupe coinbase
83         return txn
84
85
86 import jsonrpc_getwork
87 from util import Bits2Target
88
89 workLog = {}
90 networkTarget = None
91 DupeShareHACK = {}
92
93 server = None
94 def updateBlocks():
95         if server:
96                 server.wakeLongpoll()
97
98 def blockChanged():
99         global DupeShareHACK
100         DupeShareHACK = {}
101         jsonrpc_getwork._CheckForDupesHACK = {}
102         global MM, networkTarget, server
103         networkTarget = Bits2Target(MM.currentBlock[1])
104         workLog.clear()
105         updateBlocks()
106
107
108 from merklemaker import merkleMaker
109 MM = merkleMaker()
110 MM.__dict__.update(config.__dict__)
111 MM.clearCoinbaseTxn = makeCoinbaseTxn(5000000000, False)  # FIXME
112 MM.clearCoinbaseTxn.assemble()
113 MM.makeCoinbaseTxn = makeCoinbaseTxn
114 MM.onBlockChange = blockChanged
115 MM.onBlockUpdate = updateBlocks
116 MM.start()
117
118
119 from binascii import b2a_hex
120 from copy import deepcopy
121 from struct import pack, unpack
122 from time import time
123 from util import RejectedShare, dblsha, hash2int, swap32
124 import jsonrpc
125 import threading
126 import traceback
127
128 gotwork = None
129 if hasattr(config, 'GotWorkURI'):
130         gotwork = jsonrpc.ServiceProxy(config.GotWorkURI)
131
132 def submitGotwork(info):
133         try:
134                 gotwork.gotwork(info)
135         except:
136                 checkShare.logger.warning('Failed to submit gotwork\n' + traceback.format_exc())
137
138 def getBlockHeader(username):
139         MRD = MM.getMRD()
140         (merkleRoot, merkleTree, coinbase, prevBlock, bits, rollPrevBlk) = MRD
141         timestamp = pack('<L', int(time()))
142         hdr = b'\1\0\0\0' + prevBlock + merkleRoot + timestamp + bits + b'iolE'
143         workLog.setdefault(username, {})[merkleRoot] = (MRD, time())
144         return (hdr, workLog[username][merkleRoot])
145
146 def getBlockTemplate(username):
147         MC = MM.getMC()
148         (dummy, merkleTree, coinbase, prevBlock, bits) = MC
149         wliLen = coinbase[0]
150         wli = coinbase[1:wliLen+1]
151         workLog.setdefault(username, {})[wli] = (MC, time())
152         return MC
153
154 loggersShare = []
155
156 RBDs = []
157 RBPs = []
158
159 from bitcoin.varlen import varlenEncode, varlenDecode
160 import bitcoin.txn
161 def assembleBlock(blkhdr, txlist):
162         payload = blkhdr
163         payload += varlenEncode(len(txlist))
164         for tx in txlist:
165                 payload += tx.data
166         return payload
167
168 def blockSubmissionThread(payload):
169         while True:
170                 try:
171                         UpstreamBitcoindJSONRPC.getmemorypool(b2a_hex(payload).decode('ascii'))
172                         break
173                 except:
174                         pass
175
176 def checkShare(share):
177         shareTime = share['time'] = time()
178         
179         data = share['data']
180         data = data[:80]
181         (prevBlock, bits) = MM.currentBlock
182         sharePrevBlock = data[4:36]
183         if sharePrevBlock != prevBlock:
184                 if sharePrevBlock == MM.lastBlock[0]:
185                         raise RejectedShare('stale-prevblk')
186                 raise RejectedShare('bad-prevblk')
187         
188         # TODO: use userid
189         username = share['username']
190         if username not in workLog:
191                 raise RejectedShare('unknown-user')
192         
193         if data[72:76] != bits:
194                 raise RejectedShare('bad-diffbits')
195         if data[:4] != b'\1\0\0\0':
196                 raise RejectedShare('bad-version')
197         
198         shareMerkleRoot = data[36:68]
199         if 'blkdata' in share:
200                 pl = share['blkdata']
201                 (txncount, pl) = varlenDecode(pl)
202                 cbtxn = bitcoin.txn.Txn(pl)
203                 cbtxn.disassemble(retExtra=True)
204                 coinbase = cbtxn.getCoinbase()
205                 wliLen = coinbase[0]
206                 wli = coinbase[1:wliLen+1]
207                 mode = 'MC'
208                 moden = 1
209         else:
210                 wli = shareMerkleRoot
211                 mode = 'MRD'
212                 moden = 0
213         
214         MWL = workLog[username]
215         if wli not in MWL:
216                 raise RejectedShare('unknown-work')
217         (wld, issueT) = MWL[wli]
218         share[mode] = wld
219         
220         if data in DupeShareHACK:
221                 raise RejectedShare('duplicate')
222         DupeShareHACK[data] = None
223         
224         blkhash = dblsha(data)
225         if blkhash[28:] != b'\0\0\0\0':
226                 raise RejectedShare('H-not-zero')
227         blkhashn = hash2int(blkhash)
228         
229         global networkTarget
230         logfunc = getattr(checkShare.logger, 'info' if blkhashn <= networkTarget else 'debug')
231         logfunc('BLKHASH: %64x' % (blkhashn,))
232         logfunc(' TARGET: %64x' % (networkTarget,))
233         
234         workMerkleTree = wld[1]
235         workCoinbase = wld[2]
236         
237         # NOTE: this isn't actually needed for MC mode, but we're abusing it for a trivial share check...
238         txlist = workMerkleTree.data
239         txlist = [deepcopy(txlist[0]),] + txlist[1:]
240         cbtxn = txlist[0]
241         cbtxn.setCoinbase(workCoinbase)
242         cbtxn.assemble()
243         
244         if blkhashn <= networkTarget:
245                 logfunc("Submitting upstream")
246                 if not moden:
247                         RBDs.append( deepcopy( (data, txlist) ) )
248                         payload = assembleBlock(data, txlist)
249                 else:
250                         RBDs.append( deepcopy( (data, txlist, share['blkdata']) ) )
251                         payload = share['data'] + share['blkdata']
252                 logfunc('Real block payload: %s' % (payload,))
253                 RBPs.append(payload)
254                 threading.Thread(target=blockSubmissionThread, args=(payload,)).start()
255                 bcnode.submitBlock(payload)
256                 share['upstreamResult'] = True
257                 MM.updateBlock(blkhash)
258         
259         # Gotwork hack...
260         if gotwork and blkhashn <= config.GotWorkTarget:
261                 try:
262                         coinbaseMrkl = cbtxn.data
263                         coinbaseMrkl += blkhash
264                         steps = workMerkleTree._steps
265                         coinbaseMrkl += pack('B', len(steps))
266                         for step in steps:
267                                 coinbaseMrkl += step
268                         coinbaseMrkl += b"\0\0\0\0"
269                         info = {}
270                         info['hash'] = b2a_hex(blkhash).decode('ascii')
271                         info['header'] = b2a_hex(data).decode('ascii')
272                         info['coinbaseMrkl'] = b2a_hex(coinbaseMrkl).decode('ascii')
273                         thr = threading.Thread(target=submitGotwork, args=(info,))
274                         thr.daemon = True
275                         thr.start()
276                 except:
277                         checkShare.logger.warning('Failed to build gotwork request')
278         
279         shareTimestamp = unpack('<L', data[68:72])[0]
280         if shareTime < issueT - 120:
281                 raise RejectedShare('stale-work')
282         if shareTimestamp < shareTime - 300:
283                 raise RejectedShare('time-too-old')
284         if shareTimestamp > shareTime + 7200:
285                 raise RejectedShare('time-too-new')
286         
287         if moden:
288                 cbpre = cbtxn.getCoinbase()
289                 cbpreLen = len(cbpre)
290                 if coinbase[:cbpreLen] != cbpre:
291                         raise RejectedShare('bad-cb-prefix')
292                 
293                 # Filter out known "I support" flags, to prevent exploits
294                 for ff in (b'/P2SH/', b'NOP2SH', b'p2sh/CHV', b'p2sh/NOCHV'):
295                         if coinbase.find(ff) > cbpreLen - len(ff):
296                                 raise RejectedShare('bad-cb-flag')
297                 
298                 if len(coinbase) > 100:
299                         raise RejectedShare('bad-cb-length')
300                 
301                 cbtxn.setCoinbase(coinbase)
302                 cbtxn.assemble()
303                 if shareMerkleRoot != workMerkleTree.withFirst(cbtxn):
304                         raise RejectedShare('bad-txnmrklroot')
305                 
306                 allowed = assembleBlock(data, txlist)
307                 if allowed != share['data'] + share['blkdata']:
308                         raise RejectedShare('bad-txns')
309 checkShare.logger = logging.getLogger('checkShare')
310
311 def receiveShare(share):
312         # TODO: username => userid
313         try:
314                 checkShare(share)
315         except RejectedShare as rej:
316                 share['rejectReason'] = str(rej)
317                 raise
318         finally:
319                 if '_origdata' in share:
320                         share['solution'] = share['_origdata']
321                 else:
322                         share['solution'] = b2a_hex(swap32(share['data'])).decode('utf8')
323                 for i in loggersShare:
324                         i(share)
325
326 def newBlockNotification(signum, frame):
327         logging.getLogger('newBlockNotification').info('Received new block notification')
328         MM.updateMerkleTree()
329         # TODO: Force RESPOND TO LONGPOLLS?
330         pass
331
332 from signal import signal, SIGUSR1
333 signal(SIGUSR1, newBlockNotification)
334
335
336 import os
337 import os.path
338 import pickle
339 import signal
340 import sys
341 from time import sleep
342 import traceback
343
344 SAVE_STATE_FILENAME = 'eloipool.worklog'
345
346 def stopServers():
347         logger = logging.getLogger('stopServers')
348         
349         logger.info('Stopping servers...')
350         global bcnode, server
351         servers = (bcnode, server)
352         for s in servers:
353                 s.keepgoing = False
354         for s in servers:
355                 s.wakeup()
356         i = 0
357         while True:
358                 sl = []
359                 for s in servers:
360                         if s.running:
361                                 sl.append(s.__class__.__name__)
362                 if not sl:
363                         break
364                 i += 1
365                 if i >= 0x100:
366                         logger.error('Servers taking too long to stop (%s), giving up' % (', '.join(sl)))
367                         break
368                 sleep(0.01)
369         
370         for s in servers:
371                 for fd in s._fd.keys():
372                         os.close(fd)
373
374 def saveState(t = None):
375         logger = logging.getLogger('saveState')
376         
377         # Then, save data needed to resume work
378         logger.info('Saving work state to \'%s\'...' % (SAVE_STATE_FILENAME,))
379         i = 0
380         while True:
381                 try:
382                         with open(SAVE_STATE_FILENAME, 'wb') as f:
383                                 pickle.dump(t, f)
384                                 pickle.dump(DupeShareHACK, f)
385                                 pickle.dump(workLog, f)
386                         break
387                 except:
388                         i += 1
389                         if i >= 0x10000:
390                                 logger.error('Failed to save work\n' + traceback.format_exc())
391                                 try:
392                                         os.unlink(SAVE_STATE_FILENAME)
393                                 except:
394                                         logger.error(('Failed to unlink \'%s\'; resume may have trouble\n' % (SAVE_STATE_FILENAME,)) + traceback.format_exc())
395
396 def exit():
397         t = time()
398         stopServers()
399         saveState(t)
400         logging.getLogger('exit').info('Goodbye...')
401         os.kill(os.getpid(), signal.SIGTERM)
402         sys.exit(0)
403
404 def restart():
405         t = time()
406         stopServers()
407         saveState(t)
408         logging.getLogger('restart').info('Restarting...')
409         try:
410                 os.execv(sys.argv[0], sys.argv)
411         except:
412                 logging.getLogger('restart').error('Failed to exec\n' + traceback.format_exc())
413
414 def restoreState():
415         if not os.path.exists(SAVE_STATE_FILENAME):
416                 return
417         
418         global workLog, DupeShareHACK
419         
420         logger = logging.getLogger('restoreState')
421         s = os.stat(SAVE_STATE_FILENAME)
422         logger.info('Restoring saved state from \'%s\' (%d bytes)' % (SAVE_STATE_FILENAME, s.st_size))
423         try:
424                 with open(SAVE_STATE_FILENAME, 'rb') as f:
425                         t = pickle.load(f)
426                         if type(t) == tuple:
427                                 if len(t) > 2:
428                                         # Future formats, not supported here
429                                         ver = t[3]
430                                         # TODO
431                                 
432                                 # Old format, from 2012-02-02 to 2012-02-03
433                                 workLog = t[0]
434                                 DupeShareHACK = t[1]
435                                 t = None
436                         else:
437                                 if isinstance(t, dict):
438                                         # Old format, from 2012-02-03 to 2012-02-03
439                                         DupeShareHACK = t
440                                         t = None
441                                 else:
442                                         # Current format, from 2012-02-03 onward
443                                         DupeShareHACK = pickle.load(f)
444                                 
445                                 if t + 120 >= time():
446                                         workLog = pickle.load(f)
447                                 else:
448                                         logger.debug('Skipping restore of expired workLog')
449         except:
450                 logger.error('Failed to restore state\n' + traceback.format_exc())
451                 return
452         logger.info('State restored successfully')
453         if t:
454                 logger.info('Total downtime: %g seconds' % (time() - t,))
455
456
457 from jsonrpcserver import JSONRPCListener, JSONRPCServer
458 import interactivemode
459 from networkserver import NetworkListener
460 import threading
461 import sharelogging
462 import imp
463
464 if __name__ == "__main__":
465         if not hasattr(config, 'ShareLogging'):
466                 config.ShareLogging = ()
467         if hasattr(config, 'DbOptions'):
468                 logging.getLogger('backwardCompatibility').warn('DbOptions configuration variable is deprecated; upgrade to ShareLogging var before 2013-03-05')
469                 config.ShareLogging = list(config.ShareLogging)
470                 config.ShareLogging.append( {
471                         'type': 'sql',
472                         'engine': 'postgres',
473                         'dbopts': config.DbOptions,
474                         'statement': "insert into shares (rem_host, username, our_result, upstream_result, reason, solution) values ({Q(remoteHost)}, {username}, {YN(not(rejectReason))}, {YN(upstreamResult)}, {rejectReason}, decode({solution}, 'hex'))",
475                 } )
476         for i in config.ShareLogging:
477                 if not hasattr(i, 'keys'):
478                         name, parameters = i
479                         logging.getLogger('backwardCompatibility').warn('Using short-term backward compatibility for ShareLogging[\'%s\']; be sure to update config before 2012-04-04' % (name,))
480                         if name == 'postgres':
481                                 name = 'sql'
482                                 i = {
483                                         'engine': 'postgres',
484                                         'dbopts': parameters,
485                                 }
486                         elif name == 'logfile':
487                                 i = {}
488                                 i['thropts'] = parameters
489                                 if 'filename' in parameters:
490                                         i['filename'] = parameters['filename']
491                                         i['thropts'] = dict(i['thropts'])
492                                         del i['thropts']['filename']
493                         else:
494                                 i = parameters
495                         i['type'] = name
496                 
497                 name = i['type']
498                 parameters = i
499                 try:
500                         fp, pathname, description = imp.find_module(name, sharelogging.__path__)
501                         m = imp.load_module(name, fp, pathname, description)
502                         lo = getattr(m, name)(**parameters)
503                         loggersShare.append(lo.logShare)
504                 except:
505                         logging.getLogger('sharelogging').error("Error setting up share logger %s: %s", name,  sys.exc_info())
506
507         LSbc = []
508         if not hasattr(config, 'BitcoinNodeAddresses'):
509                 config.BitcoinNodeAddresses = ()
510         for a in config.BitcoinNodeAddresses:
511                 LSbc.append(NetworkListener(bcnode, a))
512         
513         if hasattr(config, 'UpstreamBitcoindNode') and config.UpstreamBitcoindNode:
514                 BitcoinLink(bcnode, dest=config.UpstreamBitcoindNode)
515         
516         import jsonrpc_getmemorypool
517         import jsonrpc_getwork
518         import jsonrpc_setworkaux
519         
520         server = JSONRPCServer()
521         if hasattr(config, 'JSONRPCAddress'):
522                 logging.getLogger('backwardCompatibility').warn('JSONRPCAddress configuration variable is deprecated; upgrade to JSONRPCAddresses list before 2013-03-05')
523                 if not hasattr(config, 'JSONRPCAddresses'):
524                         config.JSONRPCAddresses = []
525                 config.JSONRPCAddresses.insert(0, config.JSONRPCAddress)
526         LS = []
527         for a in config.JSONRPCAddresses:
528                 LS.append(JSONRPCListener(server, a))
529         if hasattr(config, 'SecretUser'):
530                 server.SecretUser = config.SecretUser
531         server.aux = MM.CoinbaseAux
532         server.getBlockHeader = getBlockHeader
533         server.getBlockTemplate = getBlockTemplate
534         server.receiveShare = receiveShare
535         server.RaiseRedFlags = RaiseRedFlags
536         
537         server.TrustedForwarders = ()
538         if hasattr(config, 'TrustedForwarders'):
539                 server.TrustedForwarders = config.TrustedForwarders
540         server.ServerName = config.ServerName
541         
542         restoreState()
543         
544         bcnode_thr = threading.Thread(target=bcnode.serve_forever)
545         bcnode_thr.daemon = True
546         bcnode_thr.start()
547         
548         server.serve_forever()