Skip to content

Commit b44155b

Browse files
committed
check nbf/exp and add allowed_max_lifetime
1 parent 361ecaa commit b44155b

File tree

2 files changed

+80
-2
lines changed

2 files changed

+80
-2
lines changed

src/cryptojwt/jwt.py

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
"""Basic JSON Web Token implementation."""
22
import json
33
import logging
4+
import time
45
import uuid
56
from datetime import datetime
67
from datetime import timezone
@@ -95,6 +96,7 @@ def __init__(
9596
allowed_sign_algs=None,
9697
allowed_enc_algs=None,
9798
allowed_enc_encs=None,
99+
allowed_max_lifetime=None,
98100
zip="",
99101
):
100102
self.key_jar = key_jar # KeyJar instance
@@ -115,6 +117,7 @@ def __init__(
115117
self.allowed_sign_algs = allowed_sign_algs
116118
self.allowed_enc_algs = allowed_enc_algs
117119
self.allowed_enc_encs = allowed_enc_encs
120+
self.allowed_max_lifetime = allowed_max_lifetime
118121
self.zip = zip
119122

120123
def receiver_keys(self, recv, use):
@@ -304,11 +307,12 @@ def verify_profile(msg_cls, info, **kwargs):
304307
raise VerificationError()
305308
return _msg
306309

307-
def unpack(self, token):
310+
def unpack(self, token, timestamp=None):
308311
"""
309312
Unpack a received signed or signed and encrypted Json Web Token
310313
311314
:param token: The Json Web Token
315+
:param t: Time for evaluation (default now)
312316
:return: If decryption and signature verification work the payload
313317
will be returned as a Message instance if possible.
314318
"""
@@ -378,6 +382,26 @@ def unpack(self, token):
378382
except KeyError:
379383
_msg_cls = None
380384

385+
timestamp = timestamp or time.time()
386+
387+
if "nbf" in _info:
388+
nbf = int(_info["nbf"])
389+
if timestamp < nbf - self.skew:
390+
raise VerificationError("Token not yet valid")
391+
392+
if "exp" in _info:
393+
exp = int(_info["exp"])
394+
if timestamp >= exp + self.skew:
395+
raise VerificationError("Token expired")
396+
else:
397+
exp = None
398+
399+
if "iat" in _info:
400+
iat = int(_info["iat"])
401+
if self.allowed_max_lifetime and exp:
402+
if abs(exp - iat) > self.allowed_max_lifetime:
403+
raise VerificationError("Token lifetime exceeded")
404+
381405
if _msg_cls:
382406
vp_args = {"skew": self.skew}
383407
if self.iss:

tests/test_09_jwt.py

Lines changed: 55 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
from cryptojwt.exception import IssuerNotFound
66
from cryptojwt.jws.exception import NoSuitableSigningKeys
77
from cryptojwt.jwt import JWT
8+
from cryptojwt.jwt import VerificationError, utc_time_sans_frac
89
from cryptojwt.jwt import pick_key
910
from cryptojwt.key_bundle import KeyBundle
1011
from cryptojwt.key_jar import KeyJar
@@ -81,6 +82,59 @@ def test_jwt_pack_and_unpack():
8182
assert set(info.keys()) == {"iat", "iss", "sub"}
8283

8384

85+
def test_jwt_pack_and_unpack_valid():
86+
alice = JWT(key_jar=ALICE_KEY_JAR, iss=ALICE, sign_alg="RS256")
87+
t = utc_time_sans_frac()
88+
payload = {"sub": "sub", "nbf": t, "exp": t + 3600}
89+
_jwt = alice.pack(payload=payload)
90+
91+
bob = JWT(key_jar=BOB_KEY_JAR, iss=BOB, allowed_sign_algs=["RS256"])
92+
info = bob.unpack(_jwt)
93+
94+
assert set(info.keys()) == {"iat", "iss", "sub", "nbf", "exp"}
95+
96+
97+
def test_jwt_pack_and_unpack_not_yet_valid():
98+
lifetime = 3600
99+
skew = 15
100+
alice = JWT(key_jar=ALICE_KEY_JAR, iss=ALICE, sign_alg="RS256", lifetime=lifetime)
101+
timestamp = utc_time_sans_frac()
102+
payload = {"sub": "sub", "nbf": timestamp}
103+
_jwt = alice.pack(payload=payload)
104+
105+
bob = JWT(key_jar=BOB_KEY_JAR, iss=BOB, allowed_sign_algs=["RS256"], skew=skew)
106+
_ = bob.unpack(_jwt, timestamp=timestamp - skew)
107+
with pytest.raises(VerificationError):
108+
_ = bob.unpack(_jwt, timestamp=timestamp - skew - 1)
109+
110+
111+
def test_jwt_pack_and_unpack_expired():
112+
lifetime = 3600
113+
skew = 15
114+
alice = JWT(key_jar=ALICE_KEY_JAR, iss=ALICE, sign_alg="RS256", lifetime=lifetime)
115+
payload = {"sub": "sub"}
116+
_jwt = alice.pack(payload=payload)
117+
118+
bob = JWT(key_jar=BOB_KEY_JAR, iss=BOB, allowed_sign_algs=["RS256"], skew=skew)
119+
iat = bob.unpack(_jwt)["iat"]
120+
_ = bob.unpack(_jwt, timestamp=iat + lifetime + skew - 1)
121+
with pytest.raises(VerificationError):
122+
_ = bob.unpack(_jwt, timestamp=iat + lifetime + skew)
123+
124+
125+
def test_jwt_pack_and_unpack_max_lifetime_exceeded():
126+
lifetime = 3600
127+
alice = JWT(key_jar=ALICE_KEY_JAR, iss=ALICE, sign_alg="RS256", lifetime=lifetime)
128+
payload = {"sub": "sub"}
129+
_jwt = alice.pack(payload=payload)
130+
131+
bob = JWT(
132+
key_jar=BOB_KEY_JAR, iss=BOB, allowed_sign_algs=["RS256"], allowed_max_lifetime=lifetime - 1
133+
)
134+
with pytest.raises(VerificationError):
135+
_ = bob.unpack(_jwt)
136+
137+
84138
def test_jwt_pack_and_unpack_unknown_issuer():
85139
alice = JWT(key_jar=ALICE_KEY_JAR, iss=ALICE, sign_alg="RS256")
86140
payload = {"sub": "sub"}
@@ -261,4 +315,4 @@ def test_eddsa_jwt():
261315
kj = KeyJar()
262316
kj.add_kb(ISSUER, KeyBundle(JWKS_DICT))
263317
jwt = JWT(key_jar=kj)
264-
_ = jwt.unpack(JWT_TEST)
318+
_ = jwt.unpack(JWT_TEST, timestamp=1655278809)

0 commit comments

Comments
 (0)