#!/usr/bin/python3
# Eloipool - Python Bitcoin pool server
# Copyright (C) 2011-2012  Luke Dashjr <luke-jr+eloipool@utopios.org>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

import config

if not hasattr(config, 'ServerName'):
        config.ServerName = 'Unnamed Eloipool'

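# The fallback ShareTarget below is the traditional getwork difficulty-1
# share target: any header hash whose top 32 bits are zero counts as a share.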
if not hasattr(config, 'ShareTarget'):
        config.ShareTarget = 0x00000000ffffffffffffffffffffffffffffffffffffffffffffffffffffffff

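# A hypothetical config.py sketch (illustrative values only; the attribute
# names below are the ones this file actually reads from the config module):
#
#     ServerName = 'My Eloipool'
#     ShareTarget = 0x00000000ffffffffffffffffffffffffffffffffffffffffffffffffffffffff
#     UpstreamURI = 'http://user:pass@localhost:8332'
#     UpstreamNetworkId = ...   # network magic passed to BitcoinNode
#     TrackerAddr = '<pool payout address>'
#     JSONRPCAddresses = (...,) # listen addresses handed to JSONRPCListener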
import logging

if len(logging.root.handlers) == 0:
        logging.basicConfig(
                format='%(asctime)s\t%(name)s\t%(levelname)s\t%(message)s',
                level=logging.DEBUG,
        )
        for infoOnly in ('checkShare', 'JSONRPCHandler', 'merkleMaker', 'Waker for JSONRPCServer', 'JSONRPCServer'):
                logging.getLogger(infoOnly).setLevel(logging.INFO)

def RaiseRedFlags(reason):
        logging.getLogger('redflag').critical(reason)
        return reason


from bitcoin.node import BitcoinLink, BitcoinNode
bcnode = BitcoinNode(config.UpstreamNetworkId)
bcnode.userAgent += b'Eloipool:0.1/'

import jsonrpc
UpstreamBitcoindJSONRPC = jsonrpc.ServiceProxy(config.UpstreamURI)


from bitcoin.script import BitcoinScript
from bitcoin.txn import Txn
from base58 import b58decode
from struct import pack
import subprocess
from time import time

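# makeCoinbaseTxn builds the generation transaction.  When CoinbaserCmd is
# configured, the command is run with '%d' replaced by the coinbase value (in
# base units); its stdout is expected to give a count of extra outputs on the
# first line, then an amount line and an address line per output.  Any failure
# (or over-allocation) falls back to paying everything to TrackerAddr.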
def makeCoinbaseTxn(coinbaseValue, useCoinbaser = True):
        txn = Txn.new()

        if useCoinbaser and hasattr(config, 'CoinbaserCmd') and config.CoinbaserCmd:
                coinbased = 0
                try:
                        cmd = config.CoinbaserCmd
                        cmd = cmd.replace('%d', str(coinbaseValue))
                        p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE)
                        nout = int(p.stdout.readline())
                        for i in range(nout):
                                amount = int(p.stdout.readline())
                                addr = p.stdout.readline().rstrip(b'\n').decode('utf8')
                                pkScript = BitcoinScript.toAddress(addr)
                                txn.addOutput(amount, pkScript)
                                coinbased += amount
                except:
                        coinbased = coinbaseValue + 1
                if coinbased >= coinbaseValue:
                        logging.getLogger('makeCoinbaseTxn').error('Coinbaser failed!')
                        txn.outputs = []
                else:
                        coinbaseValue -= coinbased

        pkScript = BitcoinScript.toAddress(config.TrackerAddr)
        txn.addOutput(coinbaseValue, pkScript)

        # TODO
        # TODO: red flag on dupe coinbase
        return txn


import jsonrpc_getwork
from util import Bits2Target

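# workLog maps username -> { work identifier: (work data, issue time) }, so
# submitted shares can be matched back to the exact work handed out.
# DupeShareHACK remembers header data already accepted this block so
# duplicates can be rejected; both are reset on every block change.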
workLog = {}
networkTarget = None
DupeShareHACK = {}

server = None
def updateBlocks():
        server.wakeLongpoll()

def blockChanged():
        global DupeShareHACK
        DupeShareHACK = {}
        jsonrpc_getwork._CheckForDupesHACK = {}
        global MM, networkTarget, server
        networkTarget = Bits2Target(MM.currentBlock[1])
        workLog.clear()
        updateBlocks()


from merklemaker import merkleMaker
MM = merkleMaker()
MM.__dict__.update(config.__dict__)
MM.clearCoinbaseTxn = makeCoinbaseTxn(5000000000, False)  # FIXME
MM.clearCoinbaseTxn.assemble()
MM.makeCoinbaseTxn = makeCoinbaseTxn
MM.onBlockChange = blockChanged
MM.onBlockUpdate = updateBlocks


from binascii import b2a_hex
from copy import deepcopy
from struct import pack, unpack
import threading
from time import time
from util import RejectedShare, dblsha, hash2int, swap32
import jsonrpc
import traceback

gotwork = None
if hasattr(config, 'GotWorkURI'):
        gotwork = jsonrpc.ServiceProxy(config.GotWorkURI)

def submitGotwork(info):
        try:
                gotwork.gotwork(info)
        except:
                checkShare.logger.warning('Failed to submit gotwork\n' + traceback.format_exc())

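# Bitcoin block headers are 80 bytes: version (4) | previous block hash (32) |
# merkle root (32) | timestamp (4) | bits (4) | nonce (4).  getBlockHeader
# hands out a header with b'iolE' as a placeholder nonce for the miner to
# roll, and records the work under its merkle root.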
def getBlockHeader(username):
        MRD = MM.getMRD()
        (merkleRoot, merkleTree, coinbase, prevBlock, bits, rollPrevBlk) = MRD
        timestamp = pack('<L', int(time()))
        hdr = b'\1\0\0\0' + prevBlock + merkleRoot + timestamp + bits + b'iolE'
        workLog.setdefault(username, {})[merkleRoot] = (MRD, time())
        return (hdr, workLog[username][merkleRoot])

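# For block-template (getmemorypool) work, the work-log key is the first push
# of the coinbase script: one length byte followed by that many bytes.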
def getBlockTemplate(username):
        MC = MM.getMC()
        (dummy, merkleTree, coinbase, prevBlock, bits) = MC
        wliLen = coinbase[0]
        wli = coinbase[1:wliLen+1]
        workLog.setdefault(username, {})[wli] = (MC, time())
        return MC

loggersShare = []

RBDs = []
RBPs = []

from bitcoin.varlen import varlenEncode, varlenDecode
import bitcoin.txn
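# A serialized block is the 80-byte header, a varint transaction count, and
# the raw transactions concatenated in merkle order.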
def assembleBlock(blkhdr, txlist):
        payload = blkhdr
        payload += varlenEncode(len(txlist))
        for tx in txlist:
                payload += tx.data
        return payload

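# Block submission retries the upstream RPC until the call itself succeeds;
# only an explicit rejection from the upstream node raises a red flag.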
def blockSubmissionThread(payload):
        while True:
                try:
                        rv = UpstreamBitcoindJSONRPC.getmemorypool(b2a_hex(payload).decode('ascii'))
                        break
                except:
                        pass
        if not rv:
                RaiseRedFlags('Upstream rejected block!')

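# checkShare validates a submitted share against the 80-byte header layout:
# bytes 0-3 version, 4-35 previous block hash, 36-67 merkle root, 68-71
# timestamp, 72-75 bits, 76-79 nonce.  It rejects stale or malformed work,
# detects duplicates, submits block-solving shares upstream, and finally
# enforces the pool's ShareTarget.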
def checkShare(share):
        shareTime = share['time'] = time()

        data = share['data']
        data = data[:80]
        (prevBlock, bits) = MM.currentBlock
        sharePrevBlock = data[4:36]
        if sharePrevBlock != prevBlock:
                if sharePrevBlock == MM.lastBlock[0]:
                        raise RejectedShare('stale-prevblk')
                raise RejectedShare('bad-prevblk')

        # TODO: use userid
        username = share['username']
        if username not in workLog:
                raise RejectedShare('unknown-user')

        if data[72:76] != bits:
                raise RejectedShare('bad-diffbits')
        if data[:4] != b'\1\0\0\0':
                raise RejectedShare('bad-version')

        shareMerkleRoot = data[36:68]
        if 'blkdata' in share:
                pl = share['blkdata']
                (txncount, pl) = varlenDecode(pl)
                cbtxn = bitcoin.txn.Txn(pl)
                cbtxn.disassemble(retExtra=True)
                coinbase = cbtxn.getCoinbase()
                wliLen = coinbase[0]
                wli = coinbase[1:wliLen+1]
                mode = 'MC'
                moden = 1
        else:
                wli = shareMerkleRoot
                mode = 'MRD'
                moden = 0

        MWL = workLog[username]
        if wli not in MWL:
                raise RejectedShare('unknown-work')
        (wld, issueT) = MWL[wli]
        share[mode] = wld

        if data in DupeShareHACK:
                raise RejectedShare('duplicate')
        DupeShareHACK[data] = None

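        # dblsha returns the hash in little-endian byte order, so the most
        # significant 32 bits of the hash are its final four bytes; any share
        # meeting even minimal difficulty must have them zero.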
        blkhash = dblsha(data)
        if blkhash[28:] != b'\0\0\0\0':
                raise RejectedShare('H-not-zero')
        blkhashn = hash2int(blkhash)

        global networkTarget
        logfunc = getattr(checkShare.logger, 'info' if blkhashn <= networkTarget else 'debug')
        logfunc('BLKHASH: %64x' % (blkhashn,))
        logfunc(' TARGET: %64x' % (networkTarget,))

        workMerkleTree = wld[1]
        workCoinbase = wld[2]

        # NOTE: this isn't actually needed for MC mode, but we're abusing it for a trivial share check...
        txlist = workMerkleTree.data
        txlist = [deepcopy(txlist[0]),] + txlist[1:]
        cbtxn = txlist[0]
        cbtxn.setCoinbase(workCoinbase)
        cbtxn.assemble()

        if blkhashn <= networkTarget:
                logfunc("Submitting upstream")
                RBDs.append( deepcopy( (data, txlist, share.get('blkdata', None), workMerkleTree) ) )
                if not moden:
                        payload = assembleBlock(data, txlist)
                else:
                        payload = share['data'] + share['blkdata']
                logfunc('Real block payload: %s' % (payload,))
                RBPs.append(payload)
                threading.Thread(target=blockSubmissionThread, args=(payload,)).start()
                bcnode.submitBlock(payload)
                share['upstreamResult'] = True
                MM.updateBlock(blkhash)

        # Gotwork hack...
        if gotwork and blkhashn <= config.GotWorkTarget:
                try:
                        coinbaseMrkl = cbtxn.data
                        coinbaseMrkl += blkhash
                        steps = workMerkleTree._steps
                        coinbaseMrkl += pack('B', len(steps))
                        for step in steps:
                                coinbaseMrkl += step
                        coinbaseMrkl += b"\0\0\0\0"
                        info = {}
                        info['hash'] = b2a_hex(blkhash).decode('ascii')
                        info['header'] = b2a_hex(data).decode('ascii')
                        info['coinbaseMrkl'] = b2a_hex(coinbaseMrkl).decode('ascii')
                        thr = threading.Thread(target=submitGotwork, args=(info,))
                        thr.daemon = True
                        thr.start()
                except:
                        checkShare.logger.warning('Failed to build gotwork request')

        if blkhashn > config.ShareTarget:
                raise RejectedShare('high-hash')

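        # Work issued more than two minutes ago is considered stale; header
        # timestamps may lag the share's arrival by at most 300 seconds and
        # lead it by at most 7200 seconds.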
        shareTimestamp = unpack('<L', data[68:72])[0]
        if shareTime < issueT - 120:
                raise RejectedShare('stale-work')
        if shareTimestamp < shareTime - 300:
                raise RejectedShare('time-too-old')
        if shareTimestamp > shareTime + 7200:
                raise RejectedShare('time-too-new')

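        # Extra checks for block-template (getmemorypool) submissions: the
        # miner-supplied coinbase must extend our prefix, must not append known
        # "I support" flags, must stay within 100 bytes, and has to reproduce
        # both the claimed merkle root and the exact block the miner sent.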
        if moden:
                cbpre = cbtxn.getCoinbase()
                cbpreLen = len(cbpre)
                if coinbase[:cbpreLen] != cbpre:
                        raise RejectedShare('bad-cb-prefix')

                # Filter out known "I support" flags, to prevent exploits
                for ff in (b'/P2SH/', b'NOP2SH', b'p2sh/CHV', b'p2sh/NOCHV'):
                        if coinbase.find(ff) > max(-1, cbpreLen - len(ff)):
                                raise RejectedShare('bad-cb-flag')

                if len(coinbase) > 100:
                        raise RejectedShare('bad-cb-length')

                cbtxn.setCoinbase(coinbase)
                cbtxn.assemble()
                if shareMerkleRoot != workMerkleTree.withFirst(cbtxn):
                        raise RejectedShare('bad-txnmrklroot')

                allowed = assembleBlock(data, txlist)
                if allowed != share['data'] + share['blkdata']:
                        raise RejectedShare('bad-txns')
checkShare.logger = logging.getLogger('checkShare')

def receiveShare(share):
        # TODO: username => userid
        try:
                checkShare(share)
        except RejectedShare as rej:
                share['rejectReason'] = str(rej)
                raise
        finally:
                if '_origdata' in share:
                        share['solution'] = share['_origdata']
                else:
                        share['solution'] = b2a_hex(swap32(share['data'])).decode('utf8')
                for i in loggersShare:
                        i(share)

def newBlockNotification():
        logging.getLogger('newBlockNotification').info('Received new block notification')
        MM.updateMerkleTree()
        # TODO: Force RESPOND TO LONGPOLLS?
        pass

def newBlockNotificationSIGNAL(signum, frame):
        # Use a new thread, in case the signal handler is called with locks held
        thr = threading.Thread(target=newBlockNotification, name='newBlockNotification via signal %s' % (signum,))
        thr.daemon = True
        thr.start()

from signal import signal, SIGUSR1
signal(SIGUSR1, newBlockNotificationSIGNAL)


import os
import os.path
import pickle
import signal
import sys
from time import sleep
import traceback

SAVE_STATE_FILENAME = 'eloipool.worklog'

def stopServers():
        logger = logging.getLogger('stopServers')

        if hasattr(stopServers, 'already'):
                logger.debug('Already tried to stop servers before')
                return
        stopServers.already = True

        logger.info('Stopping servers...')
        global bcnode, server
        servers = (bcnode, server)
        for s in servers:
                s.keepgoing = False
        for s in servers:
                try:
                        s.wakeup()
                except:
                        logger.error('Failed to stop server %s\n%s' % (s, traceback.format_exc()))
        i = 0
        while True:
                sl = []
                for s in servers:
                        if s.running:
                                sl.append(s.__class__.__name__)
                if not sl:
                        break
                i += 1
                if i >= 0x100:
                        logger.error('Servers taking too long to stop (%s), giving up' % (', '.join(sl)))
                        break
                sleep(0.01)

        for s in servers:
                for fd in s._fd.keys():
                        os.close(fd)

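# The saved state is a plain pickle stream: the shutdown timestamp, then
# DupeShareHACK, then workLog.  restoreState also understands the two older
# on-disk layouts from early February 2012.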
def saveState(t = None):
        logger = logging.getLogger('saveState')

        # Then, save data needed to resume work
        logger.info('Saving work state to \'%s\'...' % (SAVE_STATE_FILENAME,))
        i = 0
        while True:
                try:
                        with open(SAVE_STATE_FILENAME, 'wb') as f:
                                pickle.dump(t, f)
                                pickle.dump(DupeShareHACK, f)
                                pickle.dump(workLog, f)
                        break
                except:
                        i += 1
                        if i >= 0x10000:
                                logger.error('Failed to save work\n' + traceback.format_exc())
                                try:
                                        os.unlink(SAVE_STATE_FILENAME)
                                except:
                                        logger.error(('Failed to unlink \'%s\'; resume may have trouble\n' % (SAVE_STATE_FILENAME,)) + traceback.format_exc())

def exit():
        t = time()
        stopServers()
        saveState(t)
        logging.getLogger('exit').info('Goodbye...')
        os.kill(os.getpid(), signal.SIGTERM)
        sys.exit(0)

def restart():
        t = time()
        stopServers()
        saveState(t)
        logging.getLogger('restart').info('Restarting...')
        try:
                os.execv(sys.argv[0], sys.argv)
        except:
                logging.getLogger('restart').error('Failed to exec\n' + traceback.format_exc())

def restoreState():
        if not os.path.exists(SAVE_STATE_FILENAME):
                return

        global workLog, DupeShareHACK

        logger = logging.getLogger('restoreState')
        s = os.stat(SAVE_STATE_FILENAME)
        logger.info('Restoring saved state from \'%s\' (%d bytes)' % (SAVE_STATE_FILENAME, s.st_size))
        try:
                with open(SAVE_STATE_FILENAME, 'rb') as f:
                        t = pickle.load(f)
                        if type(t) == tuple:
                                if len(t) > 2:
                                        # Future formats, not supported here
                                        ver = t[3]
                                        # TODO

                                # Old format, from 2012-02-02 to 2012-02-03
                                workLog = t[0]
                                DupeShareHACK = t[1]
                                t = None
                        else:
                                if isinstance(t, dict):
                                        # Old format, from 2012-02-03 to 2012-02-03
                                        DupeShareHACK = t
                                        t = None
                                else:
                                        # Current format, from 2012-02-03 onward
                                        DupeShareHACK = pickle.load(f)

                                if t + 120 >= time():
                                        workLog = pickle.load(f)
                                else:
                                        logger.debug('Skipping restore of expired workLog')
        except:
                logger.error('Failed to restore state\n' + traceback.format_exc())
                return
        logger.info('State restored successfully')
        if t:
                logger.info('Total downtime: %g seconds' % (time() - t,))


from jsonrpcserver import JSONRPCListener, JSONRPCServer
import interactivemode
from networkserver import NetworkListener
import threading
import sharelogging
import imp

if __name__ == "__main__":
        if not hasattr(config, 'ShareLogging'):
                config.ShareLogging = ()
        if hasattr(config, 'DbOptions'):
                logging.getLogger('backwardCompatibility').warn('DbOptions configuration variable is deprecated; upgrade to ShareLogging var before 2013-03-05')
                config.ShareLogging = list(config.ShareLogging)
                config.ShareLogging.append( {
                        'type': 'sql',
                        'engine': 'postgres',
                        'dbopts': config.DbOptions,
                        'statement': "insert into shares (rem_host, username, our_result, upstream_result, reason, solution) values ({Q(remoteHost)}, {username}, {YN(not(rejectReason))}, {YN(upstreamResult)}, {rejectReason}, decode({solution}, 'hex'))",
                } )
        for i in config.ShareLogging:
                if not hasattr(i, 'keys'):
                        name, parameters = i
                        logging.getLogger('backwardCompatibility').warn('Using short-term backward compatibility for ShareLogging[\'%s\']; be sure to update config before 2012-04-04' % (name,))
                        if name == 'postgres':
                                name = 'sql'
                                i = {
                                        'engine': 'postgres',
                                        'dbopts': parameters,
                                }
                        elif name == 'logfile':
                                i = {}
                                i['thropts'] = parameters
                                if 'filename' in parameters:
                                        i['filename'] = parameters['filename']
                                        i['thropts'] = dict(i['thropts'])
                                        del i['thropts']['filename']
                        else:
                                i = parameters
                        i['type'] = name

                name = i['type']
                parameters = i
                try:
                        fp, pathname, description = imp.find_module(name, sharelogging.__path__)
                        m = imp.load_module(name, fp, pathname, description)
                        lo = getattr(m, name)(**parameters)
                        loggersShare.append(lo.logShare)
                except:
                        logging.getLogger('sharelogging').error("Error setting up share logger %s: %s", name, sys.exc_info())

        LSbc = []
        if not hasattr(config, 'BitcoinNodeAddresses'):
                config.BitcoinNodeAddresses = ()
        for a in config.BitcoinNodeAddresses:
                LSbc.append(NetworkListener(bcnode, a))

        if hasattr(config, 'UpstreamBitcoindNode') and config.UpstreamBitcoindNode:
                BitcoinLink(bcnode, dest=config.UpstreamBitcoindNode)

        import jsonrpc_getmemorypool
        import jsonrpc_getwork
        import jsonrpc_setworkaux

        server = JSONRPCServer()
        if hasattr(config, 'JSONRPCAddress'):
                logging.getLogger('backwardCompatibility').warn('JSONRPCAddress configuration variable is deprecated; upgrade to JSONRPCAddresses list before 2013-03-05')
                if not hasattr(config, 'JSONRPCAddresses'):
                        config.JSONRPCAddresses = []
                config.JSONRPCAddresses.insert(0, config.JSONRPCAddress)
        LS = []
        for a in config.JSONRPCAddresses:
                LS.append(JSONRPCListener(server, a))
        if hasattr(config, 'SecretUser'):
                server.SecretUser = config.SecretUser
        server.aux = MM.CoinbaseAux
        server.getBlockHeader = getBlockHeader
        server.getBlockTemplate = getBlockTemplate
        server.receiveShare = receiveShare
        server.RaiseRedFlags = RaiseRedFlags
        server.ShareTarget = config.ShareTarget

        if hasattr(config, 'TrustedForwarders'):
                server.TrustedForwarders = config.TrustedForwarders
        server.ServerName = config.ServerName

        MM.start()

        restoreState()

        bcnode_thr = threading.Thread(target=bcnode.serve_forever)
        bcnode_thr.daemon = True
        bcnode_thr.start()

        server.serve_forever()