Merge branch 'serve_getmemorypool'
[bitcoin:eloipool.git] / eloipool.py
1 #!/usr/bin/python3
2 # Eloipool - Python Bitcoin pool server
3 # Copyright (C) 2011-2012  Luke Dashjr <luke-jr+eloipool@utopios.org>
4 #
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License as
7 # published by the Free Software Foundation, either version 3 of the
8 # License, or (at your option) any later version.
9 #
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 # GNU Affero General Public License for more details.
14 #
15 # You should have received a copy of the GNU Affero General Public License
16 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18 import config
19
20
21 import logging
22
# The root logger is verbose (DEBUG); the loggers named below are capped at INFO.
logging.basicConfig(level=logging.DEBUG)
for infoOnly in (
        'checkShare',
        'JSONRPCHandler',
        'merkleMaker',
        'Waker for JSONRPCServer',
        'JSONRPCServer',
):
        logging.getLogger(infoOnly).setLevel(logging.INFO)
26
def RaiseRedFlags(reason):
        """Log reason at CRITICAL level on the 'redflag' logger and return it unchanged."""
        redflagLogger = logging.getLogger('redflag')
        redflagLogger.critical(reason)
        return reason
30
31
32 from bitcoin.node import BitcoinLink, BitcoinNode
# Bitcoin node object for the network selected by config.UpstreamNetworkId
bcnode = BitcoinNode(config.UpstreamNetworkId)
# Append this software's name and version to the node's user agent
bcnode.userAgent += b'Eloipool:0.1/'
35
36 import jsonrpc
# JSON-RPC proxy for config.UpstreamURI; blockSubmissionThread() submits solved blocks through it
UpstreamBitcoindJSONRPC = jsonrpc.ServiceProxy(config.UpstreamURI)
38
39
40 from bitcoin.script import BitcoinScript
41 from bitcoin.txn import Txn
42 from base58 import b58decode
43 from struct import pack
44 import subprocess
45 from time import time
46
def makeCoinbaseTxn(coinbaseValue, useCoinbaser = True):
        """Build the coinbase transaction paying out coinbaseValue.

        When useCoinbaser is true and config.CoinbaserCmd is set, that command is
        run through the shell with every '%d' replaced by coinbaseValue.  Its
        stdout must hold the number of outputs on the first line, followed by an
        amount line and an address line for each output.  Whatever the coinbaser
        does not claim is paid to config.TrackerAddr.

        If the coinbaser fails, or claims the whole value or more, its outputs
        are discarded and the full value goes to config.TrackerAddr.

        Returns the new Txn; it is not assembled here.
        """
        txn = Txn.new()
        logger = logging.getLogger('makeCoinbaseTxn')

        coinbaserCmd = getattr(config, 'CoinbaserCmd', None)
        if useCoinbaser and coinbaserCmd:
                coinbased = 0
                try:
                        cmd = coinbaserCmd.replace('%d', str(coinbaseValue))
                        p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE)
                        try:
                                nout = int(p.stdout.readline())
                                for i in range(nout):
                                        amount = int(p.stdout.readline())
                                        addr = p.stdout.readline().rstrip(b'\n').decode('utf8')
                                        pkScript = BitcoinScript.toAddress(addr)
                                        txn.addOutput(amount, pkScript)
                                        coinbased += amount
                        finally:
                                # Close the pipe and reap the child, so repeated calls do not
                                # pile up open descriptors or zombie processes
                                p.stdout.close()
                                p.wait()
                except Exception:
                        # Record why the coinbaser failed, then force the failure path below
                        logger.exception('Coinbaser raised an exception')
                        coinbased = coinbaseValue + 1
                if coinbased >= coinbaseValue:
                        logger.error('Coinbaser failed!')
                        txn.outputs = []
                else:
                        coinbaseValue -= coinbased

        pkScript = BitcoinScript.toAddress(config.TrackerAddr)
        txn.addOutput(coinbaseValue, pkScript)

        # TODO
        # TODO: red flag on dupe coinbase
        return txn
77
78
79 import jsonrpcserver
80 from util import Bits2Target
81
# Work handed out to miners: username -> {work identifier -> (work data, issue time)}
workLog = {}
# Current network target as an integer; set by blockChanged()
networkTarget = None
# Share headers already seen for the current block, used to reject duplicates
DupeShareHACK = {}

# The JSONRPCServer instance; stays None until the __main__ section creates it
server = None
def updateBlocks():
        """Wake pending longpoll requests, provided the JSON-RPC server exists."""
        if not server:
                return
        server.wakeLongpoll()
90
def blockChanged():
        """Reset per-block state after the current block changes.

        Clears both duplicate-share tables and the work log, recomputes the
        network target from the new block's bits, then wakes longpolls.
        """
        global DupeShareHACK, networkTarget
        DupeShareHACK = {}
        jsonrpcserver._CheckForDupesHACK = {}
        currentBits = MM.currentBlock[1]
        networkTarget = Bits2Target(currentBits)
        workLog.clear()
        updateBlocks()
99
100
101 from merklemaker import merkleMaker
# Create the merkle maker, copy every config setting onto it as an attribute,
# hook up the callbacks defined above, and start it.
MM = merkleMaker()
vars(MM).update(vars(config))
MM.clearCoinbaseTxn = makeCoinbaseTxn(5000000000, False)  # FIXME
MM.clearCoinbaseTxn.assemble()
MM.makeCoinbaseTxn = makeCoinbaseTxn
MM.onBlockChange = blockChanged
MM.onBlockUpdate = updateBlocks
MM.start()
110
111
112 from binascii import b2a_hex
113 from copy import deepcopy
114 from struct import pack, unpack
115 from time import time
116 from util import RejectedShare, dblsha, hash2int, swap32
117 import jsonrpc
118 import threading
119 import traceback
120
# Optional "gotwork" JSON-RPC service, enabled by defining config.GotWorkURI
if hasattr(config, 'GotWorkURI'):
        gotwork = jsonrpc.ServiceProxy(config.GotWorkURI)
else:
        gotwork = None
124
def submitGotwork(info):
        """Send info to the gotwork service; a failure is logged as a warning and otherwise ignored."""
        try:
                gotwork.gotwork(info)
        except:
                failure = traceback.format_exc()
                checkShare.logger.warning('Failed to submit gotwork\n' + failure)
130
# Optional PostgreSQL share log, enabled by defining config.DbOptions
if hasattr(config, 'DbOptions'):
        import psycopg2
        db = psycopg2.connect(**config.DbOptions)
else:
        db = None
135
def getBlockHeader(username):
        """Issue header-only work to username and return the block header.

        The header is the version bytes, previous block hash, merkle root,
        current time, bits, and a trailing b'iolE'.  The merkle-root data is
        stored in workLog under the merkle root so a later share can be matched.
        """
        MRD = MM.getMRD()
        merkleRoot, _merkleTree, _coinbase, prevBlock, bits, _rollPrevBlk = MRD
        timestamp = pack('<L', int(time()))
        hdr = b''.join((b'\1\0\0\0', prevBlock, merkleRoot, timestamp, bits, b'iolE'))
        userWork = workLog.setdefault(username, {})
        userWork[merkleRoot] = (MRD, time())
        return hdr
143
def getBlockTemplate(username):
        """Issue template work to username and return it.

        The work is stored in workLog under the identifier embedded in its
        coinbase: a length byte followed by that many bytes.
        """
        MC = MM.getMC()
        _dummy, _merkleTree, coinbase, _prevBlock, _bits = MC
        idLen = coinbase[0]
        workId = coinbase[1:1 + idLen]
        userWork = workLog.setdefault(username, {})
        userWork[workId] = (MC, time())
        return MC
151
def YN(b):
        """Map a truth value to 'Y' or 'N'; None is passed through as None."""
        if b is None:
                return None
        if b:
                return 'Y'
        return 'N'
156
def logShare(share):
        """Record a share in the 'shares' table; does nothing when no database is configured.

        Reads from share: 'username' (required), 'remoteHost' (default '?'),
        'rejectReason' and 'upstreamResult' (both default None), and either
        '_origdata' (used as-is as the hex solution) or 'data' (passed through
        swap32 and hex-encoded).

        Database errors propagate to the caller.  The transaction is rolled
        back first, so the shared connection stays usable for later shares.
        """
        if db is None:
                return
        rem_host = share.get('remoteHost', '?')
        username = share['username']
        reason = share.get('rejectReason', None)
        upstreamResult = share.get('upstreamResult', None)
        if '_origdata' in share:
                solution = share['_origdata']
        else:
                solution = b2a_hex(swap32(share['data'])).decode('utf8')
        stmt = "insert into shares (rem_host, username, our_result, upstream_result, reason, solution) values (%s, %s, %s, %s, %s, decode(%s, 'hex'))"
        params = (rem_host, username, YN(not reason), YN(upstreamResult), reason, solution)
        dbc = db.cursor()
        try:
                dbc.execute(stmt, params)
                db.commit()
        except Exception:
                # A failed statement leaves the transaction aborted; without a
                # rollback every later insert on this connection would fail too
                db.rollback()
                raise
        finally:
                dbc.close()
174
# Every block-solving share is kept here by checkShare():
# RBDs holds copies of the block data, RBPs the payloads that were submitted
RBDs, RBPs = [], []
177
178 from bitcoin.varlen import varlenEncode, varlenDecode
179 import bitcoin.txn
def assembleBlock(blkhdr, txlist):
        """Serialise a block: the header, the varlen-encoded transaction count, then each transaction's data."""
        pieces = [blkhdr, varlenEncode(len(txlist))]
        pieces.extend(tx.data for tx in txlist)
        return b''.join(pieces)
186
def blockSubmissionThread(payload):
        """Submit a solved block upstream, retrying until a call succeeds.

        payload is the raw serialised block; it is sent hex-encoded through the
        getmemorypool JSON-RPC call.  Meant to run in its own thread, because it
        does not return until a call completes without raising.
        """
        logger = logging.getLogger('blockSubmissionThread')
        # Encode once, outside the retry loop; a bad payload now raises instead of spinning forever
        hexPayload = b2a_hex(payload).decode('ascii')
        nextLogT = 0
        while True:
                try:
                        UpstreamBitcoindJSONRPC.getmemorypool(hexPayload)
                        break
                except Exception:
                        # Retry at once (a found block is time-critical), but log at
                        # most every 5 seconds so a dead upstream is visible without flooding
                        now = time()
                        if now >= nextLogT:
                                logger.error('Failed to submit block upstream; retrying\n' + traceback.format_exc())
                                nextLogT = now + 5
194
def checkShare(share):
        """Validate a share submitted by a miner; submit it upstream if it solves a block.

        share is a dict holding at least 'data' (the block header; only the
        first 80 bytes are examined) and 'username'.  When 'blkdata' is present
        the miner supplied its own transaction data ('MC' mode) and the work is
        looked up by the identifier embedded in the coinbase; otherwise ('MRD'
        mode) it is looked up by the header's merkle root.

        An accepted share is recorded with logShare().  Along the way
        share['time'] and share['MRD'] or share['MC'] are set, plus
        share['upstreamResult'] when the share was submitted as a block.

        Raises RejectedShare with a short reason string when a check fails.
        Note that a block-solving share is submitted upstream before the
        staleness, timestamp and coinbase checks at the end are run.
        """
        data = share['data']
        data = data[:80]
        (prevBlock, bits) = MM.currentBlock
        sharePrevBlock = data[4:36]
        if sharePrevBlock != prevBlock:
                if sharePrevBlock == MM.lastBlock[0]:
                        raise RejectedShare('stale-prevblk')
                raise RejectedShare('bad-prevblk')

        # TODO: use userid
        username = share['username']
        if username not in workLog:
                raise RejectedShare('unknown-user')

        if data[72:76] != bits:
                raise RejectedShare('bad-diffbits')
        if data[:4] != b'\1\0\0\0':
                raise RejectedShare('bad-version')

        shareMerkleRoot = data[36:68]
        if 'blkdata' in share:
                # MC mode: the work identifier is a length-prefixed string at the
                # start of the coinbase of the first submitted transaction
                pl = share['blkdata']
                (txncount, pl) = varlenDecode(pl)
                cbtxn = bitcoin.txn.Txn(pl)
                cbtxn.disassemble(retExtra=True)
                coinbase = cbtxn.getCoinbase()
                wliLen = coinbase[0]
                wli = coinbase[1:wliLen+1]
                mode = 'MC'
                moden = 1
        else:
                # MRD mode: the work is identified by its merkle root
                wli = shareMerkleRoot
                mode = 'MRD'
                moden = 0

        MWL = workLog[username]
        if wli not in MWL:
                raise RejectedShare('unknown-work')
        (wld, issueT) = MWL[wli]
        share[mode] = wld

        if data in DupeShareHACK:
                raise RejectedShare('duplicate')
        DupeShareHACK[data] = None

        shareTime = share['time'] = time()

        blkhash = dblsha(data)
        if blkhash[28:] != b'\0\0\0\0':
                raise RejectedShare('H-not-zero')
        blkhashn = hash2int(blkhash)

        # Block-solving shares are logged at INFO, all others at DEBUG
        logfunc = getattr(checkShare.logger, 'info' if blkhashn <= networkTarget else 'debug')
        logfunc('BLKHASH: %64x' % (blkhashn,))
        logfunc(' TARGET: %64x' % (networkTarget,))

        workMerkleTree = wld[1]
        workCoinbase = wld[2]

        # NOTE: this isn't actually needed for MC mode, but we're abusing it for a trivial share check...
        txlist = workMerkleTree.data
        cbtxn = txlist[0]
        cbtxn.setCoinbase(workCoinbase)
        cbtxn.assemble()

        if blkhashn <= networkTarget:
                logfunc("Submitting upstream")
                if not moden:
                        RBDs.append( deepcopy( (data, txlist) ) )
                        payload = assembleBlock(data, txlist)
                else:
                        RBDs.append( deepcopy( (data, txlist, share['blkdata']) ) )
                        payload = share['data'] + share['blkdata']
                logfunc('Real block payload: %s' % (payload,))
                RBPs.append(payload)
                # Submit over JSON-RPC (in a retrying thread) and through the node object
                threading.Thread(target=blockSubmissionThread, args=(payload,)).start()
                bcnode.submitBlock(payload)
                share['upstreamResult'] = True
                MM.updateBlock(blkhash)

        # Gotwork hack...
        if gotwork and blkhashn <= config.GotWorkTarget:
                try:
                        coinbaseMrkl = cbtxn.data
                        coinbaseMrkl += blkhash
                        steps = workMerkleTree._steps
                        coinbaseMrkl += pack('B', len(steps))
                        for step in steps:
                                coinbaseMrkl += step
                        coinbaseMrkl += b"\0\0\0\0"
                        info = {}
                        info['hash'] = b2a_hex(blkhash).decode('ascii')
                        info['header'] = b2a_hex(data).decode('ascii')
                        info['coinbaseMrkl'] = b2a_hex(coinbaseMrkl).decode('ascii')
                        thr = threading.Thread(target=submitGotwork, args=(info,))
                        thr.daemon = True
                        thr.start()
                except Exception:
                        # Best effort only, but keep the traceback so the cause can be diagnosed
                        checkShare.logger.warning('Failed to build gotwork request\n' + traceback.format_exc())

        shareTimestamp = unpack('<L', data[68:72])[0]
        if shareTime < issueT - 120:
                raise RejectedShare('stale-work')
        if shareTimestamp < shareTime - 300:
                raise RejectedShare('time-too-old')
        if shareTimestamp > shareTime + 7200:
                raise RejectedShare('time-too-new')

        if moden:
                # The miner's coinbase must start with the coinbase that was issued
                cbpre = cbtxn.getCoinbase()
                cbpreLen = len(cbpre)
                if coinbase[:cbpreLen] != cbpre:
                        raise RejectedShare('bad-cb-prefix')

                # Filter out known "I support" flags, to prevent exploits
                for ff in (b'/P2SH/', b'NOP2SH', b'p2sh/CHV', b'p2sh/NOCHV'):
                        # Reject any occurrence that is not wholly inside the issued prefix.
                        # Searching from the first such offset (clamped at 0) keeps a
                        # "not found" result from counting as a match, and also catches
                        # a repeat of a flag the prefix already contains.
                        if coinbase.find(ff, max(0, cbpreLen - len(ff) + 1)) != -1:
                                raise RejectedShare('bad-cb-flag')

                if len(coinbase) > 100:
                        raise RejectedShare('bad-cb-length')

                # Work on a copy so the stored work's coinbase transaction is not altered
                cbtxn = deepcopy(cbtxn)
                cbtxn.setCoinbase(coinbase)
                cbtxn.assemble()
                if shareMerkleRoot != workMerkleTree.withFirst(cbtxn):
                        raise RejectedShare('bad-txnmrklroot')

                # The submitted block must match the issued transactions exactly
                txlist = [cbtxn,] + txlist[1:]
                allowed = assembleBlock(data, txlist)
                if allowed != share['data'] + share['blkdata']:
                        raise RejectedShare('bad-txns')

        logShare(share)
checkShare.logger = logging.getLogger('checkShare')
332
def receiveShare(share):
        """Check a share; a rejected one is logged with its reason, and the rejection is re-raised."""
        # TODO: username => userid
        try:
                checkShare(share)
        except RejectedShare as rej:
                rejectReason = str(rej)
                share['rejectReason'] = rejectReason
                logShare(share)
                raise
        # TODO
341         # TODO
342
def newBlockNotification(signum, frame):
        """Signal handler: log the notification and have the merkle maker update its merkle tree."""
        notifyLogger = logging.getLogger('newBlockNotification')
        notifyLogger.info('Received new block notification')
        MM.updateMerkleTree()
        # TODO: Force RESPOND TO LONGPOLLS?
348
349 from signal import signal, SIGUSR1
# Run newBlockNotification() whenever this process receives SIGUSR1
signal(SIGUSR1, newBlockNotification)
351
352
353 import os
354 import os.path
355 import pickle
356 import signal
357 import sys
358 from time import sleep
359 import traceback
360
# File in which saveState() pickles the work state and from which restoreState() reads it back
SAVE_STATE_FILENAME = 'eloipool.worklog'
362
def stopServers():
        """Ask the bitcoin node and the JSON-RPC server to stop, wait briefly, then close their descriptors.

        A server that was never created (still None) is skipped.  The servers'
        running flags are polled up to 0x100 times, 10ms apart; if any is still
        running after that, an error is logged and shutdown proceeds anyway.
        A descriptor that cannot be closed is logged and does not stop the
        remaining ones from being closed.
        """
        logger = logging.getLogger('stopServers')

        logger.info('Stopping servers...')
        # server is None until the __main__ section has created it
        servers = [s for s in (bcnode, server) if s is not None]
        for s in servers:
                s.keepgoing = False
        for s in servers:
                s.wakeup()
        i = 0
        while True:
                sl = [s.__class__.__name__ for s in servers if s.running]
                if not sl:
                        break
                i += 1
                if i >= 0x100:
                        logger.error('Servers taking too long to stop (%s), giving up' % (', '.join(sl)))
                        break
                sleep(0.01)

        for s in servers:
                # Iterate over a snapshot, in case the mapping changes while closing
                for fd in list(s._fd.keys()):
                        try:
                                os.close(fd)
                        except OSError:
                                logger.warning(('Failed to close fd %s\n' % (fd,)) + traceback.format_exc())
390
def saveState(t = None):
        """Pickle the state needed to resume work into SAVE_STATE_FILENAME.

        Three objects are written in order: t (the caller's timestamp, or
        None), DupeShareHACK and workLog.  A failed write is retried up to
        0x10000 times.  After that the error is logged, the possibly partial
        file is removed, and the function returns without raising.
        """
        logger = logging.getLogger('saveState')

        # Then, save data needed to resume work
        logger.info('Saving work state to \'%s\'...' % (SAVE_STATE_FILENAME,))
        i = 0
        while True:
                try:
                        with open(SAVE_STATE_FILENAME, 'wb') as f:
                                pickle.dump(t, f)
                                pickle.dump(DupeShareHACK, f)
                                pickle.dump(workLog, f)
                        break
                except Exception:
                        i += 1
                        if i >= 0x10000:
                                logger.error('Failed to save work\n' + traceback.format_exc())
                                try:
                                        os.unlink(SAVE_STATE_FILENAME)
                                except OSError:
                                        logger.error(('Failed to unlink \'%s\'; resume may have trouble\n' % (SAVE_STATE_FILENAME,)) + traceback.format_exc())
                                # Give up; without this the loop would spin forever on a persistent error
                                return
412
def exit():
        """Shut down: stop the servers, save the work state, then send this process SIGTERM."""
        shutdownT = time()
        stopServers()
        saveState(shutdownT)
        exitLogger = logging.getLogger('exit')
        exitLogger.info('Goodbye...')
        ownPid = os.getpid()
        os.kill(ownPid, signal.SIGTERM)
        sys.exit(0)
420
def restart():
        """Stop the servers, save the work state, then re-exec this program with the same arguments.

        A failed exec is logged; the function then returns with the servers stopped.
        """
        restartLogger = logging.getLogger('restart')
        shutdownT = time()
        stopServers()
        saveState(shutdownT)
        restartLogger.info('Restarting...')
        try:
                os.execv(sys.argv[0], sys.argv)
        except:
                restartLogger.error('Failed to exec\n' + traceback.format_exc())
430
def restoreState():
        """Reload workLog and DupeShareHACK from SAVE_STATE_FILENAME, if that file exists.

        Three layouts are understood, told apart by the first pickled object:
        a tuple (workLog, DupeShareHACK); a dict, which is DupeShareHACK and is
        followed by workLog; or anything else, taken as the save timestamp and
        followed by DupeShareHACK and workLog.  In the last two layouts workLog
        is only restored when the file was modified within the last 120 seconds.
        Any failure is logged and ends the restore.
        """
        # NOTE(review): pickle.load runs whatever the file encodes; this assumes the state file is trusted
        if not os.path.exists(SAVE_STATE_FILENAME):
                return

        global workLog, DupeShareHACK

        logger = logging.getLogger('restoreState')
        statInfo = os.stat(SAVE_STATE_FILENAME)
        logger.info('Restoring saved state from \'%s\' (%d bytes)' % (SAVE_STATE_FILENAME, statInfo.st_size))
        savedT = None
        try:
                with open(SAVE_STATE_FILENAME, 'rb') as f:
                        first = pickle.load(f)
                        if type(first) == tuple:
                                workLog = first[0]
                                DupeShareHACK = first[1]
                        else:
                                if isinstance(first, dict):
                                        DupeShareHACK = first
                                else:
                                        savedT = first
                                        DupeShareHACK = pickle.load(f)

                                stillFresh = statInfo.st_mtime + 120 >= time()
                                if stillFresh:
                                        workLog = pickle.load(f)
                                else:
                                        logger.debug('Skipping restore of expired workLog')
        except:
                logger.error('Failed to restore state\n' + traceback.format_exc())
                return
        logger.info('State restored successfully')
        if savedT:
                logger.info('Total downtime: %g seconds' % (time() - savedT,))
464
465
466 from jsonrpcserver import JSONRPCListener, JSONRPCServer
467 import interactivemode
468 from networkserver import NetworkListener
469 import threading
470
if __name__ == "__main__":
        # Listeners for the bitcoin node; none unless config.BitcoinNodeAddresses is set
        LSbc = []
        if not hasattr(config, 'BitcoinNodeAddresses'):
                config.BitcoinNodeAddresses = ()
        for a in config.BitcoinNodeAddresses:
                LSbc.append(NetworkListener(bcnode, a))

        if hasattr(config, 'UpstreamBitcoindNode') and config.UpstreamBitcoindNode:
                BitcoinLink(bcnode, dest=config.UpstreamBitcoindNode)

        server = JSONRPCServer()
        # Accept JSONRPCAddress, JSONRPCAddresses, or both (the single address goes first).
        # The list is built on a copy, so a tuple in the config works and a
        # missing setting no longer raises AttributeError.
        jsonrpcAddresses = list(getattr(config, 'JSONRPCAddresses', ()))
        if hasattr(config, 'JSONRPCAddress'):
                jsonrpcAddresses.insert(0, config.JSONRPCAddress)
        config.JSONRPCAddresses = jsonrpcAddresses
        if not jsonrpcAddresses:
                logging.getLogger('main').warning('No JSONRPCAddress or JSONRPCAddresses configured; the JSON-RPC server has no listeners')
        LS = []
        for a in jsonrpcAddresses:
                LS.append(JSONRPCListener(server, a))
        if hasattr(config, 'SecretUser'):
                server.SecretUser = config.SecretUser
        # Give the server the work-issuing and share-handling callbacks defined in this file
        server.aux = MM.CoinbaseAux
        server.getBlockHeader = getBlockHeader
        server.getBlockTemplate = getBlockTemplate
        server.receiveShare = receiveShare
        server.RaiseRedFlags = RaiseRedFlags

        restoreState()

        # The bitcoin node runs in a daemon thread; the JSON-RPC server takes over the main thread
        bcnode_thr = threading.Thread(target=bcnode.serve_forever)
        bcnode_thr.daemon = True
        bcnode_thr.start()

        server.serve_forever()