Merge branch 'bugfix_submitblock_rv' into bugfix_submitblock_rv_2
[bitcoin:eloipool.git] / eloipool.py
1 #!/usr/bin/python3
2 # Eloipool - Python Bitcoin pool server
3 # Copyright (C) 2011-2012  Luke Dashjr <luke-jr+eloipool@utopios.org>
4 #
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License as
7 # published by the Free Software Foundation, either version 3 of the
8 # License, or (at your option) any later version.
9 #
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 # GNU Affero General Public License for more details.
14 #
15 # You should have received a copy of the GNU Affero General Public License
16 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18 import config
19
20
21 import logging
22
# The root logger is verbose (DEBUG); the named subsystems below are
# capped at INFO so they do not flood the log.
logging.basicConfig(level=logging.DEBUG)
for infoOnly in (
        'checkShare',
        'JSONRPCHandler',
        'merkleMaker',
        'Waker for JSONRPCServer',
        'JSONRPCServer',
):
        logging.getLogger(infoOnly).setLevel(logging.INFO)
26
def RaiseRedFlags(reason):
        """Report *reason* at CRITICAL level on the 'redflag' logger.

        Returns *reason* unchanged so the call can be used inline.
        """
        redflagLogger = logging.getLogger('redflag')
        redflagLogger.critical(reason)
        return reason
30
31
32 from bitcoin.node import BitcoinLink, BitcoinNode
# Node object for the upstream network named in the config; the pool's
# identifier is appended to whatever user agent the node already has.
bcnode = BitcoinNode(config.UpstreamNetworkId)
bcnode.userAgent += b'Eloipool:0.1/'

import jsonrpc
# JSON-RPC proxy to the upstream server at config.UpstreamURI; used by
# blockSubmissionThread to hand solved blocks upstream.
UpstreamBitcoindJSONRPC = jsonrpc.ServiceProxy(config.UpstreamURI)
38
39
40 from bitcoin.script import BitcoinScript
41 from bitcoin.txn import Txn
42 from base58 import b58decode
43 from struct import pack
44 import subprocess
45 from time import time
46
def makeCoinbaseTxn(coinbaseValue, useCoinbaser = True):
        """Build the coinbase transaction paying out *coinbaseValue*.

        When *useCoinbaser* is true and config.CoinbaserCmd is set, that
        command is run (with '%d' replaced by the value) and its stdout is
        read as: a line with the number of outputs, then for each output a
        line with the amount and a line with the address.  Whatever the
        coinbaser does not claim is paid to config.TrackerAddr.

        If the coinbaser fails in any way, or claims the whole value or
        more, all of its outputs are discarded and the full value goes to
        config.TrackerAddr.

        Returns the (unassembled) transaction object.
        """
        logger = logging.getLogger('makeCoinbaseTxn')
        txn = Txn.new()
        
        if useCoinbaser and getattr(config, 'CoinbaserCmd', None):
                coinbased = 0
                try:
                        cmd = config.CoinbaserCmd.replace('%d', str(coinbaseValue))
                        # shell=True is kept on purpose: CoinbaserCmd is a shell
                        # command line taken from the pool's own config module.
                        p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE)
                        try:
                                nout = int(p.stdout.readline())
                                for i in range(nout):
                                        amount = int(p.stdout.readline())
                                        addr = p.stdout.readline().rstrip(b'\n').decode('utf8')
                                        pkScript = BitcoinScript.toAddress(addr)
                                        txn.addOutput(amount, pkScript)
                                        coinbased += amount
                        finally:
                                # Always release the pipe and reap the child.
                                p.stdout.close()
                                p.wait()
                except Exception:
                        # Record why the coinbaser failed, then force the
                        # "failed" branch below.
                        logger.error('Coinbaser raised an exception', exc_info=True)
                        coinbased = coinbaseValue + 1
                if coinbased >= coinbaseValue:
                        logger.error('Coinbaser failed!')
                        txn.outputs = []
                else:
                        coinbaseValue -= coinbased
        
        pkScript = BitcoinScript.toAddress(config.TrackerAddr)
        txn.addOutput(coinbaseValue, pkScript)
        
        # TODO
        # TODO: red flag on dupe coinbase
        return txn
77
78
79 import jsonrpc_getwork
80 from util import Bits2Target
81
# username -> {work id -> (work data, issue time)}; filled in by
# getBlockHeader/getBlockTemplate and consulted by checkShare.
workLog = {}
# Set by blockChanged() from the current block's bits.
networkTarget = None
# Share headers already seen for the current block; checkShare rejects
# repeats, and blockChanged() starts a fresh dict.
DupeShareHACK = {}

# Replaced by the JSONRPCServer instance when run as a script.
server = None
def updateBlocks():
        """Wake long-polling clients; does nothing until `server` is set."""
        if not server:
                return
        server.wakeLongpoll()
90
def blockChanged():
        """Reset per-block state after the current block has changed.

        Starts fresh duplicate-share tables (here and in jsonrpc_getwork),
        recomputes networkTarget from the new block's bits, forgets all
        issued work and finally wakes long-polling clients.
        """
        global DupeShareHACK, networkTarget
        DupeShareHACK = {}
        jsonrpc_getwork._CheckForDupesHACK = {}
        currentBits = MM.currentBlock[1]
        networkTarget = Bits2Target(currentBits)
        workLog.clear()
        updateBlocks()
98         updateBlocks()
99
100
101 from merklemaker import merkleMaker
MM = merkleMaker()
# Every attribute of the config module becomes an attribute of MM.
MM.__dict__.update(config.__dict__)
# Pre-built coinbase transaction made without the coinbaser, using a
# hard-coded value.
MM.clearCoinbaseTxn = makeCoinbaseTxn(5000000000, False)  # FIXME
MM.clearCoinbaseTxn.assemble()
# Callbacks handed to the merkle maker.
MM.makeCoinbaseTxn = makeCoinbaseTxn
MM.onBlockChange = blockChanged
MM.onBlockUpdate = updateBlocks
MM.start()
110
111
112 from binascii import b2a_hex
113 from copy import deepcopy
114 from struct import pack, unpack
115 from time import time
116 from util import RejectedShare, dblsha, hash2int, swap32
117 import jsonrpc
118 import threading
119 import traceback
120
# Optional JSON-RPC proxy used to report found work; None when
# config.GotWorkURI is not defined.
gotwork = jsonrpc.ServiceProxy(config.GotWorkURI) if hasattr(config, 'GotWorkURI') else None
124
def submitGotwork(info):
        """Send *info* to the gotwork service.

        Any failure is logged as a warning on checkShare's logger, together
        with the traceback, and is not propagated.
        """
        try:
                gotwork.gotwork(info)
        except:
                failure = traceback.format_exc()
                checkShare.logger.warning('Failed to submit gotwork\n' + failure)
130
# Share-logging database connection; stays None (logging disabled) when
# config.DbOptions is not defined.
if hasattr(config, 'DbOptions'):
        import psycopg2
        db = psycopg2.connect(**config.DbOptions)
else:
        db = None
135
def getBlockHeader(username):
        """Issue new work to *username* and return its block header bytes.

        The work record from MM.getMRD() is remembered in workLog under the
        user's name, keyed by merkle root, together with the issue time.
        """
        workRecord = MM.getMRD()
        (merkleRoot, _tree, _coinbase, prevBlock, bits, _rollPrevBlk) = workRecord
        timestamp = pack('<L', int(time()))
        header = b''.join((
                b'\1\0\0\0',
                prevBlock,
                merkleRoot,
                timestamp,
                bits,
                b'iolE',
        ))
        issuedAt = time()
        userWork = workLog.setdefault(username, {})
        userWork[merkleRoot] = (workRecord, issuedAt)
        return header
143
def getBlockTemplate(username):
        """Issue template work to *username* and return the MM.getMC() record.

        The record is remembered in workLog under the user's name.  Its key
        is cut from the coinbase: the first byte gives a length, and that
        many following bytes form the key.
        """
        template = MM.getMC()
        (_unused, _tree, coinbase, _prevBlock, _bits) = template
        keyLen = coinbase[0]
        workKey = coinbase[1:keyLen + 1]
        issuedAt = time()
        userWork = workLog.setdefault(username, {})
        userWork[workKey] = (template, issuedAt)
        return template
151
def YN(b):
        """Map a tri-state flag to 'Y', 'N' or None (None passes through)."""
        if b is not None:
                if b:
                        return 'Y'
                return 'N'
        return None
156
def logShare(share):
        """Insert one row describing *share* into the `shares` table.

        Does nothing when no database is configured (db is None).  The
        solution column is taken from share['_origdata'] when present,
        otherwise from the hex of the 32-bit-swapped share['data'].

        On a database error the transaction is rolled back, so the shared
        connection stays usable for later shares, and the error is
        re-raised.
        """
        if db is None:
                return
        rem_host = share.get('remoteHost', '?')
        username = share['username']
        reason = share.get('rejectReason', None)
        upstreamResult = share.get('upstreamResult', None)
        if '_origdata' in share:
                solution = share['_origdata']
        else:
                solution = b2a_hex(swap32(share['data'])).decode('utf8')
        stmt = "insert into shares (rem_host, username, our_result, upstream_result, reason, solution) values (%s, %s, %s, %s, %s, decode(%s, 'hex'))"
        params = (rem_host, username, YN(not reason), YN(upstreamResult), reason, solution)
        dbc = db.cursor()
        try:
                dbc.execute(stmt, params)
                db.commit()
        except Exception:
                # Without a rollback the connection would stay in a failed
                # transaction and reject every following statement.
                db.rollback()
                raise
        finally:
                dbc.close()
174
# In-memory record of shares that met the network target: checkShare
# appends a deep copy of the header and transaction data to RBDs, and
# the payload sent upstream to RBPs.
RBDs = []
RBPs = []
177
178 from bitcoin.varlen import varlenEncode, varlenDecode
179 import bitcoin.txn
def assembleBlock(blkhdr, txlist):
        """Return the block payload: header, transaction count, transactions.

        *blkhdr* is the header bytes; *txlist* is a sequence of objects
        whose `.data` holds the transaction bytes.  The count is encoded
        with varlenEncode.
        """
        # One join instead of repeated `+=`, which copies the growing
        # payload once per transaction.
        txdata = b''.join(tx.data for tx in txlist)
        return blkhdr + varlenEncode(len(txlist)) + txdata
186
def blockSubmissionThread(payload):
        """Submit a solved block upstream over JSON-RPC until a call succeeds.

        *payload* is the raw block; it is sent hex-encoded through
        UpstreamBitcoindJSONRPC.getmemorypool.  A failed call is retried
        immediately and without limit.  Failures are logged, at most once
        every five seconds.
        """
        logger = logging.getLogger('blockSubmissionThread')
        # The payload never changes between attempts, so encode it once.
        payloadHex = b2a_hex(payload).decode('ascii')
        nextLogAt = 0
        while True:
                try:
                        UpstreamBitcoindJSONRPC.getmemorypool(payloadHex)
                        return
                except Exception:
                        now = time()
                        if now >= nextLogAt:
                                logger.error('Failed to submit block upstream; retrying', exc_info=True)
                                nextLogAt = now + 5
194
def checkShare(share):
        """Validate a submitted share; submit it upstream if it solves a block.

        *share* is a dict.  Keys read: 'data' (header; only the first 80
        bytes are used), 'username', and optionally 'blkdata' (transaction
        data, which selects "MC" mode instead of "MRD" mode).  Keys written:
        'MC' or 'MRD' (the matching work record), 'time', and
        'upstreamResult' when the hash meets the network target.

        Raises RejectedShare with a short reason code when a check fails.
        Note that upstream submission and the gotwork report happen before
        the timestamp and coinbase checks, so a share that is rejected by
        those later checks may already have been submitted.
        """
        data = share['data']
        data = data[:80]
        (prevBlock, bits) = MM.currentBlock
        sharePrevBlock = data[4:36]
        if sharePrevBlock != prevBlock:
                if sharePrevBlock == MM.lastBlock[0]:
                        raise RejectedShare('stale-prevblk')
                raise RejectedShare('bad-prevblk')
        
        # TODO: use userid
        username = share['username']
        if username not in workLog:
                raise RejectedShare('unknown-user')
        
        if data[72:76] != bits:
                raise RejectedShare('bad-diffbits')
        if data[:4] != b'\1\0\0\0':
                raise RejectedShare('bad-version')
        
        shareMerkleRoot = data[36:68]
        if 'blkdata' in share:
                # MC mode: the work id is embedded at the start of the
                # submitted coinbase (length byte, then the id itself).
                pl = share['blkdata']
                (txncount, pl) = varlenDecode(pl)
                cbtxn = bitcoin.txn.Txn(pl)
                cbtxn.disassemble(retExtra=True)
                coinbase = cbtxn.getCoinbase()
                wliLen = coinbase[0]
                wli = coinbase[1:wliLen+1]
                mode = 'MC'
                moden = 1
        else:
                # MRD mode: work is identified by its merkle root.
                wli = shareMerkleRoot
                mode = 'MRD'
                moden = 0
        
        MWL = workLog[username]
        if wli not in MWL:
                raise RejectedShare('unknown-work')
        (wld, issueT) = MWL[wli]
        share[mode] = wld
        
        if data in DupeShareHACK:
                raise RejectedShare('duplicate')
        DupeShareHACK[data] = None
        
        shareTime = share['time'] = time()
        
        blkhash = dblsha(data)
        if blkhash[28:] != b'\0\0\0\0':
                raise RejectedShare('H-not-zero')
        blkhashn = hash2int(blkhash)
        
        global networkTarget
        # Shares that meet the network target are logged at INFO, others at DEBUG.
        logfunc = getattr(checkShare.logger, 'info' if blkhashn <= networkTarget else 'debug')
        logfunc('BLKHASH: %64x' % (blkhashn,))
        logfunc(' TARGET: %64x' % (networkTarget,))
        
        workMerkleTree = wld[1]
        workCoinbase = wld[2]
        
        # NOTE: this isn't actually needed for MC mode, but we're abusing it for a trivial share check...
        txlist = workMerkleTree.data
        cbtxn = txlist[0]
        cbtxn.setCoinbase(workCoinbase)
        cbtxn.assemble()
        
        if blkhashn <= networkTarget:
                logfunc("Submitting upstream")
                if not moden:
                        RBDs.append( deepcopy( (data, txlist) ) )
                        payload = assembleBlock(data, txlist)
                else:
                        RBDs.append( deepcopy( (data, txlist, share['blkdata']) ) )
                        payload = share['data'] + share['blkdata']
                logfunc('Real block payload: %s' % (payload,))
                RBPs.append(payload)
                # Submit both over JSON-RPC (background thread) and via bcnode.
                threading.Thread(target=blockSubmissionThread, args=(payload,)).start()
                bcnode.submitBlock(payload)
                share['upstreamResult'] = True
                MM.updateBlock(blkhash)
        
        # Gotwork hack...
        if gotwork and blkhashn <= config.GotWorkTarget:
                try:
                        coinbaseMrkl = cbtxn.data
                        coinbaseMrkl += blkhash
                        steps = workMerkleTree._steps
                        coinbaseMrkl += pack('B', len(steps))
                        for step in steps:
                                coinbaseMrkl += step
                        coinbaseMrkl += b"\0\0\0\0"
                        info = {}
                        info['hash'] = b2a_hex(blkhash).decode('ascii')
                        info['header'] = b2a_hex(data).decode('ascii')
                        info['coinbaseMrkl'] = b2a_hex(coinbaseMrkl).decode('ascii')
                        thr = threading.Thread(target=submitGotwork, args=(info,))
                        thr.daemon = True
                        thr.start()
                except Exception:
                        checkShare.logger.warning('Failed to build gotwork request')
        
        shareTimestamp = unpack('<L', data[68:72])[0]
        if shareTime < issueT - 120:
                raise RejectedShare('stale-work')
        if shareTimestamp < shareTime - 300:
                raise RejectedShare('time-too-old')
        if shareTimestamp > shareTime + 7200:
                raise RejectedShare('time-too-new')
        
        if moden:
                cbpre = cbtxn.getCoinbase()
                cbpreLen = len(cbpre)
                if coinbase[:cbpreLen] != cbpre:
                        raise RejectedShare('bad-cb-prefix')
                
                # Filter out known "I support" flags, to prevent exploits
                for ff in (b'/P2SH/', b'NOP2SH', b'p2sh/CHV', b'p2sh/NOCHV'):
                        # find() returns -1 when the flag is absent.  The
                        # threshold is clamped to -1 so that a prefix shorter
                        # than the flag cannot make "absent" look like a hit.
                        if coinbase.find(ff) > max(-1, cbpreLen - len(ff)):
                                raise RejectedShare('bad-cb-flag')
                
                if len(coinbase) > 100:
                        raise RejectedShare('bad-cb-length')
                
                # Work on a copy so the stored work record's coinbase
                # transaction is left as it was.
                cbtxn = deepcopy(cbtxn)
                cbtxn.setCoinbase(coinbase)
                cbtxn.assemble()
                if shareMerkleRoot != workMerkleTree.withFirst(cbtxn):
                        raise RejectedShare('bad-txnmrklroot')
                
                txlist = [cbtxn,] + txlist[1:]
                allowed = assembleBlock(data, txlist)
                if allowed != share['data'] + share['blkdata']:
                        raise RejectedShare('bad-txns')
checkShare.logger = logging.getLogger('checkShare')
330
def receiveShare(share):
        """Check *share* and always log it, accepted or not.

        A RejectedShare is recorded in share['rejectReason'] and then
        re-raised to the caller.
        """
        # TODO: username => userid
        try:
                try:
                        checkShare(share)
                except RejectedShare as rejection:
                        share['rejectReason'] = str(rejection)
                        raise
        finally:
                logShare(share)
340
def newBlockNotification(signum, frame):
        """Signal handler: log the notification and refresh the merkle tree."""
        notifyLogger = logging.getLogger('newBlockNotification')
        notifyLogger.info('Received new block notification')
        MM.updateMerkleTree()
        # TODO: Force RESPOND TO LONGPOLLS?
346
from signal import signal, SIGUSR1
# Run newBlockNotification whenever the process receives SIGUSR1.
# NOTE: here `signal` is the function; the later `import signal`
# rebinds the name to the module.
signal(SIGUSR1, newBlockNotification)
349
350
351 import os
352 import os.path
353 import pickle
354 import signal
355 import sys
356 from time import sleep
357 import traceback
358
# Relative path of the pickle file written by saveState() and read back
# by restoreState().
SAVE_STATE_FILENAME = 'eloipool.worklog'
360
def stopServers():
        """Ask bcnode and the JSON-RPC server to stop, then close their fds.

        Both servers get keepgoing=False and a wakeup().  Their `running`
        flags are then checked up to 0x100 times, 0.01 s apart, before
        giving up with an error.  Every descriptor in each server's `_fd`
        is closed either way.
        """
        global bcnode, server
        logger = logging.getLogger('stopServers')
        
        logger.info('Stopping servers...')
        servers = (bcnode, server)
        for srv in servers:
                srv.keepgoing = False
        for srv in servers:
                srv.wakeup()
        
        for attempt in range(1, 0x100 + 1):
                stillRunning = [srv.__class__.__name__ for srv in servers if srv.running]
                if not stillRunning:
                        break
                if attempt >= 0x100:
                        logger.error('Servers taking too long to stop (%s), giving up' % (', '.join(stillRunning)))
                        break
                sleep(0.01)
        
        for srv in servers:
                for fd in srv._fd.keys():
                        os.close(fd)
388
def saveState(t = None):
        """Pickle the resume state into SAVE_STATE_FILENAME.

        Three objects are written in order: *t* (restoreState treats it as
        the shutdown time), DupeShareHACK and workLog.  Writing is retried
        up to 0x10000 times.  If every attempt fails, the error is logged,
        the possibly half-written file is removed, and the function
        returns without raising.
        """
        logger = logging.getLogger('saveState')
        
        # Then, save data needed to resume work
        logger.info('Saving work state to \'%s\'...' % (SAVE_STATE_FILENAME,))
        failures = 0
        while True:
                try:
                        with open(SAVE_STATE_FILENAME, 'wb') as f:
                                pickle.dump(t, f)
                                pickle.dump(DupeShareHACK, f)
                                pickle.dump(workLog, f)
                        return
                except Exception:
                        failures += 1
                        if failures < 0x10000:
                                continue
                        logger.error('Failed to save work\n' + traceback.format_exc())
                        try:
                                os.unlink(SAVE_STATE_FILENAME)
                        except OSError:
                                logger.error(('Failed to unlink \'%s\'; resume may have trouble\n' % (SAVE_STATE_FILENAME,)) + traceback.format_exc())
                        # Give up here; otherwise a persistent failure would
                        # keep this loop running forever.
                        return
410
def exit():
        """Stop the servers, save state, then terminate this process.

        The process sends itself SIGTERM first; sys.exit(0) follows in
        case execution continues past the signal.
        """
        shutdownStarted = time()
        stopServers()
        saveState(shutdownStarted)
        logging.getLogger('exit').info('Goodbye...')
        ownPid = os.getpid()
        os.kill(ownPid, signal.SIGTERM)
        sys.exit(0)
418
def restart():
        """Stop the servers, save state, and re-exec with the same argv.

        If the exec fails, the traceback is logged and the function
        returns.
        """
        shutdownStarted = time()
        stopServers()
        saveState(shutdownStarted)
        restartLogger = logging.getLogger('restart')
        restartLogger.info('Restarting...')
        try:
                os.execv(sys.argv[0], sys.argv)
        except:
                logging.getLogger('restart').error('Failed to exec\n' + traceback.format_exc())
428
def restoreState():
        """Reload workLog and DupeShareHACK from SAVE_STATE_FILENAME, if present.

        Three layouts are accepted, told apart by the first pickled object:
          * a tuple: (workLog, DupeShareHACK), both restored as-is;
          * a dict: it is DupeShareHACK, and workLog follows;
          * anything else: taken as the shutdown time, followed by
            DupeShareHACK and then workLog.
        In the last two layouts workLog is restored only if the file was
        modified within the last 120 seconds.  Any failure is logged and
        the function returns.
        """
        global workLog, DupeShareHACK
        
        if not os.path.exists(SAVE_STATE_FILENAME):
                return
        
        logger = logging.getLogger('restoreState')
        fileInfo = os.stat(SAVE_STATE_FILENAME)
        logger.info('Restoring saved state from \'%s\' (%d bytes)' % (SAVE_STATE_FILENAME, fileInfo.st_size))
        try:
                # NOTE(review): pickle.load runs whatever the file encodes;
                # this is only safe while the state file is trusted.
                with open(SAVE_STATE_FILENAME, 'rb') as stateFile:
                        savedAt = pickle.load(stateFile)
                        if type(savedAt) is tuple:
                                workLog = savedAt[0]
                                DupeShareHACK = savedAt[1]
                                savedAt = None
                        else:
                                if not isinstance(savedAt, dict):
                                        DupeShareHACK = pickle.load(stateFile)
                                else:
                                        DupeShareHACK = savedAt
                                        savedAt = None
                                
                                if fileInfo.st_mtime + 120 < time():
                                        logger.debug('Skipping restore of expired workLog')
                                else:
                                        workLog = pickle.load(stateFile)
        except:
                logger.error('Failed to restore state\n' + traceback.format_exc())
                return
        logger.info('State restored successfully')
        if savedAt:
                downtime = time() - savedAt
                logger.info('Total downtime: %g seconds' % (downtime,))
462
463
464 from jsonrpcserver import JSONRPCListener, JSONRPCServer
465 import interactivemode
466 from networkserver import NetworkListener
467 import threading
468
if __name__ == "__main__":
        # Listeners on bcnode, one per configured address (none by default).
        if not hasattr(config, 'BitcoinNodeAddresses'):
                config.BitcoinNodeAddresses = ()
        LSbc = []
        for a in config.BitcoinNodeAddresses:
                LSbc.append(NetworkListener(bcnode, a))
        
        # Optional link from bcnode to the configured upstream node.
        if getattr(config, 'UpstreamBitcoindNode', None):
                BitcoinLink(bcnode, dest=config.UpstreamBitcoindNode)
        
        import jsonrpc_getmemorypool
        import jsonrpc_getwork
        import jsonrpc_setworkaux
        
        server = JSONRPCServer()
        # A single JSONRPCAddress is put at the front of JSONRPCAddresses.
        if hasattr(config, 'JSONRPCAddress'):
                if not hasattr(config, 'JSONRPCAddresses'):
                        config.JSONRPCAddresses = []
                config.JSONRPCAddresses.insert(0, config.JSONRPCAddress)
        LS = []
        for a in config.JSONRPCAddresses:
                LS.append(JSONRPCListener(server, a))
        if hasattr(config, 'SecretUser'):
                server.SecretUser = config.SecretUser
        
        # Hand the pool's callbacks to the JSON-RPC server.
        server.aux = MM.CoinbaseAux
        server.getBlockHeader = getBlockHeader
        server.getBlockTemplate = getBlockTemplate
        server.receiveShare = receiveShare
        server.RaiseRedFlags = RaiseRedFlags
        
        restoreState()
        
        # bcnode is served from a daemon thread; the JSON-RPC server runs
        # in the main thread.
        bcnode_thr = threading.Thread(target=bcnode.serve_forever)
        bcnode_thr.daemon = True
        bcnode_thr.start()
        
        server.serve_forever()