rev: tip css/lib/python/cs/pop3.py -rw-r--r-- 19.0 KiB View raw Log this file
b64656001bb0Cameron Simpson cs.urlutils: URL.content_type_full: warn and return None if the COntent-Type header is missing a day ago
                                                                                
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
#!/usr/bin/env python3

''' POP3 stuff, particularly a streaming downloader and a simple command line which runs it.

    I spend some time on a geostationary satellite connection,
    where round trip ping times are over 600ms when things are good.

    My mail setup involves fetching messages from my inbox
    for local storage in my laptop, usually using POP3.
    The common standalone tools for this are `fetchmail` and `getmail`.
    However, both are very subject to the link latency,
    in that they request a message, collect it, issue a delete, then repeat.
    On a satellite link that incurs a cost of over a second per message,
    making catch up after a period offline a many minutes long exercise in tedium.

    This module does something I've been meaning to do for literally years:
    a bulk fetch. It issues `RETR`ieves for every message up front as fast as possible.
    A separate thread collects the messages as they are delivered
    and issues `DELE`tes for the saved messages as soon as each is saved.

    This results in a fetch process which is orders of magnitude faster.
    Even on a low latency link the throughput is much faster;
    on the satellite it is gobsmackingly faster.
'''

from collections import namedtuple
from contextlib import contextmanager
from email.parser import BytesParser
from getopt import GetoptError
from mailbox import Maildir
from netrc import netrc
from os import geteuid
from os.path import isdir as isdirpath
from pwd import getpwuid
from socket import create_connection
import ssl
import sys
from threading import RLock

from cs.cmdutils import BaseCommand
from cs.fs import shortpath
from cs.lex import cutprefix, cutsuffix
from cs.logutils import debug, warning, error, exception
from cs.pfx import Pfx, pfx, pfx_call
from cs.queues import IterableQueue
from cs.resources import MultiOpenMixin
from cs.result import Result, ResultSet
from cs.threads import bg as bg_thread
from cs.upd import print

# Module release version; the "-post" suffix presumably marks
# unreleased changes made after the 20240316 release.
__version__ = '20240316-post'

# Packaging metadata for this module: PyPI keywords and trove classifiers,
# the cs.* distributions it imports from, and the `pop3` console script
# entry point, which runs `cs.pop3:main`.
DISTINFO = {
    'keywords': ["python3"],
    'classifiers': [
        "Programming Language :: Python",
        "Programming Language :: Python :: 3",
        "Environment :: Console",
        "Topic :: Communications :: Email :: Post-Office :: POP3",
        "Topic :: Internet",
        "Topic :: Utilities",
    ],
    # one entry per cs.* package imported by this module
    'install_requires': [
        'cs.cmdutils>=20210407.1',
        'cs.fs',
        'cs.lex',
        'cs.logutils',
        'cs.pfx',
        'cs.queues',
        'cs.resources',
        'cs.result>=20210407',
        'cs.threads',
        'cs.upd',
    ],
    'entry_points': {
        'console_scripts': {
            'pop3': 'cs.pop3:main'
        },
    },
}

def main(argv=None):
  ''' Entry point for the `pop3` command line.

      Build a `POP3Command` from `argv`, run it,
      and return its result as the exit status.
  '''
  command = POP3Command(argv)
  return command.run()

class POP3(MultiOpenMixin):
  ''' Simple POP3 class with support for streaming use.

      Requests may be queued with the `client_*_bg` methods,
      each of which returns a `Result` for that request.
      A background worker thread reads the server responses in order
      and completes the corresponding `Result`s.
      Call `flush()` after queuing a batch of requests.
  '''

  def __init__(self, conn_spec):
    ''' Initialise the client.
        The connection itself is made in `startup_shutdown`.

        Parameters:
        * `conn_spec`: a `ConnectionSpec`, or a connection spec string
          which is parsed with `ConnectionSpec.from_spec`
    '''
    if isinstance(conn_spec, str):
      conn_spec = ConnectionSpec.from_spec(conn_spec)
    self.conn_spec = conn_spec
    self._result_queue = None
    self._client_worker = None
    self._sock = None
    self.recvf = None
    self.sendf = None
    # serialises request submission (send + enqueue) and Maildir saves
    self._lock = RLock()

  def _close_connection(self):
    ''' Close whichever of `self.sendf`, `self.recvf` and `self._sock`
        are open, resetting each to `None`.
        Errors from `close()` are logged rather than raised.
    '''
    for attr in ('sendf', 'recvf', '_sock'):
      stream = getattr(self, attr)
      if stream is None:
        continue
      setattr(self, attr, None)
      try:
        stream.close()
      except OSError as e:
        warning("close %s: %s", attr, e)

  @contextmanager
  def startup_shutdown(self):
    ''' Connect to the server, log in and start the response worker.
        On exit, send `QUIT`, stop the worker and close the connection.
    '''
    self._sock = self.conn_spec.connect()
    try:
      # newline='\n': lines end only at LF and line endings are not
      # translated, so lone CRs inside message content survive intact;
      # readline() strips the CRLF itself
      self.recvf = self._sock.makefile(
          'r', encoding='iso8859-1', newline='\n'
      )
      self.sendf = self._sock.makefile('w', encoding='ascii')
      self.client_begin()
      self.client_auth(self.conn_spec.user, self.conn_spec.password)
    except BaseException:
      # greeting or login failed: do not leak the connection
      self._close_connection()
      raise
    self._result_queue = IterableQueue()
    self._client_worker = bg_thread(
        self._client_response_worker, args=(self._result_queue,)
    )
    try:
      yield
    finally:
      logmsg = debug
      logmsg("send client QUIT")
      try:
        quitR = self.client_quit_bg()
        logmsg("flush QUIT")
        self.flush()
        logmsg("join QUIT")
        quitR.join()
      except Exception as e:  # pylint: disable=broad-except
        exception("client quit: %s", e)
        logmsg = warning
      if self._result_queue:
        logmsg("close result queue")
        self._result_queue.close()
        self._result_queue = None
      if self._client_worker:
        logmsg("join client worker")
        self._client_worker.join()
        self._client_worker = None
      logmsg("close sendf")
      self.sendf.close()
      self.sendf = None
      logmsg("check for uncollected server responses")
      bs = self.recvf.read()
      if bs:
        warning("received %d bytes from the server at shutdown", len(bs))
      logmsg("close recvf")
      self.recvf.close()
      self.recvf = None
      logmsg("close socket")
      self._sock.close()
      self._sock = None
      logmsg("shutdown complete")

  def readline(self):
    ''' Read a CRLF terminated line from `self.recvf`.
        Return the text preceding the CRLF.
        Return `None` at EOF.

        Raises `ValueError` if the line is not terminated by a LF,
        which can only happen for a truncated final line at EOF.
    '''
    line0 = self.recvf.readline()
    if not line0:
      return None
    line = cutsuffix(line0, '\n')
    if line is line0:
      raise ValueError("missing LF: %r" % (line0,))
    return cutsuffix(line, '\r')

  def readlines(self):
    ''' Generator yielding lines from `self.recvf` until EOF.
    '''
    while True:
      line = self.readline()
      if line is None:
        break
      yield line

  def get_response(self):
    ''' Read a server response.
        Return `(ok,status,etc)`
        where `ok` is true if `status` is `'+OK'`, false otherwise;
        `status` is the status word
        and `etc` is the following text.
        Return `(None,None,None)` on EOF from the receive stream.
    '''
    line = self.readline()
    if line is None:
      return None, None, None
    try:
      status, etc = line.split(None, 1)
    except ValueError:
      # a bare status word with no trailing text
      status = line
      etc = ''
    return status == '+OK', status, etc

  def get_ok(self):
    ''' Read a server response and require it to be `'+OK'`.
        Return the `etc` part.
        Raises `ValueError` for any other response, including EOF.
    '''
    ok, status, etc = self.get_response()
    if not ok:
      raise ValueError("no ok from server: %r %r" % (status, etc))
    return etc

  def get_multiline(self):
    ''' Generator yielding unstuffed lines from a multiline response,
        stopping at the terminating `'.'` line (or EOF).
    '''
    for line in self.readlines():
      if line == '.':
        break
      if line.startswith('.'):
        # remove the byte-stuffed leading dot
        line = line[1:]
      yield line

  def flush(self):
    ''' Flush the send stream.
    '''
    self.sendf.flush()

  def sendline(self, line, do_flush=False):
    ''' Send a line (excluding its terminating CRLF).
        If `do_flush` is true (default `False`)
        also flush the sending stream.

        Raises `ValueError` if `line` contains a CR or LF,
        since that would inject extra protocol lines.
    '''
    if '\r' in line or '\n' in line:
      # deliberately omit the line text: it may contain a password
      raise ValueError("CR or LF in request line")
    self.sendf.write(line)
    self.sendf.write('\r\n')
    if do_flush:
      self.flush()

  def _client_response_worker(self, result_queue):
    ''' Worker to process queued request responses.
        Each completed response assigns `[etc,lines]` to the `Result`
        where `etc` is the additional text from the server ok response
        and `lines` is a list of the multiline part of the response
        or `None` if the response is not multiline.
        A failed response records the exception on the `Result`.
    '''
    for R, is_multiline in result_queue:
      try:
        etc = self.get_ok()
        if is_multiline:
          lines = list(self.get_multiline())
        else:
          lines = None
      except Exception as e:  # pylint: disable=broad-except
        R.exc_info = sys.exc_info()
        warning("%s: %s", R, e)
      else:
        # save a list so that we can erase it in a handler to release memory
        R.result = [etc, lines]

  def client_begin(self):
    ''' Read the opening server response and print its text.
    '''
    etc = self.get_ok()
    print(etc)

  def client_auth(self, user, password):
    ''' Perform a `USER`/`PASS` client authentication.
    '''
    self.sendline(f'USER {user}', do_flush=True)
    print('USER', user, self.get_ok())
    self.sendline(f'PASS {password}', do_flush=True)
    print('PASS', '****', self.get_ok())

  def client_uidl(self):
    ''' Generator yielding `(msg_n,msg_uid)` pairs from a `UIDL` request,
        where `msg_n` is the message number as an `int`
        and `msg_uid` is the message UID string.
        Use `dict(self.client_uidl())` to obtain a mapping.
    '''
    self.sendline('UIDL', do_flush=True)
    self.get_ok()
    for line in self.get_multiline():
      n, msg_uid = line.split(None, 1)
      yield int(n), msg_uid

  def client_bg(self, rq_line, is_multiline=False, notify=None):
    ''' Dispatch a request `rq_line` in the background.
        Return a `Result` to collect the request result.

        Parameters:
        * `rq_line`: POP3 request text, without any terminating CRLF
        * `is_multiline`: true if a multiline response is expected,
          default `False`
        * `notify`: an optional handler for `Result.notify`,
          applied if not `None`

        *Note*: DOES NOT flush the send stream.
        Call `self.flush()` when a batch of requests has been submitted,
        before trying to collect the `Result`s.

        The `Result` will receive `[etc,lines]` on success
        where:
        * `etc` is the trailing portion of an ok response line
        * `lines` is a list of unstuffed text lines from the response
          if `is_multiline` is true, `None` otherwise
        The `Result` gets a list instead of a tuple
        so that a handler may clear it in order to release memory.

        Example:

            R = self.client_bg(f'RETR {msg_n}', is_multiline=True, notify=notify)
    '''
    R = Result(rq_line)
    # annotate and register the handler before the request is queued,
    # so both are in place whenever the worker completes the Result
    R.extra.update(rq_line=rq_line)
    if notify is not None:
      R.notify(notify)
    with self._lock:
      # send and enqueue together so that the queue order
      # matches the order of the requests on the wire
      self.sendline(rq_line)
      self._result_queue.put((R, is_multiline))
    return R

  def client_dele_bg(self, msg_n):
    ''' Queue a delete request for message `msg_n`,
        return the `Result` for collection.
    '''
    R = self.client_bg(f'DELE {msg_n}')
    R.extra.update(msg_n=msg_n)
    return R

  def client_quit_bg(self):
    ''' Queue a QUIT request,
        return the `Result` for collection.
    '''
    return self.client_bg('QUIT')

  def client_retr_bg(self, msg_n, notify=None):
    ''' Queue a retrieve request for message `msg_n`,
        return the `Result` for collection.

        If `notify` is not `None`, apply it to the `Result`.
    '''
    R = self.client_bg(f'RETR {msg_n}', is_multiline=True, notify=notify)
    R.extra.update(msg_n=msg_n)
    return R

  def dl_bg(self, msg_n, maildir, deleRs):
    ''' Download message `msg_n` to Maildir `maildir`.
        Return the `Result` for the `RETR` request.

        After a successful save,
        queue a `DELE` for the message
        and add its `Result` to `deleRs`;
        no `DELE` is queued if `deleRs` is `None`.
        If the `RETR` fails, this is logged
        and the message is neither saved nor deleted.
    '''

    def dl_bg_save_result(R):
      ''' `Result` notification handler:
          save the retrieved message and queue its `DELE`.
      '''
      with Pfx("MSG %d", msg_n):
        try:
          etc_lines = R.result
        except Exception as e:  # pylint: disable=broad-except
          warning("RETR failed, message not saved: %s", e)
          return
        if etc_lines is None:
          # the worker only stores [etc,lines] on success,
          # so presumably the RETR failed
          warning("RETR failed, message not saved")
          return
        lines = etc_lines[1]
        etc_lines[1] = None  # release lines
        msg_bs = b''.join(
            line.encode('iso8859-1') + b'\r\n' for line in lines
        )
        msg = BytesParser().parsebytes(msg_bs)
        hdr_from = str(msg.get('from', '<UNKNOWN>'))
        with Pfx("from %s", hdr_from):
          try:
            # mailbox.Maildir.add is not thread safe, serialise it
            with self._lock:
              Mkey = maildir.add(msg)
          except UnicodeEncodeError as e:
            error(
                "cannot save to %s, skipping DELE: %s",
                shortpath(maildir._path), e
            )
          else:
            print(
                f'msg {msg_n} from {hdr_from}: {len(msg_bs)} octets, saved as {Mkey}, deleting'
            )
            if deleRs is not None:
              deleRs.add(self.client_dele_bg(msg_n))

    return self.client_retr_bg(msg_n, notify=dl_bg_save_result)

class NetrcEntry(namedtuple('NetrcEntry', 'machine login account password')):
  ''' A `namedtuple` view of a single `netrc` entry:
      `(machine,login,account,password)`.
  '''

  # the (login,account,password) triple used when no entry matches
  NO_ENTRY = None, None, None

  @classmethod
  def get(cls, machine, netrc_hosts=None):
    ''' Return the entry whose machine name is `machine`.
        If there is no such entry, `login`, `account` and `password`
        are all `None`.

        `netrc_hosts` maps machine names to `(login,account,password)`;
        if omitted, the `hosts` of the default `netrc()` are used.
    '''
    hosts = netrc().hosts if netrc_hosts is None else netrc_hosts
    return cls(machine, *hosts.get(machine, cls.NO_ENTRY))

  @classmethod
  def by_account(cls, account_name, netrc_hosts=None):
    ''' Return the first entry whose `account` is `account_name`.
        If there is no such entry, every field is `None`.

        `netrc_hosts` maps machine names to `(login,account,password)`;
        if omitted, the `hosts` of the default `netrc()` are used.
    '''
    hosts = netrc().hosts if netrc_hosts is None else netrc_hosts
    matches = (
        cls(machine, *triple)
        for machine, triple in hosts.items()
        if triple[1] == account_name
    )
    found = next(matches, None)
    return cls(None, *cls.NO_ENTRY) if found is None else found

class ConnectionSpec(namedtuple('ConnectionSpec',
                                'user host sni_host port ssl')):
  ''' A specification for a POP3 connection.

      Fields:
      * `user`: the user name to log in as
      * `host`: the host to which the TCP connection is made
      * `sni_host`: the server name passed to TLS/SSL
        (used for SNI and certificate verification)
      * `port`: the TCP port
      * `ssl`: whether to wrap the connection in TLS/SSL
  '''

  # pylint: disable=too-many-branches
  @classmethod
  def from_spec(cls, spec):
    ''' Construct an instance from a connection spec string
        of the form [`tcp:`|`ssl:`][*user*`@`]*[tcp_host!]server_hostname*[`:`*port*].

        The optional prefixes `tcp:` and `ssl:` indicate that the connection
        should be cleartext or SSL/TLS respectively.
        The default is SSL/TLS.

        If the remainder after the prefix matches the `account` of a `netrc`
        entry, that entry's `machine` is parsed as the spec instead;
        the entry's `login` (or else the user part of its `account`)
        supplies a missing user, and the host part of its `account`
        supplies the SNI name when no `!` is present.
        Otherwise the user defaults to the current effective user,
        the port to 995 (SSL/TLS) or 110 (cleartext)
        and the SNI name to the host.
    '''
    spec2 = cutprefix(spec, 'tcp:')
    if spec2 is not spec:
      spec = spec2
      use_ssl = False
    else:
      spec = cutprefix(spec, 'ssl:')
      use_ssl = True
    # see if what's left after the mode matches a netrc account name
    account_entry = NetrcEntry.by_account(spec)
    if account_entry.machine is None:
      account_entry = None
    else:
      # a match, use the machine name as the spec
      spec = account_entry.machine
    try:
      user, hostpart = spec.split('@', 1)
    except ValueError:
      # no user specified, use a default
      hostpart = spec
      current_user = getpwuid(geteuid()).pw_name
      if account_entry:
        if account_entry.login:
          user = account_entry.login
        else:
          # see if the account name has a user part
          try:
            user, _ = account_entry.account.split('@', 1)
          except ValueError:
            user = current_user
      else:
        user = current_user
    try:
      host, port = hostpart.split(':')
    except ValueError:
      host = hostpart
      port = 995 if use_ssl else 110
    else:
      port = int(port)
    try:
      tcp_host, sni_host = host.split('!', 1)
    except ValueError:
      # get the SNI name from the account name
      if account_entry:
        tcp_host = host
        try:
          _, sni_host = account_entry.account.split('@', 1)
        except ValueError:
          sni_host = account_entry.account
      else:
        tcp_host, sni_host = host, host
    return cls(
        user=user, host=tcp_host, sni_host=sni_host, port=port, ssl=use_ssl
    )

  @property
  def netrc_entry(self):
    ''' The default `NetrcEntry` for this `ConnectionSpec`,
        looked up by the machine name *user*`@`*host*`:`*port*.
    '''
    machine = f'{self.user}@{self.host}:{self.port}'
    return NetrcEntry.get(machine)

  @property
  def password(self):
    ''' The password for this connection, obtained from the `.netrc` file
        via the key *user*`@`*host*`:`*port*.
        This is `None` if there is no matching entry.
    '''
    return self.netrc_entry.password

  def connect(self):
    ''' Connect according to this `ConnectionSpec`, return the `socket`.

        For SSL/TLS connections the TCP socket is wrapped using
        `ssl.create_default_context()` with `sni_host` as the server name;
        if the wrap (including the handshake) fails,
        the TCP socket is closed before the exception propagates.
    '''
    sock = pfx_call(create_connection, (self.host, self.port))
    if self.ssl:
      context = ssl.create_default_context()
      try:
        sock = context.wrap_socket(sock, server_hostname=self.sni_host)
      except BaseException:
        # do not leak the plain TCP connection
        sock.close()
        raise
      print("SSL:", sock.version())
    return sock

class POP3Command(BaseCommand):
  ''' Command line implementation for POP3 operations.

      Credentials are obtained via the `.netrc` file presently.

      Connection specifications consist of an optional leading mode prefix
      followed by a netrc(5) account name
      or an explicit connection specification
      from which to derive:
      * `user`: the user name to log in as
      * `tcp_host`: the hostname to which to establish a TCP connection
      * `port`: the TCP port to connect on, default 995 for TLS/SSL or 110 for cleartext
      * `sni_host`: the TLS/SSL SNI server name, which may be different from the `tcp_host`

      The optional mode prefix is one of:
      * `ssl:`: use TLS/SSL - this is the default
      * `tcp:`: use cleartext - this is useful for ssh port forwards
        to some not-publicly-exposed clear text POP service;
        in particular streaming performs better this way,
        I think because the Python SSL layer does not buffer writes

      Example connection specifications:
      * `username@mail.example.com`:
        use TLS/SSL to connect to the POP3S service at `mail.example.com`,
        logging in as `username`
      * `mail.example.com`:
        use TLS/SSL to connect to the POP3S service at `mail.example.com`,
        logging in with the same login as the local effective user
      * `tcp:username@localhost:1110`:
        use cleartext to connect to `localhost:1110`,
        typically an ssh port forward to a remote private cleartext POP service,
        logging in as `username`
      * `username@localhost!mail.example.com:1995`:
        use TLS/SSL to connect to `localhost:1995`,
        usually an ssh port forward to a remote private TLS/SSL POP service,
        logging in as `username` and passing `mail.example.com`
        as the TLS/SSL server name indication
        (which allows certificate verification to proceed correctly)

      Note that the specification may also be a `netrc` account name.
      If the specification matches such an account name
      then values are derived from the `netrc` entry.
      The entry's `machine` name becomes the TCP connection specification,
      the entry's `login` provides a default for the username,
      the entry's `account` host part provides the `sni_host`.

      Example `netrc` entry:

          machine username@localhost:1110
            account username@mail.example.com
            password ************

      Such an entry allows you to use the specification `tcp:username@mail.example.com`
      and obtain the remaining detail via the `netrc` entry.
  '''

  # pylint: disable=no-self-use,too-many-locals
  def cmd_dl(self, argv):
    ''' Collect messages from a POP3 server and deliver to a Maildir.

        Usage: {cmd} [-n] [{{ssl,tcp}}:]{{netrc_account|[user@]host[!sni_name][:port]}} maildir
          -n  Save the messages but do not delete them from the server.
    '''
    doit = True
    if argv and argv[0] == '-n':
      argv.pop(0)
      doit = False
    if not argv:
      raise GetoptError("missing pop_target")
    pop_target = argv.pop(0)
    if not argv:
      raise GetoptError("missing maildir")
    maildir_path = argv.pop(0)
    if argv:
      raise GetoptError(f"extra arguments after maildir: {argv!r}")
    with Pfx("maildir %r", maildir_path):
      if not isdirpath(maildir_path):
        raise GetoptError("not a directory")
      M = Maildir(maildir_path)
    with Pfx(pop_target):
      try:
        with POP3(pop_target) as pop3:
          msg_uid_map = dict(pop3.client_uidl())
          nmsgs = len(msg_uid_map)
          print(
              f'{nmsgs} message',
              ('' if nmsgs == 1 else 's'),
              ('.' if nmsgs == 0 else ':'),
              sep=''
          )
          with ResultSet() as deleRs:
            with ResultSet() as retrRs:
              # queue every RETR up front, then flush once:
              # this is the streaming bulk fetch
              for msg_n in msg_uid_map:
                retrRs.add(pop3.dl_bg(msg_n, M, deleRs if doit else None))
              pop3.flush()
              retrRs.wait()
            # now the deleRs are all queued
            pop3.flush()
            if deleRs:
              print("wait for DELEs...")
              deleRs.wait()
      except ConnectionRefusedError as e:
        error("connection refused: %s", e)
        return 1

if __name__ == '__main__':
  # run as a script: use the command's result as the process exit status
  exit_status = main(sys.argv)
  sys.exit(exit_status)