Use servername in authentication headers
[bitcoin:eloipool.git] / eloipool.py
1 #!/usr/bin/python3
2 # Eloipool - Python Bitcoin pool server
3 # Copyright (C) 2011-2012  Luke Dashjr <luke-jr+eloipool@utopios.org>
4 #
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License as
7 # published by the Free Software Foundation, either version 3 of the
8 # License, or (at your option) any later version.
9 #
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 # GNU Affero General Public License for more details.
14 #
15 # You should have received a copy of the GNU Affero General Public License
16 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
import config

# Default the pool's public name if the config doesn't set one; it is later
# handed to the JSON-RPC server (server.ServerName) for use in HTTP headers.
if not hasattr(config, 'ServerName'):
	config.ServerName = 'Unnamed Eloipool'
22
23
import logging

# Configure root logging only if the embedding environment hasn't already
# installed a handler; cap the chattiest subsystems at INFO so that global
# DEBUG output stays readable.
if len(logging.root.handlers) == 0:
	logging.basicConfig(
		format='%(asctime)s\t%(name)s\t%(levelname)s\t%(message)s',
		level=logging.DEBUG,
	)
	for infoOnly in ('checkShare', 'JSONRPCHandler', 'merkleMaker', 'Waker for JSONRPCServer', 'JSONRPCServer'):
		logging.getLogger(infoOnly).setLevel(logging.INFO)
33
def RaiseRedFlags(reason):
	"""Log *reason* at CRITICAL severity on the 'redflag' logger, then hand it back unchanged."""
	flagLogger = logging.getLogger('redflag')
	flagLogger.critical(reason)
	return reason
37
38
from bitcoin.node import BitcoinLink, BitcoinNode
# p2p node used to talk to the bitcoin network directly (block submission etc.)
bcnode = BitcoinNode(config.UpstreamNetworkId)
bcnode.userAgent += b'Eloipool:0.1/'  # append our name/version to the node's user-agent

import jsonrpc
# JSON-RPC proxy to the upstream bitcoind (getmemorypool / block submission)
UpstreamBitcoindJSONRPC = jsonrpc.ServiceProxy(config.UpstreamURI)
45
46
47 from bitcoin.script import BitcoinScript
48 from bitcoin.txn import Txn
49 from base58 import b58decode
50 from struct import pack
51 import subprocess
52 from time import time
53
def makeCoinbaseTxn(coinbaseValue, useCoinbaser = True):
	"""Build the coinbase transaction paying out coinbaseValue (base units).

	If useCoinbaser is true and config.CoinbaserCmd is set, that command is
	run with '%d' replaced by the total value; its stdout must give the number
	of extra outputs, then for each output an amount line and an address line.
	Whatever value the coinbaser does not claim goes to config.TrackerAddr.
	If the coinbaser fails (or claims >= the total), its outputs are discarded
	and the full value goes to the tracker address instead.
	"""
	txn = Txn.new()
	
	if useCoinbaser and hasattr(config, 'CoinbaserCmd') and config.CoinbaserCmd:
		coinbased = 0
		try:
			cmd = config.CoinbaserCmd
			cmd = cmd.replace('%d', str(coinbaseValue))
			# NOTE(review): shell=True with a config-supplied command string;
			# fine for operator-controlled config, never for untrusted input.
			p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE)
			nout = int(p.stdout.readline())
			for i in range(nout):
				amount = int(p.stdout.readline())
				addr = p.stdout.readline().rstrip(b'\n').decode('utf8')
				pkScript = BitcoinScript.toAddress(addr)
				txn.addOutput(amount, pkScript)
				coinbased += amount
		except Exception:
			# Log the actual failure (previously swallowed silently), then
			# force the failure path below so a broken coinbaser can never
			# produce a partially-funded block.
			logging.getLogger('makeCoinbaseTxn').debug('Coinbaser error:', exc_info=True)
			coinbased = coinbaseValue + 1
		if coinbased >= coinbaseValue:
			logging.getLogger('makeCoinbaseTxn').error('Coinbaser failed!')
			txn.outputs = []
		else:
			coinbaseValue -= coinbased
	
	pkScript = BitcoinScript.toAddress(config.TrackerAddr)
	txn.addOutput(coinbaseValue, pkScript)
	
	# TODO
	# TODO: red flag on dupe coinbase
	return txn
84
85
import jsonrpc_getwork
from util import Bits2Target

# username -> { work identifier -> (work tuple, issue time) }
workLog = {}
# Current block's proof-of-work target, derived from its 'bits' in blockChanged()
networkTarget = None
# Block headers already accepted this block, used to reject duplicate shares;
# cleared on every block change.
DupeShareHACK = {}
92
# The JSONRPCServer instance; created in the __main__ section below.
server = None
def updateBlocks():
	"""Wake any long-polling miners; a harmless no-op before the server exists."""
	if not server:
		return
	server.wakeLongpoll()
97
def blockChanged():
	"""Reset per-block state after the chain tip changes, then notify longpollers."""
	global DupeShareHACK
	DupeShareHACK = {}  # duplicate-share detection is per-block
	jsonrpc_getwork._CheckForDupesHACK = {}
	global MM, networkTarget, server
	# Recompute the full-block target from the new block's compact 'bits'
	networkTarget = Bits2Target(MM.currentBlock[1])
	workLog.clear()  # all outstanding work is now stale
	updateBlocks()
106
107
from merklemaker import merkleMaker
# Background worker that keeps fresh merkle trees / work ready to hand out
MM = merkleMaker()
MM.__dict__.update(config.__dict__)  # adopt every config setting wholesale
MM.clearCoinbaseTxn = makeCoinbaseTxn(5000000000, False)  # FIXME: hard-coded 50 BTC subsidy
MM.clearCoinbaseTxn.assemble()
MM.makeCoinbaseTxn = makeCoinbaseTxn
MM.onBlockChange = blockChanged
MM.onBlockUpdate = updateBlocks
MM.start()
117
118
from binascii import b2a_hex
from copy import deepcopy
from struct import pack, unpack
from time import time
from util import RejectedShare, dblsha, hash2int, swap32
import jsonrpc
import threading
import traceback

# Optional JSON-RPC service notified of high-difficulty proofs of work
# (the "gotwork hack" in checkShare); disabled unless config.GotWorkURI is set.
gotwork = None
if hasattr(config, 'GotWorkURI'):
	gotwork = jsonrpc.ServiceProxy(config.GotWorkURI)
131
def submitGotwork(info):
	"""Best-effort notification of a solved proof-of-work to the gotwork service.

	Runs in its own daemon thread (spawned by checkShare).  Failures are
	logged but never propagated; gotwork is purely advisory.
	"""
	try:
		gotwork.gotwork(info)
	except Exception:
		# Narrowed from a bare except so interpreter-exit signals aren't eaten
		checkShare.logger.warning('Failed to submit gotwork\n' + traceback.format_exc())
137
def getBlockHeader(username):
	"""Issue getwork-style work: an 80-byte block header for *username*.

	The MRD tuple and issue time are recorded in workLog keyed by merkle root
	so checkShare can later match the returned share to this work.
	Returns (header bytes, worklog entry).
	"""
	MRD = MM.getMRD()
	(merkleRoot, merkleTree, coinbase, prevBlock, bits, rollPrevBlk) = MRD
	timestamp = pack('<L', int(time()))
	# version 1 + prev block + merkle root + time + bits + placeholder nonce b'iolE'
	hdr = b'\1\0\0\0' + prevBlock + merkleRoot + timestamp + bits + b'iolE'
	workLog.setdefault(username, {})[merkleRoot] = (MRD, time())
	return (hdr, workLog[username][merkleRoot])
145
def getBlockTemplate(username):
	"""Issue getmemorypool-style work for *username* and return the MC tuple.

	The work identifier (wli) is the length-prefixed blob at the start of the
	coinbase data; it is stored in workLog with the issue time so checkShare
	can match the miner's eventual submission back to this work.
	"""
	MC = MM.getMC()
	(dummy, merkleTree, coinbase, prevBlock, bits) = MC
	wliLen = coinbase[0]  # first byte is the identifier's length
	wli = coinbase[1:wliLen+1]
	workLog.setdefault(username, {})[wli] = (MC, time())
	return MC
153
# Callables invoked with each processed share dict; populated in __main__
loggersShare = []

# Debug records of real (network-target) blocks: raw components (RBDs)
# and assembled payloads (RBPs), kept in memory for post-mortem inspection.
RBDs = []
RBPs = []
158
159 from bitcoin.varlen import varlenEncode, varlenDecode
160 import bitcoin.txn
def assembleBlock(blkhdr, txlist):
	"""Serialize a complete block: header, varint txn count, then each raw txn."""
	parts = [blkhdr, varlenEncode(len(txlist))]
	parts.extend(tx.data for tx in txlist)
	return b''.join(parts)
167
def blockSubmissionThread(payload):
	"""Push a solved block to the upstream bitcoind, retrying until it goes through.

	Runs in its own thread.  Retrying forever is deliberate -- losing a block
	is far worse than redundant submissions -- but failures are now logged and
	followed by a short pause so an unreachable upstream doesn't busy-spin.
	"""
	while True:
		try:
			UpstreamBitcoindJSONRPC.getmemorypool(b2a_hex(payload).decode('ascii'))
			break
		except Exception:
			logging.getLogger('blockSubmission').warning(
				'Upstream block submission failed; retrying\n' + traceback.format_exc())
			sleep(0.2)  # brief backoff between attempts
175
def checkShare(share):
	"""Validate a submitted share, raising RejectedShare on any problem.

	share: dict with at least 'data' (block header bytes) and 'username';
	'blkdata' (serialized transactions) marks a getmemorypool-style share.
	Side effects: stamps share['time'], records the header in DupeShareHACK,
	stores the matched work under share['MC'] or share['MRD'], and -- when the
	hash meets the network target -- submits the full block upstream and sets
	share['upstreamResult'].
	"""
	shareTime = share['time'] = time()
	
	data = share['data']
	data = data[:80]  # just the block header
	(prevBlock, bits) = MM.currentBlock
	sharePrevBlock = data[4:36]
	if sharePrevBlock != prevBlock:
		# Work built on the immediately-previous block is merely stale;
		# anything else references an unknown chain tip.
		if sharePrevBlock == MM.lastBlock[0]:
			raise RejectedShare('stale-prevblk')
		raise RejectedShare('bad-prevblk')
	
	# TODO: use userid
	username = share['username']
	if username not in workLog:
		raise RejectedShare('unknown-user')
	
	if data[72:76] != bits:
		raise RejectedShare('bad-diffbits')
	if data[:4] != b'\1\0\0\0':
		raise RejectedShare('bad-version')
	
	shareMerkleRoot = data[36:68]
	if 'blkdata' in share:
		# getmemorypool share: recover the work identifier (wli) that
		# getBlockTemplate embedded at the front of the coinbase data.
		pl = share['blkdata']
		(txncount, pl) = varlenDecode(pl)
		cbtxn = bitcoin.txn.Txn(pl)
		cbtxn.disassemble(retExtra=True)
		coinbase = cbtxn.getCoinbase()
		wliLen = coinbase[0]
		wli = coinbase[1:wliLen+1]
		mode = 'MC'
		moden = 1
	else:
		# getwork share: the merkle root itself identifies the issued work.
		wli = shareMerkleRoot
		mode = 'MRD'
		moden = 0
	
	MWL = workLog[username]
	if wli not in MWL:
		raise RejectedShare('unknown-work')
	(wld, issueT) = MWL[wli]
	share[mode] = wld
	
	if data in DupeShareHACK:
		raise RejectedShare('duplicate')
	DupeShareHACK[data] = None
	
	blkhash = dblsha(data)
	# The last 4 hash bytes must be zero (minimum getwork share difficulty)
	if blkhash[28:] != b'\0\0\0\0':
		raise RejectedShare('H-not-zero')
	blkhashn = hash2int(blkhash)
	
	global networkTarget
	# Near-target shares are interesting enough for INFO; the rest stay DEBUG
	logfunc = getattr(checkShare.logger, 'info' if blkhashn <= networkTarget else 'debug')
	logfunc('BLKHASH: %64x' % (blkhashn,))
	logfunc(' TARGET: %64x' % (networkTarget,))
	
	workMerkleTree = wld[1]
	workCoinbase = wld[2]
	
	# NOTE: this isn't actually needed for MC mode, but we're abusing it for a trivial share check...
	txlist = workMerkleTree.data
	# Deep-copy the coinbase txn so the shared merkle tree isn't mutated
	txlist = [deepcopy(txlist[0]),] + txlist[1:]
	cbtxn = txlist[0]
	cbtxn.setCoinbase(workCoinbase)
	cbtxn.assemble()
	
	if blkhashn <= networkTarget:
		# This share solves a real block: submit via both the upstream
		# JSON-RPC (in a thread) and the p2p node, for redundancy.
		logfunc("Submitting upstream")
		if not moden:
			RBDs.append( deepcopy( (data, txlist) ) )
			payload = assembleBlock(data, txlist)
		else:
			RBDs.append( deepcopy( (data, txlist, share['blkdata']) ) )
			payload = share['data'] + share['blkdata']
		logfunc('Real block payload: %s' % (payload,))
		RBPs.append(payload)
		threading.Thread(target=blockSubmissionThread, args=(payload,)).start()
		bcnode.submitBlock(payload)
		share['upstreamResult'] = True
		MM.updateBlock(blkhash)
	
	# Gotwork hack...
	if gotwork and blkhashn <= config.GotWorkTarget:
		try:
			# Serialized coinbase txn + block hash + merkle steps, as the
			# gotwork service expects
			coinbaseMrkl = cbtxn.data
			coinbaseMrkl += blkhash
			steps = workMerkleTree._steps
			coinbaseMrkl += pack('B', len(steps))
			for step in steps:
				coinbaseMrkl += step
			coinbaseMrkl += b"\0\0\0\0"
			info = {}
			info['hash'] = b2a_hex(blkhash).decode('ascii')
			info['header'] = b2a_hex(data).decode('ascii')
			info['coinbaseMrkl'] = b2a_hex(coinbaseMrkl).decode('ascii')
			thr = threading.Thread(target=submitGotwork, args=(info,))
			thr.daemon = True
			thr.start()
		except:
			checkShare.logger.warning('Failed to build gotwork request')
	
	shareTimestamp = unpack('<L', data[68:72])[0]
	# NOTE(review): this stale-work guard only fires when the work's issue time
	# is >120s in the future relative to now, which looks inverted — confirm
	# the intended condition against upstream Eloipool.
	if shareTime < issueT - 120:
		raise RejectedShare('stale-work')
	# Header timestamp sanity: no more than 5 min behind / 2 h ahead of us
	if shareTimestamp < shareTime - 300:
		raise RejectedShare('time-too-old')
	if shareTimestamp > shareTime + 7200:
		raise RejectedShare('time-too-new')
	
	if moden:
		# Full-block checks: the miner may only append to our coinbase prefix
		cbpre = cbtxn.getCoinbase()
		cbpreLen = len(cbpre)
		if coinbase[:cbpreLen] != cbpre:
			raise RejectedShare('bad-cb-prefix')
		
		# Filter out known "I support" flags, to prevent exploits
		for ff in (b'/P2SH/', b'NOP2SH', b'p2sh/CHV', b'p2sh/NOCHV'):
			if coinbase.find(ff) > cbpreLen - len(ff):
				raise RejectedShare('bad-cb-flag')
		
		if len(coinbase) > 100:
			raise RejectedShare('bad-cb-length')
		
		# The claimed merkle root must match a tree rebuilt with this coinbase
		cbtxn.setCoinbase(coinbase)
		cbtxn.assemble()
		if shareMerkleRoot != workMerkleTree.withFirst(cbtxn):
			raise RejectedShare('bad-txnmrklroot')
		
		# Reassemble the block ourselves; the submission must match exactly
		allowed = assembleBlock(data, txlist)
		if allowed != share['data'] + share['blkdata']:
			raise RejectedShare('bad-txns')
checkShare.logger = logging.getLogger('checkShare')
310
def receiveShare(share):
	"""Entry point for shares from the JSON-RPC server.

	Runs checkShare, records the reject reason (re-raising so the server can
	report it), and ALWAYS annotates share['solution'] and feeds the share to
	every configured share logger -- accepted or rejected.
	"""
	# TODO: username => userid
	try:
		checkShare(share)
	except RejectedShare as rej:
		share['rejectReason'] = str(rej)
		raise
	finally:
		# Prefer the miner's original hex data when available; otherwise
		# hex-encode the (byte-swapped) header ourselves.
		if '_origdata' in share:
			share['solution'] = share['_origdata']
		else:
			share['solution'] = b2a_hex(swap32(share['data'])).decode('utf8')
		for i in loggersShare:
			i(share)
325
def newBlockNotification(signum, frame):
	"""SIGUSR1 handler: refresh the merkle tree after an external new-block notice."""
	logging.getLogger('newBlockNotification').info('Received new block notification')
	MM.updateMerkleTree()
	# TODO: Force RESPOND TO LONGPOLLS?
331
from signal import signal, SIGUSR1
# Let `kill -USR1` (e.g. from bitcoind's -blocknotify) trigger a work refresh
signal(SIGUSR1, newBlockNotification)
334
335
import os
import os.path
import pickle
import signal
import sys
from time import sleep
import traceback

# Resume state (timestamp, DupeShareHACK, workLog) is pickled here on shutdown
SAVE_STATE_FILENAME = 'eloipool.worklog'
345
def stopServers():
	"""Stop the p2p node and JSON-RPC server, then close their file descriptors.

	Polls up to 0x100 times at 10 ms (~2.5 s total) for the server loops to
	finish; if they don't, logs which ones are stuck and closes fds anyway.
	"""
	logger = logging.getLogger('stopServers')
	
	logger.info('Stopping servers...')
	global bcnode, server
	servers = (bcnode, server)
	# First ask every server to stop, then wake them so they notice promptly
	for s in servers:
		s.keepgoing = False
	for s in servers:
		s.wakeup()
	i = 0
	while True:
		sl = []
		for s in servers:
			if s.running:
				sl.append(s.__class__.__name__)
		if not sl:
			break
		i += 1
		if i >= 0x100:
			logger.error('Servers taking too long to stop (%s), giving up' % (', '.join(sl)))
			break
		sleep(0.01)
	
	for s in servers:
		# Close each server's tracked file descriptors directly
		for fd in s._fd.keys():
			os.close(fd)
373
def saveState(t = None):
	"""Persist resume state (shutdown time, DupeShareHACK, workLog) to disk.

	t: time() at shutdown, pickled first so restoreState can report downtime.
	Retries transient failures; after 0x10000 attempts it logs the error,
	removes any partial file, and gives up.
	"""
	logger = logging.getLogger('saveState')
	
	# Then, save data needed to resume work
	logger.info('Saving work state to \'%s\'...' % (SAVE_STATE_FILENAME,))
	i = 0
	while True:
		try:
			with open(SAVE_STATE_FILENAME, 'wb') as f:
				pickle.dump(t, f)
				pickle.dump(DupeShareHACK, f)
				pickle.dump(workLog, f)
			break
		except Exception:
			i += 1
			if i >= 0x10000:
				logger.error('Failed to save work\n' + traceback.format_exc())
				try:
					os.unlink(SAVE_STATE_FILENAME)
				except Exception:
					logger.error(('Failed to unlink \'%s\'; resume may have trouble\n' % (SAVE_STATE_FILENAME,)) + traceback.format_exc())
				# BUGFIX: previously there was no break here, so after giving
				# up the loop retried (and re-logged/re-unlinked) forever.
				break
395
def exit():
	"""Graceful shutdown: stop servers, save state, then terminate the process.

	NOTE: deliberately shadows the builtin exit().  Sends SIGTERM to our own
	pid before sys.exit -- presumably to take down any lingering non-daemon
	threads; confirm before changing.
	"""
	t = time()
	stopServers()
	saveState(t)
	logging.getLogger('exit').info('Goodbye...')
	os.kill(os.getpid(), signal.SIGTERM)
	sys.exit(0)
403
def restart():
	"""Stop servers, save state, and re-exec this process in place.

	NOTE(review): os.execv(sys.argv[0], ...) assumes argv[0] is a directly
	executable path; if it isn't, the failure is logged and the process keeps
	running with servers already stopped -- confirm that's acceptable.
	"""
	t = time()
	stopServers()
	saveState(t)
	logging.getLogger('restart').info('Restarting...')
	try:
		os.execv(sys.argv[0], sys.argv)
	except:
		logging.getLogger('restart').error('Failed to exec\n' + traceback.format_exc())
413
def restoreState():
	"""Load saved work state from SAVE_STATE_FILENAME, if it exists.

	Handles three historical file layouts:
	  1. a pickled (workLog, DupeShareHACK) tuple;
	  2. a pickled DupeShareHACK dict alone (no timestamp);
	  3. timestamp t, then DupeShareHACK, then workLog (current format).
	In format 3 the workLog is only restored if the file is under ~2 minutes
	old, since outstanding work goes stale quickly.
	NOTE: pickle is only safe because this file is produced locally by
	saveState(); never point this at untrusted data.
	"""
	if not os.path.exists(SAVE_STATE_FILENAME):
		return
	
	global workLog, DupeShareHACK
	
	logger = logging.getLogger('restoreState')
	s = os.stat(SAVE_STATE_FILENAME)
	logger.info('Restoring saved state from \'%s\' (%d bytes)' % (SAVE_STATE_FILENAME, s.st_size))
	try:
		with open(SAVE_STATE_FILENAME, 'rb') as f:
			t = pickle.load(f)
			if type(t) == tuple:
				# Legacy format 1: (workLog, DupeShareHACK) with no timestamp
				workLog = t[0]
				DupeShareHACK = t[1]
				t = None
			else:
				if isinstance(t, dict):
					# Legacy format 2: DupeShareHACK only
					DupeShareHACK = t
					t = None
				else:
					# Current format 3: t was the timestamp; next is DupeShareHACK
					DupeShareHACK = pickle.load(f)
				
				if s.st_mtime + 120 >= time():
					workLog = pickle.load(f)
				else:
					logger.debug('Skipping restore of expired workLog')
	except:
		logger.error('Failed to restore state\n' + traceback.format_exc())
		return
	logger.info('State restored successfully')
	if t:
		logger.info('Total downtime: %g seconds' % (time() - t,))
447
448
449 from jsonrpcserver import JSONRPCListener, JSONRPCServer
450 import interactivemode
451 from networkserver import NetworkListener
452 import threading
453 import sharelogging
454 import imp
455
if __name__ == "__main__":
	# --- Share logging configuration (with backward-compatibility shims) ---
	if not hasattr(config, 'ShareLogging'):
		config.ShareLogging = ()
	if hasattr(config, 'DbOptions'):
		# Deprecated DbOptions -> equivalent 'sql' ShareLogging entry
		logging.getLogger('backwardCompatibility').warn('DbOptions configuration variable is deprecated; upgrade to ShareLogging var before 2013-03-05')
		config.ShareLogging = list(config.ShareLogging)
		config.ShareLogging.append( {
			'type': 'sql',
			'engine': 'postgres',
			'dbopts': config.DbOptions,
			'statement': "insert into shares (rem_host, username, our_result, upstream_result, reason, solution) values ({Q(remoteHost)}, {username}, {YN(not(rejectReason))}, {YN(upstreamResult)}, {rejectReason}, decode({solution}, 'hex'))",
		} )
	for i in config.ShareLogging:
		if not hasattr(i, 'keys'):
			# Old-style (name, parameters) tuple; convert to a dict config
			name, parameters = i
			logging.getLogger('backwardCompatibility').warn('Using short-term backward compatibility for ShareLogging[\'%s\']; be sure to update config before 2012-04-04' % (name,))
			if name == 'postgres':
				name = 'sql'
				i = {
					'engine': 'postgres',
					'dbopts': parameters,
				}
			elif name == 'logfile':
				i = {}
				i['thropts'] = parameters
				if 'filename' in parameters:
					# 'filename' is a direct argument; the rest go via thropts
					i['filename'] = parameters['filename']
					i['thropts'] = dict(i['thropts'])
					del i['thropts']['filename']
			else:
				i = parameters
			i['type'] = name
		
		name = i['type']
		parameters = i
		try:
			# Dynamically load sharelogging/<name>.py and instantiate its
			# <name> class, registering its logShare method
			fp, pathname, description = imp.find_module(name, sharelogging.__path__)
			m = imp.load_module(name, fp, pathname, description)
			lo = getattr(m, name)(**parameters)
			loggersShare.append(lo.logShare)
		except:
			logging.getLogger('sharelogging').error("Error setting up share logger %s: %s", name,  sys.exc_info())

	# --- Bitcoin p2p node listeners and upstream connection ---
	LSbc = []
	if not hasattr(config, 'BitcoinNodeAddresses'):
		config.BitcoinNodeAddresses = ()
	for a in config.BitcoinNodeAddresses:
		LSbc.append(NetworkListener(bcnode, a))
	
	if hasattr(config, 'UpstreamBitcoindNode') and config.UpstreamBitcoindNode:
		BitcoinLink(bcnode, dest=config.UpstreamBitcoindNode)
	
	# Importing these modules registers their RPC methods with the server
	import jsonrpc_getmemorypool
	import jsonrpc_getwork
	import jsonrpc_setworkaux
	
	# --- Miner-facing JSON-RPC server setup ---
	server = JSONRPCServer()
	if hasattr(config, 'JSONRPCAddress'):
		# Deprecated single-address setting; fold into the addresses list
		logging.getLogger('backwardCompatibility').warn('JSONRPCAddress configuration variable is deprecated; upgrade to JSONRPCAddresses list before 2013-03-05')
		if not hasattr(config, 'JSONRPCAddresses'):
			config.JSONRPCAddresses = []
		config.JSONRPCAddresses.insert(0, config.JSONRPCAddress)
	LS = []
	for a in config.JSONRPCAddresses:
		LS.append(JSONRPCListener(server, a))
	if hasattr(config, 'SecretUser'):
		server.SecretUser = config.SecretUser
	server.aux = MM.CoinbaseAux
	server.getBlockHeader = getBlockHeader
	server.getBlockTemplate = getBlockTemplate
	server.receiveShare = receiveShare
	server.RaiseRedFlags = RaiseRedFlags
	
	server.TrustedForwarders = ()
	if hasattr(config, 'TrustedForwarders'):
		server.TrustedForwarders = config.TrustedForwarders
	server.ServerName = config.ServerName
	
	restoreState()
	
	# p2p node runs on a background daemon thread; JSON-RPC owns the main thread
	bcnode_thr = threading.Thread(target=bcnode.serve_forever)
	bcnode_thr.daemon = True
	bcnode_thr.start()
	
	server.serve_forever()