Merge branch 'ambig_t'
[bitcoin:eloipool.git] / eloipool.py
1 #!/usr/bin/python3
2 # Eloipool - Python Bitcoin pool server
3 # Copyright (C) 2011-2012  Luke Dashjr <luke-jr+eloipool@utopios.org>
4 #
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License as
7 # published by the Free Software Foundation, either version 3 of the
8 # License, or (at your option) any later version.
9 #
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 # GNU Affero General Public License for more details.
14 #
15 # You should have received a copy of the GNU Affero General Public License
16 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18 import config
19
20
21 import logging
22
23 logging.basicConfig(level=logging.DEBUG)
24 for infoOnly in ('checkShare', 'JSONRPCHandler', 'merkleMaker', 'Waker for JSONRPCServer', 'JSONRPCServer'):
25         logging.getLogger(infoOnly).setLevel(logging.INFO)
26
27 def RaiseRedFlags(reason):
28         logging.getLogger('redflag').critical(reason)
29         return reason
30
31
32 from bitcoin.node import BitcoinLink, BitcoinNode
33 bcnode = BitcoinNode(config.UpstreamNetworkId)
34 bcnode.userAgent += b'Eloipool:0.1/'
35
36 import jsonrpc
37 UpstreamBitcoindJSONRPC = jsonrpc.ServiceProxy(config.UpstreamURI)
38
39
40 from bitcoin.script import BitcoinScript
41 from bitcoin.txn import Txn
42 from base58 import b58decode
43 from struct import pack
44 import subprocess
45 from time import time
46
47 def makeCoinbaseTxn(coinbaseValue, useCoinbaser = True):
48         txn = Txn.new()
49         
50         if useCoinbaser and hasattr(config, 'CoinbaserCmd') and config.CoinbaserCmd:
51                 coinbased = 0
52                 try:
53                         cmd = config.CoinbaserCmd
54                         cmd = cmd.replace('%d', str(coinbaseValue))
55                         p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE)
56                         nout = int(p.stdout.readline())
57                         for i in range(nout):
58                                 amount = int(p.stdout.readline())
59                                 addr = p.stdout.readline().rstrip(b'\n').decode('utf8')
60                                 pkScript = BitcoinScript.toAddress(addr)
61                                 txn.addOutput(amount, pkScript)
62                                 coinbased += amount
63                 except:
64                         coinbased = coinbaseValue + 1
65                 if coinbased >= coinbaseValue:
66                         logging.getLogger('makeCoinbaseTxn').error('Coinbaser failed!')
67                         txn.outputs = []
68                 else:
69                         coinbaseValue -= coinbased
70         
71         pkScript = BitcoinScript.toAddress(config.TrackerAddr)
72         txn.addOutput(coinbaseValue, pkScript)
73         
74         # TODO
75         # TODO: red flag on dupe coinbase
76         return txn
77
78
79 import jsonrpcserver
80 from util import Bits2Target
81
82 workLog = {}
83 networkTarget = None
84 DupeShareHACK = {}
85
86 server = None
87 def updateBlocks():
88         if server:
89                 server.wakeLongpoll()
90
91 def blockChanged():
92         global DupeShareHACK
93         DupeShareHACK = {}
94         jsonrpcserver._CheckForDupesHACK = {}
95         global MM, networkTarget, server
96         networkTarget = Bits2Target(MM.currentBlock[1])
97         workLog.clear()
98         updateBlocks()
99
100
101 from merklemaker import merkleMaker
102 MM = merkleMaker()
103 MM.__dict__.update(config.__dict__)
104 MM.clearCoinbaseTxn = makeCoinbaseTxn(5000000000, False)  # FIXME
105 MM.clearCoinbaseTxn.assemble()
106 MM.makeCoinbaseTxn = makeCoinbaseTxn
107 MM.onBlockChange = blockChanged
108 MM.onBlockUpdate = updateBlocks
109 MM.start()
110
111
112 from binascii import b2a_hex
113 from copy import deepcopy
114 from struct import pack, unpack
115 from time import time
116 from util import RejectedShare, dblsha, hash2int
117 import jsonrpc
118 import threading
119 import traceback
120
121 gotwork = None
122 if hasattr(config, 'GotWorkURI'):
123         gotwork = jsonrpc.ServiceProxy(config.GotWorkURI)
124
125 def submitGotwork(info):
126         try:
127                 gotwork.gotwork(info)
128         except:
129                 checkShare.logger.warning('Failed to submit gotwork\n' + traceback.format_exc())
130
131 db = None
132 if hasattr(config, 'DbOptions'):
133         import psycopg2
134         db = psycopg2.connect(**config.DbOptions)
135
136 def getBlockHeader(username):
137         MRD = MM.getMRD()
138         (merkleRoot, merkleTree, coinbase, prevBlock, bits, rollPrevBlk) = MRD
139         timestamp = pack('<L', int(time()))
140         hdr = b'\1\0\0\0' + prevBlock + merkleRoot + timestamp + bits + b'iolE'
141         workLog.setdefault(username, {})[merkleRoot] = (MRD, time())
142         return hdr
143
144 def YN(b):
145         if b is None:
146                 return None
147         return 'Y' if b else 'N'
148
149 def logShare(share):
150         if db is None:
151                 return
152         dbc = db.cursor()
153         rem_host = share.get('remoteHost', '?')
154         username = share['username']
155         reason = share.get('rejectReason', None)
156         upstreamResult = share.get('upstreamResult', None)
157         solution = share['_origdata']
158         #solution = b2a_hex(solution).decode('utf8')
159         stmt = "insert into shares (rem_host, username, our_result, upstream_result, reason, solution) values (%s, %s, %s, %s, %s, decode(%s, 'hex'))"
160         params = (rem_host, username, YN(not reason), YN(upstreamResult), reason, solution)
161         dbc.execute(stmt, params)
162         db.commit()
163
164 RBDs = []
165 RBPs = []
166
167 from bitcoin.varlen import varlenEncode
168 def assembleBlock(blkhdr, txlist):
169         payload = blkhdr
170         payload += varlenEncode(len(txlist))
171         for tx in txlist:
172                 payload += tx.data
173         return payload
174
175 def blockSubmissionThread(payload):
176         while True:
177                 try:
178                         UpstreamBitcoindJSONRPC.getmemorypool(b2a_hex(payload).decode('ascii'))
179                         break
180                 except:
181                         pass
182
183 def checkShare(share):
184         data = share['data']
185         data = data[:80]
186         (prevBlock, bits) = MM.currentBlock
187         sharePrevBlock = data[4:36]
188         if sharePrevBlock != prevBlock:
189                 if sharePrevBlock == MM.lastBlock[0]:
190                         raise RejectedShare('stale-prevblk')
191                 raise RejectedShare('bad-prevblk')
192         
193         shareMerkleRoot = data[36:68]
194         # TODO: use userid
195         username = share['username']
196         if username not in workLog:
197                 raise RejectedShare('unknown-user')
198         
199         if data[72:76] != bits:
200                 raise RejectedShare('bad-diffbits')
201         if data[:4] != b'\1\0\0\0':
202                 raise RejectedShare('bad-version')
203         
204         MWL = workLog[username]
205         if shareMerkleRoot not in MWL:
206                 raise RejectedShare('unknown-work')
207         (MRD, issueT) = MWL[shareMerkleRoot]
208         share['MRD'] = MRD
209         
210         if data in DupeShareHACK:
211                 raise RejectedShare('duplicate')
212         DupeShareHACK[data] = None
213         
214         shareTime = share['time'] = time()
215         
216         blkhash = dblsha(data)
217         if blkhash[28:] != b'\0\0\0\0':
218                 raise RejectedShare('H-not-zero')
219         blkhashn = hash2int(blkhash)
220         
221         global networkTarget
222         logfunc = getattr(checkShare.logger, 'info' if blkhashn <= networkTarget else 'debug')
223         logfunc('BLKHASH: %64x' % (blkhashn,))
224         logfunc(' TARGET: %64x' % (networkTarget,))
225         
226         txlist = MRD[1].data
227         cbtxn = txlist[0]
228         cbtxn.setCoinbase(MRD[2])
229         cbtxn.assemble()
230         
231         if blkhashn <= networkTarget:
232                 logfunc("Submitting upstream")
233                 RBDs.append( deepcopy( (data, txlist) ) )
234                 payload = assembleBlock(data, txlist)
235                 logfunc('Real block payload: %s' % (payload,))
236                 RBPs.append(payload)
237                 threading.Thread(target=blockSubmissionThread, args=(payload,)).start()
238                 bcnode.submitBlock(payload)
239                 share['upstreamResult'] = True
240                 MM.updateBlock(blkhash)
241         
242         # Gotwork hack...
243         if gotwork and blkhashn <= config.GotWorkTarget:
244                 try:
245                         coinbaseMrkl = cbtxn.data
246                         coinbaseMrkl += blkhash
247                         steps = MRD[1]._steps
248                         coinbaseMrkl += pack('B', len(steps))
249                         for step in steps:
250                                 coinbaseMrkl += step
251                         coinbaseMrkl += b"\0\0\0\0"
252                         info = {}
253                         info['hash'] = b2a_hex(blkhash).decode('ascii')
254                         info['header'] = b2a_hex(data).decode('ascii')
255                         info['coinbaseMrkl'] = b2a_hex(coinbaseMrkl).decode('ascii')
256                         thr = threading.Thread(target=submitGotwork, args=(info,))
257                         thr.daemon = True
258                         thr.start()
259                 except:
260                         checkShare.logger.warning('Failed to build gotwork request')
261         
262         shareTimestamp = unpack('<L', data[68:72])[0]
263         if shareTime < issueT - 120:
264                 raise RejectedShare('stale-work')
265         if shareTimestamp < shareTime - 300:
266                 raise RejectedShare('time-too-old')
267         if shareTimestamp > shareTime + 7200:
268                 raise RejectedShare('time-too-new')
269         
270         logShare(share)
271 checkShare.logger = logging.getLogger('checkShare')
272
273 def receiveShare(share):
274         # TODO: username => userid
275         try:
276                 checkShare(share)
277         except RejectedShare as rej:
278                 share['rejectReason'] = str(rej)
279                 logShare(share)
280                 raise
281         # TODO
282
283 def newBlockNotification(signum, frame):
284         logging.getLogger('newBlockNotification').info('Received new block notification')
285         MM.updateMerkleTree()
286         # TODO: Force RESPOND TO LONGPOLLS?
287         pass
288
289 from signal import signal, SIGUSR1
290 signal(SIGUSR1, newBlockNotification)
291
292
293 import os
294 import os.path
295 import pickle
296 import signal
297 import sys
298 from time import sleep
299 import traceback
300
301 SAVE_STATE_FILENAME = 'eloipool.worklog'
302
303 def stopServers():
304         logger = logging.getLogger('stopServers')
305         
306         logger.info('Stopping servers...')
307         global bcnode, server
308         servers = (bcnode, server)
309         for s in servers:
310                 s.keepgoing = False
311         for s in servers:
312                 s.wakeup()
313         i = 0
314         while True:
315                 sl = []
316                 for s in servers:
317                         if s.running:
318                                 sl.append(s.__class__.__name__)
319                 if not sl:
320                         break
321                 i += 1
322                 if i >= 0x100:
323                         logger.error('Servers taking too long to stop (%s), giving up' % (', '.join(sl)))
324                         break
325                 sleep(0.01)
326         
327         for s in servers:
328                 for fd in s._fd.keys():
329                         os.close(fd)
330
331 def saveState(t = None):
332         logger = logging.getLogger('saveState')
333         
334         # Then, save data needed to resume work
335         logger.info('Saving work state to \'%s\'...' % (SAVE_STATE_FILENAME,))
336         i = 0
337         while True:
338                 try:
339                         with open(SAVE_STATE_FILENAME, 'wb') as f:
340                                 pickle.dump(t, f)
341                                 pickle.dump(DupeShareHACK, f)
342                                 pickle.dump(workLog, f)
343                         break
344                 except:
345                         i += 1
346                         if i >= 0x10000:
347                                 logger.error('Failed to save work\n' + traceback.format_exc())
348                                 try:
349                                         os.unlink(SAVE_STATE_FILENAME)
350                                 except:
351                                         logger.error(('Failed to unlink \'%s\'; resume may have trouble\n' % (SAVE_STATE_FILENAME,)) + traceback.format_exc())
352
353 def exit():
354         t = time()
355         stopServers()
356         saveState(t)
357         logging.getLogger('exit').info('Goodbye...')
358         os.kill(os.getpid(), signal.SIGTERM)
359         sys.exit(0)
360
361 def restart():
362         t = time()
363         stopServers()
364         saveState(t)
365         logging.getLogger('restart').info('Restarting...')
366         try:
367                 os.execv(sys.argv[0], sys.argv)
368         except:
369                 logging.getLogger('restart').error('Failed to exec\n' + traceback.format_exc())
370
371 def restoreState():
372         if not os.path.exists(SAVE_STATE_FILENAME):
373                 return
374         
375         global workLog, DupeShareHACK
376         
377         logger = logging.getLogger('restoreState')
378         s = os.stat(SAVE_STATE_FILENAME)
379         logger.info('Restoring saved state from \'%s\' (%d bytes)' % (SAVE_STATE_FILENAME, s.st_size))
380         try:
381                 with open(SAVE_STATE_FILENAME, 'rb') as f:
382                         t = pickle.load(f)
383                         if type(t) == tuple:
384                                 workLog = t[0]
385                                 DupeShareHACK = t[1]
386                                 t = None
387                         else:
388                                 if isinstance(t, dict):
389                                         DupeShareHACK = t
390                                         t = None
391                                 else:
392                                         DupeShareHACK = pickle.load(f)
393                                 
394                                 if s.st_mtime + 120 >= time():
395                                         workLog = pickle.load(f)
396                                 else:
397                                         logger.debug('Skipping restore of expired workLog')
398         except:
399                 logger.error('Failed to restore state\n' + traceback.format_exc())
400                 return
401         logger.info('State restored successfully')
402         if t:
403                 logger.info('Total downtime: %g seconds' % (time() - t,))
404
405
406 from jsonrpcserver import JSONRPCListener, JSONRPCServer
407 import interactivemode
408 from networkserver import NetworkListener
409 import threading
410
411 if __name__ == "__main__":
412         LSbc = []
413         if not hasattr(config, 'BitcoinNodeAddresses'):
414                 config.BitcoinNodeAddresses = ()
415         for a in config.BitcoinNodeAddresses:
416                 LSbc.append(NetworkListener(bcnode, a))
417         
418         if hasattr(config, 'UpstreamBitcoindNode') and config.UpstreamBitcoindNode:
419                 BitcoinLink(bcnode, dest=config.UpstreamBitcoindNode)
420         
421         server = JSONRPCServer()
422         if hasattr(config, 'JSONRPCAddress'):
423                 if not hasattr(config, 'JSONRPCAddresses'):
424                         config.JSONRPCAddresses = []
425                 config.JSONRPCAddresses.insert(0, config.JSONRPCAddress)
426         LS = []
427         for a in config.JSONRPCAddresses:
428                 LS.append(JSONRPCListener(server, a))
429         if hasattr(config, 'SecretUser'):
430                 server.SecretUser = config.SecretUser
431         server.aux = MM.CoinbaseAux
432         server.getBlockHeader = getBlockHeader
433         server.receiveShare = receiveShare
434         server.RaiseRedFlags = RaiseRedFlags
435         
436         restoreState()
437         
438         bcnode_thr = threading.Thread(target=bcnode.serve_forever)
439         bcnode_thr.daemon = True
440         bcnode_thr.start()
441         
442         server.serve_forever()