1
1
import json
2
2
import logging
3
3
import os
4
- import re
5
4
import shutil
6
5
from abc import abstractmethod
7
6
from collections .abc import Iterable , Iterator
10
9
from tempfile import NamedTemporaryFile
11
10
from time import time
12
11
from typing import TYPE_CHECKING , Any , Optional
12
+ from urllib .parse import urlparse
13
13
14
14
import aiohttp
15
15
from aiohttp_retry import ExponentialRetry , RetryClient
20
20
21
21
from scmrepo .git .backend .dulwich import _get_ssh_vendor
22
22
from scmrepo .git .credentials import Credential , CredentialNotFoundError
23
+ from scmrepo .urls import SCP_REGEX , is_scp_style_url
23
24
24
25
from .exceptions import LFSError
25
26
from .pointer import Pointer
@@ -84,7 +85,7 @@ def loop(self):
84
85
85
86
@classmethod
86
87
def from_git_url (cls , git_url : str ) -> "LFSClient" :
87
- if git_url .startswith (( "ssh://" , "git@" ) ):
88
+ if git_url .startswith ("ssh://" ) or is_scp_style_url ( git_url ):
88
89
return _SSHLFSClient .from_git_url (git_url )
89
90
if git_url .startswith (("http://" , "https://" )):
90
91
return _HTTPLFSClient .from_git_url (git_url )
@@ -213,11 +214,9 @@ def _get_auth_header(self, *, upload: bool) -> dict:
213
214
214
215
215
216
class _SSHLFSClient (LFSClient ):
216
- _URL_PATTERN = re .compile (
217
- r"(?:ssh://)?git@(?P<host>\S+?)(?::(?P<port>\d+))?(?:[:/])(?P<path>\S+?)\.git"
218
- )
219
-
220
- def __init__ (self , url : str , host : str , port : int , path : str ):
217
+ def __init__ (
218
+ self , url : str , host : str , port : int , username : Optional [str ], path : str
219
+ ):
221
220
"""
222
221
Args:
223
222
url: LFS server URL.
@@ -228,33 +227,50 @@ def __init__(self, url: str, host: str, port: int, path: str):
228
227
super ().__init__ (url )
229
228
self .host = host
230
229
self .port = port
230
+ self .username = username
231
231
self .path = path
232
232
self ._ssh = _get_ssh_vendor ()
233
233
234
234
@classmethod
235
235
def from_git_url (cls , git_url : str ) -> "_SSHLFSClient" :
236
- result = cls ._URL_PATTERN .match (git_url )
237
- if not result :
236
+ if scp_match := SCP_REGEX .match (git_url ):
237
+ # Add an ssh:// prefix and replace the ':' with a '/'.
238
+ git_url = scp_match .expand (r"ssh://\1\2/\3" )
239
+
240
+ parsed = urlparse (git_url )
241
+ if parsed .scheme != "ssh" or not parsed .hostname :
238
242
raise ValueError (f"Invalid Git SSH URL: { git_url } " )
239
- host , port , path = result .group ("host" , "port" , "path" )
240
- url = f"https://{ host } /{ path } .git/info/lfs"
241
- return cls (url , host , int (port or 22 ), path )
243
+
244
+ host = parsed .hostname
245
+ port = parsed .port or 22
246
+ path = parsed .path .lstrip ("/" )
247
+ username = parsed .username
248
+
249
+ url_path = path .removesuffix (".git" ) + ".git/info/lfs"
250
+ url = f"https://{ host } /{ url_path } "
251
+ return cls (url , host , port , username , path )
242
252
243
253
def _get_auth_header (self , * , upload : bool ) -> dict :
244
254
return self ._git_lfs_authenticate (
245
- self .host , self .port , f" { self .path } .git" , upload = upload
255
+ self .host , self .port , self .username , self . path , upload = upload
246
256
).get ("header" , {})
247
257
248
258
def _git_lfs_authenticate (
249
- self , host : str , port : int , path : str , * , upload : bool = False
259
+ self ,
260
+ host : str ,
261
+ port : int ,
262
+ username : Optional [str ],
263
+ path : str ,
264
+ * ,
265
+ upload : bool = False ,
250
266
) -> dict :
251
267
action = "upload" if upload else "download"
252
268
return json .loads (
253
269
self ._ssh .run_command (
254
270
command = f"git-lfs-authenticate { path } { action } " ,
255
271
host = host ,
256
272
port = port ,
257
- username = "git" ,
273
+ username = username ,
258
274
).read ()
259
275
)
260
276
0 commit comments