-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathreplica.py
More file actions
executable file
·307 lines (273 loc) · 11.8 KB
/
replica.py
File metadata and controls
executable file
·307 lines (273 loc) · 11.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
#
# tinyssb/replica.py -- inject and ingest tinySSB content
# 2023-07-08 <christian.tschudin@unibas.ch>
import hashlib
import os
import traceback
from . import bipf
# Packet type codes. The Replica class in this file reads the type from
# byte 7 of a 120-byte entry packet (pkt[7]).
PKTTYPE_plain48 = 0x00 # ed25519 signature, single packet with 48B
PKTTYPE_chain20 = 0x01 # ed25519 signature, start of hash sidechain
# The string literal below is evaluated and discarded at import time; it
# only parks packet type codes that this module does not define.
'''
PKTTYPE_ischild = 0x02 # metafeed information, only in genesis block
PKTTYPE_iscontn = 0x03 # metafeed information, only in genesis block
PKTTYPE_mkchild = 0x04 # metafeed information
PKTTYPE_contdas = 0x05 # metafeed information
PKTTYPE_acknldg = 0x06 # proof of having seen some fid:seq:sig entry
# see end of this document for the payload content of types 0x02-0x05
'''
# Constant prefix of the packet "name": Replica hashes
# PFX + feed id + 4-byte sequence number + previous hash to derive the DMX.
PFX = b'tinyssb-v0'
class Replica:
    """File-backed replica of a single tinySSB feed.

    Files live in ``<datapath>/<fid hex>/``:

    * ``log.bin`` -- the concatenated log entries.  One entry consists of
      the 120-byte entry packet, then 120 bytes per sidechain chunk
      (zero-filled until the chunk has been stored), then a 4-byte
      big-endian offset pointing at the start of this same entry.  The
      trailing offsets let readers walk the log backwards from its end.
    * ``frontier.bin`` -- ``self.state`` serialized with ``bipf.dumps``.
      It is rewritten via ``frontier.tmp`` and ``os.replace``.

    Entry packet layout as used by this class: bytes 0-6 DMX, byte 7 packet
    type, bytes 8-55 payload (48 bytes), bytes 56-119 signature.

    ``self.state`` keys:

    * ``max_seq`` -- sequence number of the newest entry (0 if none).
    * ``max_pos`` -- log offset just past the newest entry.
    * ``prev``    -- 20-byte hash linking to the newest entry
      (``fid[:20]`` for an empty log).
    * ``pend_sc`` -- incomplete sidechains,
      ``{seq: [cnr, remaining, hptr, pos_to_write]}``.
    """

    _PKT_LEN = 120    # size of an entry packet and of a chunk packet
    _CHUNK_LEN = 100  # content bytes carried by one chunk packet
    _PTR_LEN = 20     # size of a truncated sha256 hash pointer

    def __init__(self, datapath, fid, verify_fct, is_author=False):
        """Open (or create) the replica for feed ``fid`` below ``datapath``.

        Args:
            datapath: directory that holds one sub-directory per feed.
            fid: feed id as bytes; its hex form names the sub-directory.
            verify_fct: callable ``verify_fct(fid, signature, signed_bytes)``
                whose truthy result means the signature is valid.
            is_author: when true, log bytes found beyond the persisted
                frontier are always truncated instead of being adopted.
        """
        self.path = datapath + '/' + fid.hex() + '/'
        self.log_fname = self.path + 'log.bin'
        self.fnt_fname = self.path + 'frontier.bin'
        self.tmp_fname = self.path + 'frontier.tmp'
        self.fid = fid
        self.verify_fct = verify_fct
        self.is_author = is_author

        if not os.path.isdir(self.path):
            os.mkdir(self.path)
        if not os.path.isfile(self.log_fname):
            # brand-new feed: empty log plus an initial frontier
            open(self.log_fname, 'wb').close()
            self.state = {'pend_sc': {}}
            self._persist_frontier(0, 0, fid[:20])

        # keep frontier in memory
        with open(self.fnt_fname, 'rb') as f:
            self.state = bipf.loads(f.read())

        # Recovery: the log is longer than the frontier says, i.e. the
        # frontier was not updated after the last log write.
        while os.path.getsize(self.log_fname) > self.state['max_pos']:
            with open(self.log_fname, 'r+b') as f:
                pos = self.state['max_pos']
                f.seek(pos, os.SEEK_SET)
                pkt = f.read(self._PKT_LEN)
                seq, nam, dmx = self._next_name()
                # A short read is a torn write and cannot be a valid entry.
                if (self.is_author or len(pkt) < self._PKT_LEN
                        or dmx != pkt[:7]):
                    print('truncating log file')
                    f.seek(pos, os.SEEK_SET)
                    f.truncate()
                    break
                chunk_cnt, ptr = self._sidechain_info(pkt)
                if chunk_cnt > 0:
                    self.state['pend_sc'][seq] = [0, chunk_cnt, ptr,
                                                  pos + self._PKT_LEN]
                    # allocate sidechain space in the file
                    f.write(bytes(self._PKT_LEN * chunk_cnt))
                f.write(pos.to_bytes(4, 'big'))
                pos = f.tell()
            self._persist_frontier(seq, pos,
                                   hashlib.sha256(nam + pkt).digest()[:20])

    # ----------------------------------------------------------------------
    # private helpers:

    def _save_state(self):
        """Write ``self.state`` to the frontier file via a temporary file."""
        with open(self.tmp_fname, 'wb') as f:
            f.write(bipf.dumps(self.state))
        os.replace(self.tmp_fname, self.fnt_fname)

    def _persist_frontier(self, seq, pos, prev):
        """Record a new frontier (seq, log offset, prev hash) and save it."""
        self.state['max_seq'] = seq
        self.state['max_pos'] = pos
        self.state['prev'] = prev
        self._save_state()

    def _next_name(self):
        """Return ``(seq, nam, dmx)`` for the entry following the frontier."""
        seq = self.state['max_seq'] + 1
        nam = PFX + self.fid + seq.to_bytes(4, 'big') + self.state['prev']
        return seq, nam, hashlib.sha256(nam).digest()[:7]

    def _sidechain_info(self, pkt):
        """Return ``(chunk_cnt, first_hptr)`` for an entry packet.

        Packets that are not of type chain20 yield ``(0, None)``.
        """
        if pkt[7] != PKTTYPE_chain20:
            return 0, None
        content_len, sz = bipf.varint_decode(pkt, 8)
        # bytes that did not fit next to the length field and hash pointer
        content_len -= 48 - self._PTR_LEN - sz
        chunk_cnt = (content_len + self._CHUNK_LEN - 1) // self._CHUNK_LEN
        return chunk_cnt, pkt[36:56]

    def _locate_entry(self, f, seq):
        """Walk the back pointers in open log ``f`` to entry ``seq``.

        Returns ``(start, end)``: the offset of the entry packet and the
        offset just past the entry's trailing 4-byte back pointer.
        """
        start = end = os.path.getsize(self.log_fname)
        for _ in range(self.state['max_seq'] - seq + 1):
            end = start
            f.seek(end - 4, os.SEEK_SET)
            start = int.from_bytes(f.read(4), byteorder='big')
        return start, end

    # ----------------------------------------------------------------------
    # public methods:

    def ingest_entry_pkt(self, pkt, seq):  # True/False
        """Validate entry packet ``pkt`` for ``seq`` and append it to the log.

        The packet must be the direct successor of the frontier, carry the
        expected DMX and pass ``verify_fct``.  Space for any sidechain
        chunks is reserved zero-filled.  Returns True when stored.
        """
        assert len(pkt) == self._PKT_LEN
        if seq != self.state['max_seq'] + 1:
            print("   R: wrong seq nr", seq, self.state['max_seq'] + 1)
            return False
        _, nam, dmx = self._next_name()
        if dmx != pkt[:7]:
            print("   R: wrong dmx", pkt[:7].hex(), dmx.hex())
            return False
        if not self.verify_fct(self.fid, pkt[56:], nam + pkt[:56]):
            print("   R: signature verify failed")
            return False
        chunk_cnt, ptr = self._sidechain_info(pkt)
        start = self.state['max_pos']
        log_entry = (pkt + bytes(chunk_cnt * self._PKT_LEN)
                     + start.to_bytes(4, 'big'))
        with open(self.log_fname, 'ab') as f:
            f.write(log_entry)
        if chunk_cnt > 0:
            self.state['pend_sc'][seq] = [0, chunk_cnt, ptr,
                                          start + self._PKT_LEN]
        self._persist_frontier(seq, start + len(log_entry),
                               hashlib.sha256(nam + pkt).digest()[:20])
        return True

    def ingest_chunk_pkt(self, pkt, seq):  # True/False
        """Store the next expected sidechain chunk of entry ``seq``.

        Returns False when no chain is pending for ``seq``, when the chunk
        does not hash to the expected pointer, or when the log cannot be
        written; True once the chunk is stored and the state is saved.
        """
        assert len(pkt) == self._PKT_LEN
        pend = self.state['pend_sc'].get(seq)  # [cnr, rem, hptr, pos]
        if pend is None:
            return False
        # Explicit comparison: an assert would be stripped under -O and
        # let unverified chunks into the log.
        if pend[2] != hashlib.sha256(pkt).digest()[:self._PTR_LEN]:
            return False
        try:
            with open(self.log_fname, 'r+b') as f:
                f.seek(pend[3], os.SEEK_SET)
                f.write(pkt)
                pos = f.tell()
        except (OSError, ValueError):
            return False
        if pend[1] <= 1:  # chain is complete
            del self.state['pend_sc'][seq]
        else:
            pend[0] += 1
            pend[1] -= 1
            pend[2] = pkt[-self._PTR_LEN:]  # pointer to the following chunk
            pend[3] = pos
        self._save_state()
        return True

    def get_next_seq(self):  # (next_seq, dmx)
        """Return the next expected sequence number and its DMX."""
        seq, _, dmx = self._next_name()
        return (seq, dmx)

    def get_open_chains(self, cursor=0):  # {seq:[cnr,rem,hptr,pos]}
        """Return the dict of incomplete sidechains (``cursor`` is unused)."""
        return self.state['pend_sc']

    def get_entry_pkt(self, seq):
        """Return the 120-byte entry packet ``seq``, or None if unavailable."""
        if not 1 <= seq <= self.state['max_seq']:
            return None
        try:
            with open(self.log_fname, 'rb') as f:
                start, _ = self._locate_entry(f, seq)
                f.seek(start, os.SEEK_SET)
                return f.read(self._PKT_LEN)
        except (OSError, ValueError):
            return None

    def get_content_len(self, seq):
        """Return ``(available, total)`` content length of entry ``seq``.

        plain48 entries yield ``(48, 48)``.  chain20 entries yield the
        declared length twice when complete; while chunks are pending,
        ``available`` counts the root bytes plus 100 per stored chunk.
        Returns None for a missing entry or an unknown packet type.
        """
        pkt = self.get_entry_pkt(seq)
        if pkt is None:
            return None
        if pkt[7] == PKTTYPE_plain48:
            return (48, 48)
        if pkt[7] == PKTTYPE_chain20:
            content_len, sz = bipf.varint_decode(pkt, 8)
            if seq not in self.state['pend_sc']:
                return (content_len, content_len)
            stored = self.state['pend_sc'][seq][0]
            available = (48 - self._PTR_LEN - sz) + self._CHUNK_LEN * stored
            return (available, content_len)
        return None

    def get_chunk_pkt(self, seq, cnr):
        """Return chunk number ``cnr`` (0-based) of entry ``seq``, or None.

        None is returned for an unknown entry, for a chunk that has not
        been stored yet, or for a chunk number beyond the entry's space.
        """
        if not 1 <= seq <= self.state['max_seq']:
            return None
        pend = self.state['pend_sc'].get(seq)
        if pend is not None and cnr >= pend[0]:
            return None
        try:
            with open(self.log_fname, 'rb') as f:
                start, end = self._locate_entry(f, seq)
                pos = start + self._PKT_LEN * (cnr + 1)
                if pos > end - self._PKT_LEN:  # would run into back pointer
                    return None
                f.seek(pos, os.SEEK_SET)
                return f.read(self._PKT_LEN)
        except (OSError, ValueError):
            return None

    def read(self, seq):  # , offs=0, lim=0):
        """Return the content bytes of entry ``seq``.

        Returns None for an unknown sequence number or packet type.  The
        pending-chain table is not consulted here, so chunks that were
        reserved but not yet stored are read back as stored on disk.
        """
        if self.state['max_seq'] < seq or seq < 1:
            return None
        with open(self.log_fname, 'rb') as f:
            start, _ = self._locate_entry(f, seq)
            f.seek(start, os.SEEK_SET)
            pkt = f.read(self._PKT_LEN)
            if pkt[7] == PKTTYPE_plain48:
                return pkt[8:56]
            if pkt[7] != PKTTYPE_chain20:
                return None
            chain_len, sz = bipf.varint_decode(pkt[8:])
            content = pkt[8 + sz:36]
            blocks = ((chain_len - len(content) + self._CHUNK_LEN - 1)
                      // self._CHUNK_LEN)
            # chunks are stored back to back right after the entry packet
            for _ in range(blocks):
                content += f.read(self._PKT_LEN)[:self._CHUNK_LEN]
        return content[:chain_len]

    # ----------------------------------------------------------------------
    # the following is not needed for mere forwarding repos (pubs)

    def write48(self, content, sign_fct):  # publish event, returns seq
        """Publish ``content`` as a single plain48 entry and return its seq.

        Content is zero-padded or truncated to exactly 48 bytes.
        ``sign_fct(msg)`` must return the signature bytes that complete
        the packet to 120 bytes.
        """
        start = self.state['max_pos']
        assert os.path.getsize(self.log_fname) == start
        content = content[:48]
        content += bytes(48 - len(content))  # pad short content with zeros
        seq, nam, dmx = self._next_name()
        msg = dmx + bytes([PKTTYPE_plain48]) + content
        wire = msg + sign_fct(nam + msg)
        assert len(wire) == self._PKT_LEN
        assert self.verify_fct(self.fid, wire[56:], nam + wire[:56])
        log_entry = wire + start.to_bytes(4, 'big')
        with open(self.log_fname, 'ab') as f:
            f.write(log_entry)
        # the new frontier is relative to the previous end of the log
        self._persist_frontier(seq, start + len(log_entry),
                               hashlib.sha256(nam + wire).digest()[:20])
        return seq

    def write(self, content, sign_fct):  # publish event, returns seq
        """Publish ``content`` of any length as a chain20 entry.

        The root packet carries the varint-encoded length, the first
        content bytes and a hash pointer to the first chunk.  The rest
        goes into 120-byte chunks (100 content bytes plus a 20-byte
        pointer to the next chunk; all-zero marks the end).  Returns the
        new sequence number.
        """
        start = self.state['max_pos']
        assert os.path.getsize(self.log_fname) == start
        sz = bipf.varint_encode_to_bytes(len(content))
        root_len = 48 - self._PTR_LEN - len(sz)
        root_content = content[:root_len]  # the first bytes fit on root
        # keep the hash pointer at payload bytes 28-47 for short content
        root_content += bytes(root_len - len(root_content))

        # pad so all chunk content is size 100
        rest = content[root_len:]
        tail = len(rest) % self._CHUNK_LEN
        if tail > 0:
            rest += bytes(self._CHUNK_LEN - tail)

        # build the hash chain of chunks from the end to the start
        ptr = bytes(self._PTR_LEN)  # end of chain: all-zero "hash"
        chunks = []
        for off in range(len(rest) - self._CHUNK_LEN, -1, -self._CHUNK_LEN):
            buf = rest[off:off + self._CHUNK_LEN] + ptr
            chunks.append(buf)
            ptr = hashlib.sha256(buf).digest()[:self._PTR_LEN]
        chunks.reverse()

        # prep the root message
        seq, nam, dmx = self._next_name()
        msg = dmx + bytes([PKTTYPE_chain20]) + sz + root_content + ptr
        wire = msg + sign_fct(nam + msg)
        assert len(wire) == self._PKT_LEN
        assert self.verify_fct(self.fid, wire[56:], nam + wire[:56])

        # write to log: root packet, chunks, back pointer
        log_entry = b''.join([wire] + chunks) + start.to_bytes(4, 'big')
        with open(self.log_fname, 'ab') as f:
            f.write(log_entry)
        self._persist_frontier(seq, start + len(log_entry),
                               hashlib.sha256(nam + wire).digest()[:20])
        return seq

    # ----------------------------------------------------------------------
# eof