Attempt to handle circumstances that leave us without knowing bits
[bitcoin:eloipool.git] / eloipool.py
1 #!/usr/bin/python3
2 # Eloipool - Python Bitcoin pool server
3 # Copyright (C) 2011-2012  Luke Dashjr <luke-jr+eloipool@utopios.org>
4 #
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License as
7 # published by the Free Software Foundation, either version 3 of the
8 # License, or (at your option) any later version.
9 #
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 # GNU Affero General Public License for more details.
14 #
15 # You should have received a copy of the GNU Affero General Public License
16 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18 import config
19
20
21 import logging
22
# Root logger emits everything from DEBUG upwards...
logging.basicConfig(level=logging.DEBUG)
# ...except for these named loggers, which are limited to INFO and above.
for infoOnly in ('checkShare', 'JSONRPCHandler', 'merkleMaker', 'Waker for JSONRPCServer', 'JSONRPCServer'):
        logging.getLogger(infoOnly).setLevel(logging.INFO)
26
def RaiseRedFlags(reason):
        """Report reason at CRITICAL level on the 'redflag' logger.

        Returns reason unchanged so the call can be used inline.
        """
        redflagLogger = logging.getLogger('redflag')
        redflagLogger.critical(reason)
        return reason
30
31
from bitcoin.node import BitcoinLink, BitcoinNode
# Node object for the network selected by config.UpstreamNetworkId; our own
# tag is appended to whatever user agent BitcoinNode starts with.
bcnode = BitcoinNode(config.UpstreamNetworkId)
bcnode.userAgent += b'Eloipool:0.1/'

import jsonrpc
# JSON-RPC proxy for the upstream bitcoind located at config.UpstreamURI
UpstreamBitcoindJSONRPC = jsonrpc.ServiceProxy(config.UpstreamURI)
38
39
40 from bitcoin.script import BitcoinScript
41 from bitcoin.txn import Txn
42 from base58 import b58decode
43 from struct import pack
44 import subprocess
45 from time import time
46
def makeCoinbaseTxn(coinbaseValue, useCoinbaser = True):
        """Build the coinbase transaction that pays out coinbaseValue.

        When useCoinbaser is true and config.CoinbaserCmd is set, that command is
        run through the shell with every '%d' replaced by coinbaseValue.  Its
        stdout must contain the number of outputs on the first line, followed by
        one amount line and one address line per output.  Whatever the coinbaser
        does not claim is paid to config.TrackerAddr.

        If the coinbaser raises, produces unparsable output, or claims the whole
        value (or more), all of its outputs are discarded and the full
        coinbaseValue goes to config.TrackerAddr.

        Returns the new Txn; it is not assembled here.
        """
        t = Txn.new()

        if useCoinbaser and getattr(config, 'CoinbaserCmd', None):
                logger = logging.getLogger('makeCoinbaseTxn')
                coinbased = 0
                p = None
                try:
                        cmd = config.CoinbaserCmd.replace('%d', str(coinbaseValue))
                        p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE)
                        nout = int(p.stdout.readline())
                        for i in range(nout):
                                amount = int(p.stdout.readline())
                                addr = p.stdout.readline().rstrip(b'\n').decode('utf8')
                                pkScript = BitcoinScript.toAddress(addr)
                                t.addOutput(amount, pkScript)
                                coinbased += amount
                except Exception:
                        # Record why it failed, then force the "failed" branch below
                        logger.warning('Coinbaser raised an exception', exc_info=True)
                        coinbased = coinbaseValue + 1
                finally:
                        if p is not None:
                                # Release the pipe and reap the child so neither
                                # file descriptors nor zombie processes pile up.
                                p.stdout.close()
                                p.wait()
                if coinbased >= coinbaseValue:
                        logger.error('Coinbaser failed!')
                        # Drop any outputs that were added before the failure
                        t.outputs = []
                else:
                        coinbaseValue -= coinbased

        pkScript = BitcoinScript.toAddress(config.TrackerAddr)
        t.addOutput(coinbaseValue, pkScript)

        # TODO
        # TODO: red flag on dupe coinbase
        return t
77
78
79 import jsonrpcserver
80 from util import Bits2Target
81
# Issued work, keyed by username and then merkle root:
# workLog[username][merkleRoot] = (MRD, issue time)
workLog = {}
# Target derived from the current block's bits via Bits2Target, or None while
# those bits are unknown (maintained by blockChanged)
networkTarget = None
# Share headers already seen for the current block; only the keys matter
DupeShareHACK = {}

# JSON-RPC server instance; stays None until the __main__ section creates it
server = None
def updateBlocks():
        """Wake pending longpolls, provided the JSON-RPC server exists yet."""
        if not server:
                return
        server.wakeLongpoll()
90
def blockChanged():
        """Reset per-block state after the current block has changed.

        Clears both duplicate-share tables and the work log, recomputes
        networkTarget from the new block's bits (None when the bits are not
        known), and finally wakes longpolls via updateBlocks().
        """
        global DupeShareHACK, networkTarget
        # Duplicate tracking only makes sense within a single block
        DupeShareHACK = {}
        jsonrpcserver._CheckForDupesHACK = {}
        currentBits = MM.currentBlock[2]
        networkTarget = None if currentBits is None else Bits2Target(currentBits)
        workLog.clear()
        updateBlocks()
103
104
from merklemaker import merkleMaker
MM = merkleMaker()
# Copy every attribute of the config module onto the merkle maker instance
MM.__dict__.update(config.__dict__)
# Coinbase built without running the coinbaser, using a hard-coded value of
# 5000000000 (presumably satoshis, i.e. 50 BTC -- see the FIXME)
MM.clearCoinbaseTxn = makeCoinbaseTxn(5000000000, False)  # FIXME
MM.clearCoinbaseTxn.assemble()
MM.makeCoinbaseTxn = makeCoinbaseTxn
# Hooks the merkle maker is given for block changes and block updates
MM.onBlockChange = blockChanged
MM.onBlockUpdate = updateBlocks
MM.start()
114
115
116 from binascii import b2a_hex
117 from copy import deepcopy
118 from struct import pack, unpack
119 from time import time
120 from util import RejectedShare, dblsha, hash2int
121 import jsonrpc
122 import threading
123 import traceback
124
# Optional "gotwork" JSON-RPC proxy; stays None unless config.GotWorkURI exists
gotwork = None
if hasattr(config, 'GotWorkURI'):
        gotwork = jsonrpc.ServiceProxy(config.GotWorkURI)
128
def submitGotwork(info):
        """Send info to the gotwork service.

        Best effort: any ordinary failure is logged as a warning on the
        checkShare logger (with traceback) and never propagates to the caller.
        """
        try:
                gotwork.gotwork(info)
        except Exception:
                # Exception rather than a bare except, so SystemExit and
                # KeyboardInterrupt are not swallowed here.
                checkShare.logger.warning('Failed to submit gotwork\n' + traceback.format_exc())
134
# Optional PostgreSQL connection used by logShare; stays None (share logging
# disabled) unless config.DbOptions exists. psycopg2 is only imported then.
db = None
if hasattr(config, 'DbOptions'):
        import psycopg2
        db = psycopg2.connect(**config.DbOptions)
139
def getBlockHeader(username):
        """Issue a fresh block header to username and remember it in workLog.

        The header is the fixed version bytes, previous block hash, merkle root,
        current time packed as little-endian 32-bit, bits, and the b'iolE'
        filler in the last four bytes.  The merkle-root data and issue time are
        stored under workLog[username][merkleRoot] so checkShare can find them.
        """
        MRD = MM.getMRD()
        (merkleRoot, merkleTree, coinbase, prevBlock, bits, rollPrevBlk) = MRD
        ntime = pack('<L', int(time()))
        header = b'\1\0\0\0' + prevBlock + merkleRoot + ntime + bits + b'iolE'
        userWork = workLog.setdefault(username, {})
        userWork[merkleRoot] = (MRD, time())
        return header
147
def YN(b):
        """Map a tri-state flag to 'Y', 'N' or None.

        None passes through unchanged; any other value is judged by truthiness.
        """
        if b is None:
                return None
        if b:
                return 'Y'
        return 'N'
152
def logShare(share):
        """Insert one row describing share into the shares table.

        Does nothing when no database is configured (db is None).  Reads
        'username' and '_origdata' (both required) plus the optional
        'remoteHost', 'rejectReason' and 'upstreamResult' keys.  our_result is
        'Y' when there is no reject reason, 'N' otherwise.

        Database errors still propagate to the caller, but the transaction is
        rolled back first so that the shared connection stays usable for later
        shares, and the cursor is always closed.
        """
        if db is None:
                return
        rem_host = share.get('remoteHost', '?')
        username = share['username']
        reason = share.get('rejectReason', None)
        upstreamResult = share.get('upstreamResult', None)
        # NOTE(review): decode(%s, 'hex') implies this is already hex text -- confirm
        solution = share['_origdata']
        stmt = "insert into shares (rem_host, username, our_result, upstream_result, reason, solution) values (%s, %s, %s, %s, %s, decode(%s, 'hex'))"
        params = (rem_host, username, YN(not reason), YN(upstreamResult), reason, solution)
        dbc = db.cursor()
        try:
                dbc.execute(stmt, params)
                db.commit()
        except Exception:
                # Without a rollback the connection would stay in an aborted
                # transaction and every later insert would fail as well.
                db.rollback()
                raise
        finally:
                dbc.close()
167
# Record of every block-solving share handled by checkShare:
# RBDs holds deep copies of (header, txlist), RBPs the assembled payloads.
RBDs = []
RBPs = []
170
171 from bitcoin.varlen import varlenEncode
def assembleBlock(blkhdr, txlist):
        """Serialise a block: header, encoded transaction count, then each tx.

        blkhdr is the raw header; every element of txlist must expose its
        serialised form as .data.  The pieces are joined in one pass instead of
        growing the payload with repeated +=, which copies the whole buffer for
        every transaction.
        """
        parts = [blkhdr, varlenEncode(len(txlist))]
        parts.extend(tx.data for tx in txlist)
        return b''.join(parts)
178
def blockSubmissionThread(payload):
        """Keep submitting payload to the upstream bitcoind until a call succeeds.

        payload is the raw block; it is hex-encoded once and passed to the
        upstream getmemorypool RPC.  The return value of that call is ignored.
        Each failed attempt is logged with its traceback and followed by a short,
        growing pause (10 ms doubling up to 1 s) so that an unreachable bitcoind
        is neither hammered in a tight loop nor retried silently.
        """
        from time import sleep
        logger = logging.getLogger('blockSubmissionThread')
        payloadHex = b2a_hex(payload).decode('ascii')
        delay = 0.01
        while True:
                try:
                        UpstreamBitcoindJSONRPC.getmemorypool(payloadHex)
                        break
                except Exception:
                        logger.warning('Upstream block submission failed; retrying in %g seconds', delay, exc_info=True)
                        sleep(delay)
                        delay = min(delay * 2, 1.0)
186
def checkShare(share):
        """Validate a submitted share; raise RejectedShare if it is unacceptable.

        share is a dict that must hold 'data' (only its first 80 bytes, the
        block header, are examined) and 'username'.  Along the way share['MRD']
        and share['time'] are filled in, and share['upstreamResult'] is set to
        True when the hash also meets networkTarget and the block has been handed
        off for upstream submission.

        Accepted shares are recorded through logShare(); rejected ones are left
        for the caller to log.

        Raises:
                RejectedShare: with reason 'stale-prevblk', 'bad-prevblk',
                        'unknown-user', 'bad-diffbits', 'bad-version',
                        'unknown-work', 'duplicate', 'H-not-zero', 'stale-work',
                        'time-too-old' or 'time-too-new'.
        """
        data = share['data']
        data = data[:80]
        (prevBlock, height, bits) = MM.currentBlock
        # Header layout matches getBlockHeader: version [0:4], previous block
        # [4:36], merkle root [36:68], timestamp [68:72], bits [72:76].
        sharePrevBlock = data[4:36]
        if sharePrevBlock != prevBlock:
                if sharePrevBlock == MM.lastBlock[0]:
                        raise RejectedShare('stale-prevblk')
                raise RejectedShare('bad-prevblk')

        shareMerkleRoot = data[36:68]
        # TODO: use userid
        username = share['username']
        if username not in workLog:
                raise RejectedShare('unknown-user')

        if data[72:76] != bits:
                raise RejectedShare('bad-diffbits')
        if data[:4] != b'\1\0\0\0':
                raise RejectedShare('bad-version')

        MWL = workLog[username]
        if shareMerkleRoot not in MWL:
                raise RejectedShare('unknown-work')
        (MRD, issueT) = MWL[shareMerkleRoot]
        share['MRD'] = MRD

        # The header is registered as seen before the remaining checks, so a
        # resubmission is reported as 'duplicate' even if this one is rejected.
        if data in DupeShareHACK:
                raise RejectedShare('duplicate')
        DupeShareHACK[data] = None

        shareTime = share['time'] = time()

        blkhash = dblsha(data)
        # Minimum share difficulty: the last four bytes of the hash must be zero
        if blkhash[28:] != b'\0\0\0\0':
                raise RejectedShare('H-not-zero')
        blkhashn = hash2int(blkhash)

        # Block-solving shares are logged at INFO, ordinary ones at DEBUG
        logfunc = getattr(checkShare.logger, 'info' if blkhashn <= networkTarget else 'debug')
        logfunc('BLKHASH: %64x' % (blkhashn,))
        logfunc(' TARGET: %64x' % (networkTarget,))

        txlist = MRD[1].data
        t = txlist[0]
        t.setCoinbase(MRD[2])
        t.assemble()

        # A share that solves a block is submitted before the staleness and
        # timestamp checks further down, so those checks cannot hold it back.
        if blkhashn <= networkTarget:
                logfunc("Submitting upstream")
                RBDs.append( deepcopy( (data, txlist) ) )
                payload = assembleBlock(data, txlist)
                logfunc('Real block payload: %s' % (payload,))
                RBPs.append(payload)
                threading.Thread(target=blockSubmissionThread, args=(payload,)).start()
                bcnode.submitBlock(payload)
                share['upstreamResult'] = True
                MM.updateBlock(blkhash)

        # Gotwork hack...
        if gotwork and blkhashn <= config.GotWorkTarget:
                try:
                        # coinbase txn, block hash, merkle step count, the steps
                        # themselves, then four zero bytes
                        coinbaseMrkl = t.data
                        coinbaseMrkl += blkhash
                        steps = MRD[1]._steps
                        coinbaseMrkl += pack('B', len(steps))
                        for step in steps:
                                coinbaseMrkl += step
                        coinbaseMrkl += b"\0\0\0\0"
                        info = {}
                        info['hash'] = b2a_hex(blkhash).decode('ascii')
                        info['header'] = b2a_hex(data).decode('ascii')
                        info['coinbaseMrkl'] = b2a_hex(coinbaseMrkl).decode('ascii')
                        thr = threading.Thread(target=submitGotwork, args=(info,))
                        thr.daemon = True
                        thr.start()
                except Exception:
                        # Still best effort, but keep the cause in the log
                        checkShare.logger.warning('Failed to build gotwork request', exc_info=True)

        shareTimestamp = unpack('<L', data[68:72])[0]
        if shareTime < issueT - 120:
                raise RejectedShare('stale-work')
        if shareTimestamp < shareTime - 300:
                raise RejectedShare('time-too-old')
        if shareTimestamp > shareTime + 7200:
                raise RejectedShare('time-too-new')

        logShare(share)
checkShare.logger = logging.getLogger('checkShare')
276
def receiveShare(share):
        """Run checkShare on share, logging a rejection before re-raising it.

        On RejectedShare the reason text is stored in share['rejectReason'], the
        share is written out through logShare(), and the exception propagates.
        """
        # TODO: username => userid
        try:
                checkShare(share)
        except RejectedShare as rejection:
                share['rejectReason'] = str(rejection)
                logShare(share)
                raise
        # TODO
286
def newBlockNotification(signum, frame):
        """Signal handler: log the notification and refresh the merkle tree.

        signum and frame are the standard signal-handler arguments; neither is
        used.
        """
        notifyLogger = logging.getLogger('newBlockNotification')
        notifyLogger.info('Received new block notification')
        MM.updateMerkleTree()
        # TODO: Force RESPOND TO LONGPOLLS?
292
# Run newBlockNotification whenever this process receives SIGUSR1.
# NOTE: the name `signal` is the function here; the later `import signal`
# rebinds it to the module, which is what exit() relies on.
from signal import signal, SIGUSR1
signal(SIGUSR1, newBlockNotification)
295
296
297 import os
298 import os.path
299 import pickle
300 import signal
301 import sys
302 from time import sleep
303 import traceback
304
# Relative path of the pickle file written by saveState and read by restoreState
SAVE_STATE_FILENAME = 'eloipool.worklog'
306
def stopServers():
        """Ask bcnode and the JSON-RPC server to stop, then close their sockets.

        Both servers get keepgoing cleared and are woken up.  Their running flag
        is then polled every 10 ms; after 0x100 polls with something still
        running an error is logged and the wait is abandoned.  Finally every
        file descriptor listed in each server's _fd mapping is closed.
        """
        logger = logging.getLogger('stopServers')

        logger.info('Stopping servers...')
        servers = (bcnode, server)
        for srv in servers:
                srv.keepgoing = False
        for srv in servers:
                srv.wakeup()

        for attempt in range(1, 0x100 + 1):
                stillRunning = [srv.__class__.__name__ for srv in servers if srv.running]
                if not stillRunning:
                        break
                if attempt >= 0x100:
                        logger.error('Servers taking too long to stop (%s), giving up' % (', '.join(stillRunning)))
                        break
                sleep(0.01)

        for srv in servers:
                for fd in srv._fd.keys():
                        os.close(fd)
334
def saveState(t = None):
        """Pickle t, DupeShareHACK and workLog (in that order) to SAVE_STATE_FILENAME.

        t is whatever timestamp the caller wants stored; restoreState uses it to
        report the downtime.  The write is attempted up to 0x10000 times
        (presumably because other threads may still be mutating the dicts while
        they are pickled -- confirm).  If every attempt fails, the last error is
        logged, the possibly partial file is removed so it cannot be mistaken
        for valid state, and the function returns instead of retrying forever.
        """
        logger = logging.getLogger('saveState')

        # Then, save data needed to resume work
        logger.info('Saving work state to \'%s\'...' % (SAVE_STATE_FILENAME,))
        for attempt in range(0x10000):
                try:
                        with open(SAVE_STATE_FILENAME, 'wb') as f:
                                pickle.dump(t, f)
                                pickle.dump(DupeShareHACK, f)
                                pickle.dump(workLog, f)
                        return
                except Exception:
                        failure = traceback.format_exc()

        # Out of attempts: report, clean up, and give up
        logger.error('Failed to save work\n' + failure)
        try:
                os.unlink(SAVE_STATE_FILENAME)
        except OSError:
                logger.error(('Failed to unlink \'%s\'; resume may have trouble\n' % (SAVE_STATE_FILENAME,)) + traceback.format_exc())
356
def exit():
        """Stop the servers, save state, then terminate this process.

        The shutdown time is captured before the servers are stopped and is
        stored with the saved state.  The process sends itself SIGTERM and then
        calls sys.exit(0).
        """
        shutdownTime = time()
        stopServers()
        saveState(shutdownTime)
        exitLogger = logging.getLogger('exit')
        exitLogger.info('Goodbye...')
        ownPid = os.getpid()
        os.kill(ownPid, signal.SIGTERM)
        sys.exit(0)
364
def restart():
        """Stop the servers, save state, then re-exec this program in place.

        The program is re-executed with sys.argv[0] as the path and the full
        sys.argv as its arguments.  If the exec fails the error is logged and
        the function returns with the servers already stopped.
        """
        restartLogger = logging.getLogger('restart')
        shutdownTime = time()
        stopServers()
        saveState(shutdownTime)
        restartLogger.info('Restarting...')
        try:
                os.execv(sys.argv[0], sys.argv)
        except:
                restartLogger.error('Failed to exec\n' + traceback.format_exc())
374
def restoreState():
        """Reload workLog and DupeShareHACK from SAVE_STATE_FILENAME, if present.

        Three file layouts are understood, told apart by the first pickled
        object:
          * a tuple: (workLog, DupeShareHACK) in a single object;
          * a dict: DupeShareHACK, followed by a pickled workLog;
          * anything else: the save time, followed by DupeShareHACK, followed
            by workLog.
        For the last two layouts workLog is only loaded when the file was
        modified within the last 120 seconds.  Any failure is logged and ends
        the restore.  When a save time was present, the downtime is logged.

        NOTE(review): pickle.load executes whatever the file describes, so the
        state file must only ever be writable by the pool itself.
        """
        global workLog, DupeShareHACK

        if not os.path.exists(SAVE_STATE_FILENAME):
                return

        logger = logging.getLogger('restoreState')
        s = os.stat(SAVE_STATE_FILENAME)
        logger.info('Restoring saved state from \'%s\' (%d bytes)' % (SAVE_STATE_FILENAME, s.st_size))
        savedAt = None
        try:
                with open(SAVE_STATE_FILENAME, 'rb') as f:
                        first = pickle.load(f)
                        if type(first) is tuple:
                                workLog = first[0]
                                DupeShareHACK = first[1]
                        else:
                                if isinstance(first, dict):
                                        DupeShareHACK = first
                                else:
                                        savedAt = first
                                        DupeShareHACK = pickle.load(f)

                                # Work older than two minutes is not worth restoring
                                if s.st_mtime + 120 >= time():
                                        workLog = pickle.load(f)
                                else:
                                        logger.debug('Skipping restore of expired workLog')
        except:
                logger.error('Failed to restore state\n' + traceback.format_exc())
                return
        logger.info('State restored successfully')
        if savedAt:
                logger.info('Total downtime: %g seconds' % (time() - savedAt,))
408
409
410 from jsonrpcserver import JSONRPCListener, JSONRPCServer
411 import interactivemode
412 from networkserver import NetworkListener
413 import threading
414
if __name__ == "__main__":
        # One listener on bcnode per configured address (none by default)
        LSbc = []
        if not hasattr(config, 'BitcoinNodeAddresses'):
                config.BitcoinNodeAddresses = ()
        for a in config.BitcoinNodeAddresses:
                LSbc.append(NetworkListener(bcnode, a))

        # Optional link from bcnode to the configured upstream node
        if hasattr(config, 'UpstreamBitcoindNode') and config.UpstreamBitcoindNode:
                BitcoinLink(bcnode, dest=config.UpstreamBitcoindNode)

        server = JSONRPCServer()
        # Default the address list even when the legacy single-address option is
        # absent, so a config with neither option no longer dies with an
        # AttributeError (this mirrors the BitcoinNodeAddresses default above).
        if not hasattr(config, 'JSONRPCAddresses'):
                config.JSONRPCAddresses = []
        if hasattr(config, 'JSONRPCAddress'):
                # The legacy single address goes first
                config.JSONRPCAddresses.insert(0, config.JSONRPCAddress)
        if not config.JSONRPCAddresses:
                logging.getLogger('main').warning('No JSONRPCAddresses configured; the JSON-RPC server will not listen anywhere')
        LS = []
        for a in config.JSONRPCAddresses:
                LS.append(JSONRPCListener(server, a))
        if hasattr(config, 'SecretUser'):
                server.SecretUser = config.SecretUser
        # Hand the server the hooks it calls back into this module with
        server.aux = MM.CoinbaseAux
        server.getBlockHeader = getBlockHeader
        server.receiveShare = receiveShare
        server.RaiseRedFlags = RaiseRedFlags

        restoreState()

        # bcnode runs in a daemon thread; the JSON-RPC server owns the main thread
        bcnode_thr = threading.Thread(target=bcnode.serve_forever)
        bcnode_thr.daemon = True
        bcnode_thr.start()

        server.serve_forever()