Skip to content

Library

penvm.lib.base

Provide BaseObject for all PENVM classes that need its functionality.

BaseObject

Base object for PENVM classes.

Provides common support for oid (UU object id) and object-specific LoggerAdapter logger.

Source code in penvm/src/lib/penvm/lib/base.py
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
class BaseObject:
    """Common root for PENVM classes.

    Gives every instance an `oid` (UU object id) and an
    object-specific `LoggerAdapter` logger."""

    name = "base"

    def __init__(self, oid: str, logger: "Logger"):
        """Set up the object id and the per-object logger.

        Args:
            oid: Universally unique object id. A falsy value is
                replaced by the result of `get_uuid()`.
            logger: Logger to wrap with `LoggerAdapter`.
        """
        if not oid:
            oid = get_uuid()
        self.oid = oid

        # Extra context handed to the adapter alongside the wrapped logger.
        context = {"self": self, "id": self.oid}
        self.logger = LoggerAdapter(logger, context)

__init__(oid, logger)

Initialize.

Parameters:

Name Type Description Default
oid str

Universally unique object id.

required
logger Logger

Logger to wrap with LoggerAdapter.

required
Source code in penvm/src/lib/penvm/lib/base.py
36
37
38
39
40
41
42
43
44
def __init__(self, oid: str, logger: "Logger"):
    """Set up the object id and the per-object logger.

    Args:
        oid: Universally unique object id. A falsy value is replaced
            by the result of `get_uuid()`.
        logger: Logger to wrap with `LoggerAdapter`.
    """
    if not oid:
        oid = get_uuid()
    self.oid = oid

    # Extra context handed to the adapter alongside the wrapped logger.
    context = {"self": self, "id": self.oid}
    self.logger = LoggerAdapter(logger, context)

penvm.lib.connection

Wrappers for low-level connections.

ClientConnection

Bases: Connection

Client-side message queue connection.

Source code in penvm/src/lib/penvm/lib/connection.py
627
628
629
630
631
632
633
634
635
636
class ClientConnection(Connection):
    """Client-side message queue connection."""

    def __init__(self, *args, **kwargs):
        """Initialize.

        All positional and keyword arguments are forwarded unchanged
        to `Connection.__init__`.
        """
        # Run the base initializer outside any try/finally: if it raises,
        # the original exception must propagate instead of being masked by
        # an UnboundLocalError from a not-yet-assigned trace logger.
        super().__init__(*args, **kwargs)

        # Keep the enter/exit trace pair the other initializers emit.
        self.logger.enter().exit()

__init__(*args, **kwargs)

Initialize.

Source code in penvm/src/lib/penvm/lib/connection.py
630
631
632
633
634
635
636
def __init__(self, *args, **kwargs):
    """Initialize.

    All positional and keyword arguments are forwarded unchanged to
    the base class initializer.
    """
    # Run the base initializer outside any try/finally: if it raises, the
    # original exception must propagate instead of being masked by an
    # UnboundLocalError from a not-yet-assigned trace logger.
    super().__init__(*args, **kwargs)

    # Keep the enter/exit trace pair the other initializers emit.
    self.logger.enter().exit()

Connection

Bases: MessageConnection

Connection.

Provides the interface between the network socket and message queues (incoming, outgoing).

The headers of incoming and outgoing messages are all augmented with the connection id.

Source code in penvm/src/lib/penvm/lib/connection.py
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
class Connection(MessageConnection):
    """Connection.

    Provides the interface between the network socket and message
    queues (incoming, outgoing).

    The headers of incoming and outgoing messages are all augmented
    with the connection id."""

    def __init__(
        self,
        machine: "Machine",
        host: str,
        port: int,
        sslcontext: Union["ssl.SSLContext", None] = None,
        sock: Union["socket.socket", None] = None,
        onclose: Union[Callable, None] = None,
    ):
        """Initialize.

        Args:
            machine: Machine.
            host: Host address.
            port: Port.
            sslcontext: SSL Context.
            sock: Network socket, if one already exists.
            onclose: Function to call when the connection is closed;
                it is called with this connection's `oid`.
        """
        super().__init__(host, port, sslcontext, sock)
        self.machine = machine
        self.onclose = onclose

        # Set to True by `close()`; polled by `recvmsgs()`/`sendmsgs()`.
        self.exit = False
        # Messages put on the incoming queue are handed to `imq_put`.
        self.imq = RoutingQueue(put=self.imq_put)
        self.omq = MessageQueue()
        # Worker threads; created by `start()`.
        self.recvmsgs_th = None
        self.sendmsgs_th = None

    def __repr__(self):
        return f"<Connection id={self.oid} machine={self.machine.oid} host={self.host} port={self.port}>"

    def close(self):
        """Close connection.

        Runs the base class `close()`, sets `self.exit` and then calls
        `onclose` (if set) with this connection's `oid`.

        Any exception is logged and suppressed. The steps run in order,
        so when an earlier step raises (e.g. the base `close()` on an
        already closed connection) the later ones are skipped.
        """
        # Bind before the `try` so the handlers can always use it.
        tlogger = self.logger.enter()
        try:
            super().close()
            self.exit = True
            tlogger.debug(f"onclose ({self.onclose})")
            if self.onclose:
                self.onclose(self.oid)
        except Exception as e:
            tlogger.debug(f"EXCEPTION ({e})")
        finally:
            tlogger.exit()

    def imq_put(
        self,
        msg: "Message",
    ):
        """Put message on IMQ.

        The message is forwarded to the machine's incoming queue.

        Args:
            msg: Message to enqueue.
        """
        self.machine.imq.put(msg)

    def recvmsgs(self):
        """Receive messages over connection.

        Loops until `self.exit` is set. Each received message has its
        `connection-id` header set to this connection's `oid` before
        being put on `self.imq`. On any exception the connection is
        closed and the loop ends; the exception is not propagated.
        """
        tlogger = self.logger.enter()
        try:
            while not self.exit:
                try:
                    tlogger.debug("recvmsgs waiting ...")
                    msg = self.recvmsg()

                    tlogger.debug("recvmsgs message received")
                    # patch header with `Connection.oid`
                    msg.header["connection-id"] = self.oid

                    self.imq.put(msg)

                    tlogger.debug("recvmsgs message put")
                except Exception as e:
                    tlogger.debug(f"EXCEPTION ({e})")
                    # TODO: close/cleanup should be elsewhere
                    self.close()
                    break
        finally:
            tlogger.exit()

    def sendmsgs(self):
        """Send messages over connection.

        Loops until `self.exit` is set. Each message popped from
        `self.omq` has its `connection-id` header set to this
        connection's `oid` before being sent.

        Raises:
            Exception: Whatever popping or sending raised; the
                connection is closed before the exception is re-raised.
        """
        tlogger = self.logger.enter()
        try:
            while not self.exit:
                try:
                    tlogger.debug("sendmsgs waiting ...")
                    msg = self.omq.pop()

                    tlogger.debug("sendmsgs popped message")
                    msg.header["connection-id"] = self.oid
                    self.sendmsg(msg)

                    tlogger.debug("sendmsgs sent")
                except Exception as e:
                    tlogger.debug(f"EXCEPTION ({e})")
                    self.close()
                    raise
        finally:
            tlogger.exit()

    def start(self):
        """Start recv and send message handling.

        Connects first when there is no socket yet, then starts one
        daemon thread for `sendmsgs` and one for `recvmsgs` unless the
        respective thread attribute is already set. Exceptions are
        logged and suppressed; a worker whose start-up failed has its
        thread attribute reset to `None`.
        """
        tlogger = self.logger.enter()
        try:
            # TODO: fix to handle ssl situation
            if self.sock is None:
                self.connect()

            workers = (
                ("sendmsgs", "sendmsgs_th", self.sendmsgs),
                ("recvmsgs", "recvmsgs_th", self.recvmsgs),
            )
            for label, attr, target in workers:
                if getattr(self, attr):
                    continue
                try:
                    tlogger.debug(f"starting {label} ...")

                    worker = Thread(target=target)
                    worker.daemon = True
                    setattr(self, attr, worker)
                    worker.start()
                except Exception as e:
                    tlogger.debug(f"starting {label} EXCEPTION ({e})")
                    setattr(self, attr, None)
        except Exception as e:
            tlogger.debug(f"EXCEPTION ({e})")
        finally:
            tlogger.exit()

    def state(self) -> "Union[State, None]":
        """Get object state.

        Returns:
            `State` object, or `None` when the state could not be
            collected (the exception is logged as a warning).
        """
        try:
            peer = self.sock.getpeername()
            return State(
                "connection",
                self.oid,
                {
                    "initial-connection": self.machine.connmgr.init_conn == self,
                    "host": self.host,
                    "port": self.port,
                    "peer-host": peer[0],
                    "peer-port": peer[1],
                    "nimq": self.imq.size(),
                    "nomq": self.omq.size(),
                    "ssl": self.sslcontext is not None,
                },
            )
        except Exception as e:
            self.logger.warning(f"EXCEPTION ({e})")
            return None

__init__(machine, host, port, sslcontext=None, sock=None, onclose=None)

Initialize.

Parameters:

Name Type Description Default
machine Machine

Machine.

required
host str

Host address.

required
port int

Port.

required
sslcontext Union[SSLContext, None]

SSL Context.

None
onclose Union[Callable, None]

Function to call when connection is closed.

None
Source code in penvm/src/lib/penvm/lib/connection.py
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
def __init__(
    self,
    machine: "Machine",
    host: str,
    port: int,
    sslcontext: Union["ssl.SSLContext", None] = None,
    sock: Union["socket.socket", None] = None,
    onclose: Union[Callable, None] = None,
):
    """Set up the connection and its message queues.

    Args:
        machine: Machine.
        host: Host address.
        port: Port.
        sslcontext: SSL Context.
        sock: Network socket, passed through to the base initializer.
        onclose: Function to call when the connection is closed.
    """
    super().__init__(host, port, sslcontext, sock)

    self.machine = machine
    self.onclose = onclose
    self.exit = False

    # Incoming queue routes through `imq_put`; outgoing is a plain queue.
    self.imq = RoutingQueue(put=self.imq_put)
    self.omq = MessageQueue()

    # No worker threads yet.
    self.recvmsgs_th = None
    self.sendmsgs_th = None

close()

Close connection.

Source code in penvm/src/lib/penvm/lib/connection.py
485
486
487
488
489
490
491
492
493
494
495
496
497
def close(self):
    """Close connection.

    Runs the base class `close()`, sets `self.exit` and then calls
    `onclose` (if set) with this connection's `oid`.

    Any exception is logged and suppressed. The steps run in order, so
    when an earlier step raises the later ones are skipped.
    """
    # Bind before the `try`: the `except`/`finally` handlers use `tlogger`
    # and would otherwise fail with UnboundLocalError if `enter()` raised.
    tlogger = self.logger.enter()
    try:
        super().close()
        self.exit = True
        tlogger.debug(f"onclose ({self.onclose})")
        if self.onclose:
            self.onclose(self.oid)
    except Exception as e:
        tlogger.debug(f"EXCEPTION ({e})")
    finally:
        tlogger.exit()

imq_put(msg)

Put message on IMQ.

Parameters:

Name Type Description Default
msg Message

Message to enqueue.

required
Source code in penvm/src/lib/penvm/lib/connection.py
499
500
501
502
503
504
505
506
507
508
def imq_put(self, msg: "Message"):
    """Hand a message to the machine's incoming message queue.

    Args:
        msg: Message to enqueue.
    """
    machine_imq = self.machine.imq
    machine_imq.put(msg)

recvmsgs()

Receive messages over connection.

Loops until self.exit is True.

Source code in penvm/src/lib/penvm/lib/connection.py
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
def recvmsgs(self):
    """Receive messages over connection.

    Loops until `self.exit` is set. Each received message has its
    `connection-id` header set to this connection's `oid` before being
    put on `self.imq`. On any exception the connection is closed and
    the loop ends; the exception is not propagated.
    """
    # Bind before the `try` so `finally` can always call `exit()`.
    tlogger = self.logger.enter()
    try:
        while not self.exit:
            try:
                tlogger.debug("recvmsgs waiting ...")
                msg = self.recvmsg()

                tlogger.debug("recvmsgs message received")
                # patch header with `Connection.oid`
                msg.header["connection-id"] = self.oid

                self.imq.put(msg)

                tlogger.debug("recvmsgs message put")
            except Exception as e:
                tlogger.debug(f"EXCEPTION ({e})")
                # TODO: close/cleanup should be elsewhere
                self.close()
                break
    finally:
        tlogger.exit()

sendmsgs()

Send message over connection.

Loops until self.exit is True.

Source code in penvm/src/lib/penvm/lib/connection.py
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
def sendmsgs(self):
    """Send messages over connection.

    Loops until `self.exit` is set. Each message popped from
    `self.omq` has its `connection-id` header set to this connection's
    `oid` before being sent.

    Raises:
        Exception: Whatever popping or sending raised; the connection
            is closed before the exception is re-raised.
    """
    # Bind before the `try` so `finally` can always call `exit()`.
    tlogger = self.logger.enter()
    try:
        while not self.exit:
            try:
                tlogger.debug("sendmsgs waiting ...")
                msg = self.omq.pop()

                tlogger.debug("sendmsgs popped message")
                msg.header["connection-id"] = self.oid
                self.sendmsg(msg)

                tlogger.debug("sendmsgs sent")
            except Exception as e:
                tlogger.debug(f"EXCEPTION ({e})")
                self.close()
                # Re-raising leaves the loop; no `break` is needed.
                raise
    finally:
        tlogger.exit()

start()

Start recv and send message handling.

Source code in penvm/src/lib/penvm/lib/connection.py
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
def start(self):
    """Start recv and send message handling.

    Connects first when there is no socket yet, then starts one daemon
    thread for `sendmsgs` and one for `recvmsgs` unless the respective
    thread attribute is already set. Exceptions are logged and
    suppressed; a worker whose start-up failed has its thread attribute
    reset to `None`.
    """
    # Bind before the `try` so the handlers can always use it.
    tlogger = self.logger.enter()
    try:
        # TODO: fix to handle ssl situation
        if self.sock is None:
            self.connect()

        # (log label, thread attribute, thread target); send first, then recv.
        workers = (
            ("sendmsgs", "sendmsgs_th", self.sendmsgs),
            ("recvmsgs", "recvmsgs_th", self.recvmsgs),
        )
        for label, attr, target in workers:
            if getattr(self, attr):
                continue
            try:
                tlogger.debug(f"starting {label} ...")

                worker = Thread(target=target)
                worker.daemon = True
                setattr(self, attr, worker)
                worker.start()
            except Exception as e:
                tlogger.debug(f"starting {label} EXCEPTION ({e})")
                setattr(self, attr, None)
    except Exception as e:
        tlogger.debug(f"EXCEPTION ({e})")
    finally:
        tlogger.exit()

state()

Get object state.

Returns:

Type Description
State

State object.

Source code in penvm/src/lib/penvm/lib/connection.py
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
def state(self) -> "Union[State, None]":
    """Get object state.

    Returns:
        `State` object, or `None` when the state could not be collected
        (the exception is logged as a warning).
    """
    try:
        peer = self.sock.getpeername()
        return State(
            "connection",
            self.oid,
            {
                "initial-connection": self.machine.connmgr.init_conn == self,
                "host": self.host,
                "port": self.port,
                "peer-host": peer[0],
                "peer-port": peer[1],
                "nimq": self.imq.size(),
                "nomq": self.omq.size(),
                "ssl": self.sslcontext is not None,
            },
        )
    except Exception as e:
        self.logger.warning(f"EXCEPTION ({e})")
        return None

Listener

Bases: BaseObject

Listener side/socket.

Source code in penvm/src/lib/penvm/lib/connection.py
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
class Listener(BaseObject):
    """Listener side/socket."""

    def __init__(
        self,
        machine: "Machine",
        host: str,
        port: int,
        sslcontext: Union["ssl.SSLContext", None] = None,
    ):
        """Initialize.

        Args:
            machine: Machine; passed on to every `ServerConnection`
                created by `accept()`.
            host: Host address to bind to.
            port: Port to bind to.
            sslcontext: SSL context used to wrap accepted sockets.
        """
        # Base init first and outside the try/finally: a failure here must
        # propagate as-is rather than be masked by an unbound `tlogger`.
        super().__init__(None, logger)
        tlogger = self.logger.enter()
        try:
            self.machine = machine
            self.host = host
            self.port = port
            self.sslcontext = sslcontext

            # Listening socket and its bound address; set by `listen()`.
            self.lsock = None
            self.lhost = None
            self.lport = None
        finally:
            tlogger.exit()

    def accept(self) -> "ServerConnection":
        """Accept one connection on the listening socket.

        When an SSL context is configured the accepted socket is
        wrapped (server side) before the connection object is built.

        Returns:
            `ServerConnection` for the accepted peer.

        Raises:
            Exception: Anything raised while accepting or wrapping is
                logged and re-raised (this includes socket timeouts).
        """
        tlogger = self.logger.enter()
        try:
            tlogger.debug("accepting ...")

            sock, addr = self.lsock.accept()
            tlogger.debug(f"accepted from sock={sock} addr={addr}")

            if self.sslcontext:
                # TODO: ensure (failed/slow/non-ssl) ssl negotiation does not block
                tlogger.debug(f"setting up ssl {self.sslcontext=}...")
                sock = self.sslcontext.wrap_socket(sock, server_side=True)
                sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
                tlogger.debug(f"ssl socket ({sock=})")

            tlogger.debug("creating ServerConnection ...")

            return ServerConnection(self.machine, addr[0], addr[1], self.sslcontext, sock)
        except Exception as e:
            tlogger.debug(f"EXCEPTION ({e})")
            # allow timeout to percolate up
            raise
        finally:
            tlogger.exit()

    def is_listening(self) -> bool:
        """Indicate whether a listening socket has been set up.

        Returns:
            `True` once `listen()` has succeeded.
        """
        return self.lsock is not None

    def listen(
        self,
        n: int = 1,
    ):
        """Set up to listen for connection.

        The socket is only published on `self.lsock` after bind and
        listen have succeeded, so `is_listening()` never reports a
        half-initialized socket. On failure the socket is closed.

        Args:
            n: Number of outstanding socket connection requests
                allowed.

        Raises:
            Exception: Whatever creating, binding or listening raised.
        """
        tlogger = self.logger.enter()
        try:
            lsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            try:
                lsock.bind((self.host, self.port))
                lsock.listen(n)
                lhost, lport = lsock.getsockname()
            except Exception:
                lsock.close()
                raise

            self.lsock = lsock
            self.lhost, self.lport = lhost, lport
        finally:
            tlogger.exit()

    def settimeout(
        self,
        delay: int,
    ):
        """Set timeout.

        Args:
            delay: Timeout value passed to the listening socket's
                `settimeout()`.
        """
        self.lsock.settimeout(delay)

listen(n=1)

Set up to listen for connection.

Parameters:

Name Type Description Default
n int

Number of outstanding socket connection requests allowed.

1
Source code in penvm/src/lib/penvm/lib/connection.py
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
def listen(
    self,
    n: int = 1,
):
    """Set up to listen for connection.

    The socket is only published on `self.lsock` after bind and listen
    have succeeded; on failure it is closed and `self.lsock`,
    `self.lhost` and `self.lport` are left untouched.

    Args:
        n: Number of outstanding socket connection requests
            allowed.

    Raises:
        Exception: Whatever creating, binding or listening raised.
    """
    # Bind before the `try` so `finally` can always call `exit()`.
    tlogger = self.logger.enter()
    try:
        lsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            lsock.bind((self.host, self.port))
            lsock.listen(n)
            lhost, lport = lsock.getsockname()
        except Exception:
            # Do not leak the descriptor or leave a dead socket behind.
            lsock.close()
            raise

        self.lsock = lsock
        self.lhost, self.lport = lhost, lport
    finally:
        tlogger.exit()

settimeout(delay)

Set timeout.

Parameters:

Name Type Description Default
delay int

Seconds before timing out on idle connection.

required
Source code in penvm/src/lib/penvm/lib/connection.py
135
136
137
138
139
140
141
142
143
144
def settimeout(self, delay: int):
    """Apply a timeout to the listening socket.

    Args:
        delay: Seconds before timing out on idle connection.
    """
    listening_sock = self.lsock
    listening_sock.settimeout(delay)

MessageConnection

Bases: SocketConnection

Message connection.

Provides the interface to the socket with support for messages.

The headers of incoming and outgoing messages are all updated with the connection id.

Source code in penvm/src/lib/penvm/lib/connection.py
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
class MessageConnection(SocketConnection):
    """Message connection.

    Provides the interface to the socket with support for messages.

    The headers of incoming and outgoing messages are all updated with
    the connection id."""

    def __init__(
        self,
        host: str,
        port: int,
        sslcontext: Union["ssl.SSLContext", None] = None,
        sock: Union["socket.socket", None] = None,
    ):
        """Setup.

        Args:
            host: Host address.
            port: Port.
            sslcontext: SSL context used for wrapping a regular
                socket to provide encryption.
            sock: Network socket.
        """
        # Base init outside any try/finally: a failure here must propagate
        # as-is rather than be masked by an unbound `tlogger`.
        super().__init__(host, port, sslcontext, sock)

        # self.logger already set up by the base initializer
        self.logger.enter().exit()

    def __repr__(self):
        return f"<MessageConnection id={self.oid} host={self.host} port={self.port}>"

    def recvmsg(self) -> "Message":
        """Receive message and return (as generic Message).

        A block is read with `recvblk()` and decoded with
        `Message.decode()`; the decode time is reported to the trace
        logger.

        Returns:
            Received message.

        Raises:
            Exception: Whatever receiving or decoding raised, after
                logging it as a warning.
        """
        tlogger = self.logger.enter()
        try:
            b = self.recvblk()
            t0 = time.time()
            h, p = Message.decode(b)
            elapsed = time.time() - t0
            # Format spec belongs inside the braces.
            tlogger.lap(f"size ({len(b)}) decode time ({elapsed:.4f})")
            return Message(h, p)
        except Exception as e:
            self.logger.warning(f"EXCEPTION ({e})")
            raise
        finally:
            tlogger.exit()

    def sendmsg(
        self,
        msg: "Message",
    ):
        """Send message (serialized).

        The message is encoded with `msg.encode()` and sent with
        `sendblk()`; the encode time is reported to the trace logger.

        Args:
            msg: Message to send.

        Raises:
            Exception: Whatever encoding or sending raised, after
                logging it as a warning.
        """
        tlogger = self.logger.enter()
        try:
            t0 = time.time()
            b = msg.encode()
            elapsed = time.time() - t0
            # Format spec belongs inside the braces.
            tlogger.lap(f"size ({len(b)}) encode time ({elapsed:.4f})")
            self.sendblk(b)
        except Exception as e:
            self.logger.warning(f"EXCEPTION ({e})")
            raise
        finally:
            tlogger.exit()

__init__(host, port, sslcontext=None, sock=None)

Setup.

Parameters:

Name Type Description Default
host str

Host address.

required
port int

Port.

required
sslcontext Union[SSLContext, None]

SSL context used for wrapping a regular socket to provide encryption.

None
sock Union[socket, None]

Network socket.

None
Source code in penvm/src/lib/penvm/lib/connection.py
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
def __init__(
    self,
    host: str,
    port: int,
    sslcontext: Union["ssl.SSLContext", None] = None,
    sock: Union["socket.socket", None] = None,
):
    """Setup.

    Args:
        host: Host address.
        port: Port.
        sslcontext: SSL context used for wrapping a regular
            socket to provide encryption.
        sock: Network socket.
    """
    # Base init outside any try/finally: if it raises, the original
    # exception must propagate instead of being masked by an
    # UnboundLocalError from a not-yet-assigned trace logger.
    super().__init__(host, port, sslcontext, sock)

    # self.logger already set up by the base initializer
    self.logger.enter().exit()

recvmsg()

Receive message and return (as generic Message).

Returns:

Type Description
Message

Received message.

Source code in penvm/src/lib/penvm/lib/connection.py
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
def recvmsg(self) -> "Message":
    """Receive message and return (as generic Message).

    A block is read with `recvblk()` and decoded with
    `Message.decode()`; the decode time is reported to the trace
    logger.

    Returns:
        Received message.

    Raises:
        Exception: Whatever receiving or decoding raised, after logging
            it as a warning.
    """
    # Bind before the `try` so `finally` can always call `exit()`.
    tlogger = self.logger.enter()
    try:
        b = self.recvblk()
        t0 = time.time()
        h, p = Message.decode(b)
        elapsed = time.time() - t0
        # Format spec belongs inside the braces; it used to be emitted
        # literally as ":.4f" after the unformatted value.
        tlogger.lap(f"size ({len(b)}) decode time ({elapsed:.4f})")
        return Message(h, p)
    except Exception as e:
        self.logger.warning(f"EXCEPTION ({e})")
        raise
    finally:
        tlogger.exit()

sendmsg(msg)

Send message (serialized).

Parameters:

Name Type Description Default
msg Message

Message to send.

required
Source code in penvm/src/lib/penvm/lib/connection.py
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
def sendmsg(
    self,
    msg: "Message",
):
    """Send message (serialized).

    The message is encoded with `msg.encode()` and sent with
    `sendblk()`; the encode time is reported to the trace logger.

    Args:
        msg: Message to send.

    Raises:
        Exception: Whatever encoding or sending raised, after logging
            it as a warning.
    """
    # Bind before the `try` so `finally` can always call `exit()`.
    tlogger = self.logger.enter()
    try:
        t0 = time.time()
        b = msg.encode()
        elapsed = time.time() - t0
        # Format spec belongs inside the braces; it used to be emitted
        # literally as ":.4f" after the unformatted value.
        tlogger.lap(f"size ({len(b)}) encode time ({elapsed:.4f})")
        self.sendblk(b)
    except Exception as e:
        self.logger.warning(f"EXCEPTION ({e})")
        raise
    finally:
        tlogger.exit()

ServerConnection

Bases: Connection

Server-side message queue connection.

Returned by Listener.

Source code in penvm/src/lib/penvm/lib/connection.py
639
640
641
642
643
644
645
646
647
648
649
650
class ServerConnection(Connection):
    """Server-side message queue connection.

    Returned by Listener."""

    def __init__(self, *args, **kwargs):
        """Initialize.

        All positional and keyword arguments are forwarded unchanged
        to `Connection.__init__`.
        """
        # Run the base initializer outside any try/finally: if it raises,
        # the original exception must propagate instead of being masked by
        # an UnboundLocalError from a not-yet-assigned trace logger.
        super().__init__(*args, **kwargs)

        # Keep the enter/exit trace pair the other initializers emit.
        self.logger.enter().exit()

__init__(*args, **kwargs)

Initialize.

Source code in penvm/src/lib/penvm/lib/connection.py
644
645
646
647
648
649
650
def __init__(self, *args, **kwargs):
    """Initialize.

    All positional and keyword arguments are forwarded unchanged to
    the base class initializer.
    """
    # Run the base initializer outside any try/finally: if it raises, the
    # original exception must propagate instead of being masked by an
    # UnboundLocalError from a not-yet-assigned trace logger.
    super().__init__(*args, **kwargs)

    # Keep the enter/exit trace pair the other initializers emit.
    self.logger.enter().exit()

SocketConnection

Bases: BaseObject

Socket connection.

Provides the interface to the socket.

Source code in penvm/src/lib/penvm/lib/connection.py
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
class SocketConnection(BaseObject):
    """Socket connection.

    Provides the interface to the socket."""

    def __init__(
        self,
        host: str,
        port: int,
        sslcontext: Union["ssl.SSLContext", None] = None,
        sock: Union["socket.socket", None] = None,
    ):
        """Setup.

        Args:
            host: Host address.
            port: Port.
            sslcontext: SSL context used for wrapping a regular socket
                to provide encryption.
            sock: Network socket.
        """
        # Base init outside the try/finally: a failure here must propagate
        # as-is rather than be masked by an unbound `tlogger`.
        super().__init__(None, logger)
        tlogger = self.logger.enter()
        try:
            self.host = host
            self.port = port
            self.sslcontext = sslcontext
            self.sock = sock
        finally:
            tlogger.exit()

    def __repr__(self):
        return f"<SocketConnection id={self.oid} host={self.host} port={self.port}>"

    @staticmethod
    def _discard(sock):
        """Close a socket from a failed connection attempt, ignoring errors."""
        if sock is None:
            return
        try:
            sock.close()
        except Exception:
            pass

    @staticmethod
    def _mb_per_sec(nbytes: int, elapsed: float) -> float:
        """Return throughput in MB/s; infinite when no time was measured."""
        if elapsed <= 0:
            return float("inf")
        return nbytes / MB / elapsed

    def connect(self):
        """Connect to server.

        Makes up to five attempts, sleeping two seconds after a socket
        error before retrying. On success the connected socket is
        stored in `self.sock`.

        This method does not raise: every failure is logged, sockets
        from failed attempts are closed, and `self.sock` is left
        unchanged so `is_connected()` keeps reflecting the truth.
        """
        attempts = 5
        tlogger = self.logger.enter()
        try:
            for attempt in range(attempts):
                sock = None
                try:
                    tlogger.debug(f"connecting to server ({self.host}) ({self.port}) ...")
                    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                    if self.sslcontext:
                        sock = self.sslcontext.wrap_socket(sock, server_side=False)
                        tlogger.debug("socket ssl wrapped")
                    sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)

                    tlogger.debug(f"connection info host={self.host} port={self.port}")
                    sock.connect((self.host, self.port))
                except socket.error as e:
                    tlogger.debug(f"socket error ({e})")
                    self._discard(sock)
                    # No point in waiting after the final attempt.
                    if attempt < attempts - 1:
                        time.sleep(2)
                except Exception as e:
                    tlogger.debug(f"connection failed ({e})")
                    self._discard(sock)
                else:
                    tlogger.debug("connected")
                    self.sock = sock
                    return

            tlogger.debug(f"connection failed (gave up after {attempts} attempts)")
        finally:
            tlogger.exit()

    def recv(
        self,
        sz: int,
    ) -> bytes:
        """Receive (all) bytes.

        Keeps reading from the socket until exactly `sz` bytes have
        been collected.

        Args:
            sz: Number of bytes.

        Returns:
            Bytes received.

        Raises:
            Exception: If the socket returns no data before `sz` bytes
                were received ("connection closed").
        """
        chunks = []
        while sz > 0:
            b = self.sock.recv(sz)
            # TODO: handle signal? not needed since v3.5!
            if len(b) == 0:
                self.logger.debug(f"connection closed ({sz=}) (l={chunks!r})")
                raise Exception("connection closed")
            chunks.append(b)
            sz -= len(b)
        return b"".join(chunks)

    def recvblk(
        self,
        blkszlen: int = BLK_SZ_LEN,
    ) -> bytes:
        """Receive and return a block.

        The block is preceded on the stream by a size field, which is
        read first and converted with `int()`.

        Args:
            blkszlen: Length in bytes of the size field.

        Returns:
            Bytes received.

        Raises:
            ConnectionError: If reading the size or the data failed;
                the underlying exception is chained as the cause.
        """
        tlogger = self.logger.enter()
        try:
            sz = int(self.recv(blkszlen))
            t0 = time.time()
            b = self.recv(sz)
            elapsed = time.time() - t0
            if DEBUG:
                ddrecv.writebytes(b)
            # A zero elapsed time must not turn a good read into an error.
            rate = self._mb_per_sec(len(b), elapsed)
            tlogger.lap(f"size ({len(b)}) perf ({rate:.4f} MB/s)")
            return b
        except Exception as e:
            raise ConnectionError("failed to receive block") from e
        finally:
            tlogger.exit()

    def xsend(
        self,
        b: bytes,
    ):
        """Send all bytes.

        Args:
            b: Bytes to send; handed to the socket's `sendall()`.
        """
        self.sock.sendall(b)

    def send(
        self,
        b: bytes,
    ):
        """Send bytes.

        Calls the socket's `send()` repeatedly until everything has
        been written.

        Args:
            b: Bytes to send.

        Returns:
            Total number of bytes sent.

        Raises:
            Exception: If the socket reports that nothing was sent
                ("send failed").
        """
        # A memoryview lets the unsent tail be passed on without copying.
        view = memoryview(b)
        total = 0
        sz = len(b)
        while total < sz:
            count = self.sock.send(view[total:])
            if count == 0:
                raise Exception("send failed")
            total += count

        self.logger.debug(f"------------ send ({total=}) ({sz=})")
        return total

    def sendblk(
        self,
        b: bytes,
        blkszmax: int = BLK_SZ_MAX,
        blkszfmt: bytes = BLK_SZ_FMT,
    ):
        """Send a block (as bytes).

        Size information is sent over the stream before the data.

        Args:
            b: Bytes to send.
            blkszmax: Maximum sendable block size.
            blkszfmt: Block size field format; `blkszfmt % len(b)`
                produces the size field that is prepended to `b`.

        Raises:
            ConnectionError: If the block is too large or sending
                failed; the underlying exception is chained as the
                cause.
        """
        tlogger = self.logger.enter()
        try:
            if len(b) > blkszmax:
                raise Exception(f"block exceeds size ({blkszmax})")

            b = (blkszfmt % len(b)) + b
            t0 = time.time()
            self.send(b)
            elapsed = time.time() - t0
            if DEBUG:
                ddsend.writebytes(b)
            # A zero elapsed time must not turn a good send into an error.
            rate = self._mb_per_sec(len(b), elapsed)
            tlogger.lap(f"size ({len(b)}) perf ({rate:.4f} MB/s)")
        except Exception as e:
            traceback.print_exc()
            raise ConnectionError("failed to send block") from e
        finally:
            tlogger.exit()

    def close(self):
        """Close connection.

        Closes the socket and resets `self.sock` to `None`.

        Raises:
            AttributeError: If there is no socket (`self.sock` is
                `None`), e.g. when called a second time.
        """
        self.logger.debug("close")
        self.sock.close()
        self.sock = None

    def is_alive(self) -> bool:
        """Indicate if connection is alive or not.

        Returns:
            Alive status (currently always `True`).
        """
        return True

    def is_connected(self) -> bool:
        """Indicate if connected or not.

        Returns:
            Connection status: `True` when a socket is set.
        """
        return self.sock is not None

__init__(host, port, sslcontext=None, sock=None)

Setup.

Parameters:

Name Type Description Default
host str

Host address.

required
port int

Port.

required
sslcontext Union[SSLContext, None]

SSL context used for wrapping a regular socket to provide encryption.

None
sock Union[socket, None]

Network socket.

None
Source code in penvm/src/lib/penvm/lib/connection.py
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
def __init__(
    self,
    host: str,
    port: int,
    sslcontext: Union["ssl.SSLContext", None] = None,
    sock: Union["socket.socket", None] = None,
):
    """Setup.

    Records the connection parameters; no network activity happens here.

    Args:
        host: Host address.
        port: Port.
        sslcontext: SSL context used for wrapping a regular socket
            to provide encryption.
        sock: Network socket.
    """
    # Done before the `try`: if either call fails, the real error must
    # surface instead of an UnboundLocalError on `tlogger` in `finally`.
    super().__init__(None, logger)
    tlogger = self.logger.enter()
    try:
        self.host = host
        self.port = port
        self.sslcontext = sslcontext
        self.sock = sock
    finally:
        tlogger.exit()

close()

Close connection.

Source code in penvm/src/lib/penvm/lib/connection.py
337
338
339
340
341
def close(self):
    """Close connection.

    Closes the socket held in `self.sock`, if any, and resets
    `self.sock` to `None`. Calling this on a connection that has no
    socket (never connected, or already closed) is a no-op instead of
    an `AttributeError`.
    """
    self.logger.debug("close")
    # Detach first so the connection is marked closed even if the
    # socket's own close() raises.
    sock, self.sock = self.sock, None
    if sock is not None:
        sock.close()

connect()

Connect to the server.

Source code in penvm/src/lib/penvm/lib/connection.py
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
def connect(self):
    """Connect to the server.

    Makes up to five attempts to open a TCP connection to
    `(self.host, self.port)`, pausing two seconds between attempts
    after a socket error. When `self.sslcontext` is set, the socket is
    SSL-wrapped (client side) before connecting. `TCP_NODELAY` is
    enabled on the socket.

    On success the connected socket is stored in `self.sock`. On
    failure `self.sock` is left untouched, the sockets created along
    the way are closed, and the failure is logged at debug level; no
    exception is propagated to the caller.
    """
    attempts = 5
    retry_delay = 2

    tlogger = self.logger.enter()
    try:
        try:
            connected = None
            for i in range(attempts):
                sock = None
                try:
                    tlogger.debug(f"connecting to server ({self.host}) ({self.port}) ...")
                    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                    if self.sslcontext:
                        sock = self.sslcontext.wrap_socket(sock, server_side=False)
                        tlogger.debug(f"socket ssl wrapped")
                    sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)

                    tlogger.debug(f"connection info host={self.host} port={self.port}")
                    sock.connect((self.host, self.port))
                    connected = sock
                    break
                except socket.error as e:
                    tlogger.debug(f"socket error ({e})")
                    # Do not leak the socket of a failed attempt.
                    if sock is not None:
                        sock.close()
                    # No point in waiting once the last attempt has failed.
                    if i < attempts - 1:
                        time.sleep(retry_delay)
                except Exception:
                    if sock is not None:
                        sock.close()
                    # Non-socket errors are retried too; only the final
                    # one is reported (via the handler below).
                    if i == attempts - 1:
                        raise

            if connected is None:
                # Never publish an unconnected socket as `self.sock`.
                raise Exception(f"no connection after {attempts} attempts")

            tlogger.debug("connected")
            self.sock = connected
        except Exception as e:
            tlogger.debug(f"connection failed ({e})")
    finally:
        tlogger.exit()

is_alive()

Indicate if connection is alive or not.

Returns:

Type Description
bool

Alive status.

Source code in penvm/src/lib/penvm/lib/connection.py
343
344
345
346
347
348
349
def is_alive(self) -> bool:
    """Report whether the connection is alive.

    No liveness probe is performed here; the answer is
    unconditionally `True`.

    Returns:
        Always `True`.
    """
    return True

is_connected()

Indicate if connected or not.

Returns:

Type Description
bool

Connection status.

Source code in penvm/src/lib/penvm/lib/connection.py
351
352
353
354
355
356
357
def is_connected(self) -> bool:
    """Indicate if connected or not.

    The connection counts as connected whenever a socket object is
    held in `self.sock`; no I/O is done to verify it.

    Returns:
        `True` if `self.sock` is set, `False` if it is `None`.
    """
    # Identity test: `!= None` would defer to the socket's own __ne__.
    return self.sock is not None

recv(sz)

Receive (all) bytes.

Parameters:

Name Type Description Default
sz int

Number of bytes.

required

Returns:

Type Description
bytes

Bytes received.

Source code in penvm/src/lib/penvm/lib/connection.py
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
def recv(
    self,
    sz: int,
) -> bytes:
    """Receive (all) bytes.

    Keeps reading from `self.sock` until exactly `sz` bytes have been
    collected.

    Args:
        sz: Number of bytes.

    Returns:
        Bytes received (empty when `sz` is not positive).

    Raises:
        Exception: If the socket returns no data before `sz` bytes
            have arrived ("connection closed").
    """
    chunks = []
    remaining = sz
    while remaining > 0:
        b = self.sock.recv(remaining)
        if not b:
            # An empty read means the peer closed the connection.
            self.logger.debug(f"connection closed (sz={remaining!r}) (l={chunks!r})")
            raise Exception("connection closed")
        chunks.append(b)
        remaining -= len(b)
    return b"".join(chunks)

recvblk(blkszlen=BLK_SZ_LEN)

Receive and return a block.

Parameters:

Name Type Description Default
blkszlen int

Length, in bytes, of the size field that precedes the block data.

BLK_SZ_LEN

Returns:

Type Description
bytes

Bytes received.

Source code in penvm/src/lib/penvm/lib/connection.py
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
def recvblk(
    self,
    blkszlen: int = BLK_SZ_LEN,
) -> bytes:
    """Receive and return a block.

    A block arrives as a size field of `blkszlen` bytes (parsed with
    `int()`) followed by that many bytes of payload.

    Args:
        blkszlen: Length, in bytes, of the size field that precedes
            the payload.

    Returns:
        Bytes received (the payload, without the size field).

    Raises:
        ConnectionError: If the size field or payload cannot be
            received or parsed.
    """
    # Entered outside the `try` so a logger failure is not masked by an
    # UnboundLocalError on `tlogger` in `finally`.
    tlogger = self.logger.enter()
    try:
        try:
            sz = int(self.recv(blkszlen))
            t0 = time.time()
            b = self.recv(sz)
            elapsed = time.time() - t0
            if DEBUG:
                ddrecv.writebytes(b)
            # A transfer faster than the clock resolution gives elapsed == 0;
            # that must not turn a successful receive into a failure.
            rate = len(b) / MB / elapsed if elapsed > 0 else float("inf")
            tlogger.lap(f"size ({len(b)}) perf ({rate:.4f} MB/s)")
            return b

        except Exception as e:
            raise ConnectionError("failed to receive block") from e
    finally:
        tlogger.exit()

send(b)

Send bytes.

Parameters:

Name Type Description Default
b bytes

Bytes to send.

required
Source code in penvm/src/lib/penvm/lib/connection.py
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
def send(
    self,
    b: bytes,
):
    """Send bytes.

    Calls `self.sock.send()` repeatedly until the whole buffer has
    been handed to the socket.

    Args:
        b: Bytes to send.

    Returns:
        Total number of bytes sent (equal to `len(b)`).

    Raises:
        Exception: If the socket reports that zero bytes were sent
            ("send failed").
    """
    sz = len(b)
    total = 0
    # A memoryview lets each retry pass the unsent tail without copying it.
    view = memoryview(b)
    while total < sz:
        count = self.sock.send(view[total:])
        if count == 0:
            raise Exception("send failed")
        total += count

    self.logger.debug(f"------------ send ({total=}) ({sz=})")
    return total

sendblk(b, blkszmax=BLK_SZ_MAX, blkszfmt=BLK_SZ_FMT)

Send a block (as bytes).

Size information is sent over the stream before the data.

Parameters:

Name Type Description Default
b bytes

Bytes to send.

required
blkszmax int

Maximum sendable block size.

BLK_SZ_MAX
blkszfmt int

Block size field format.

BLK_SZ_FMT
Source code in penvm/src/lib/penvm/lib/connection.py
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
def sendblk(
    self,
    b: bytes,
    blkszmax: int = BLK_SZ_MAX,
    blkszfmt: bytes = BLK_SZ_FMT,
):
    """Send a block (as bytes).

    Size information is sent over the stream before the data: the
    payload length is rendered with `blkszfmt` and prepended to the
    payload, and both go out in a single `send()`.

    Args:
        b: Bytes to send.
        blkszmax: Maximum sendable block size.
        blkszfmt: Block size field format (a bytes `%`-format applied
            to `len(b)`).

    Raises:
        ConnectionError: If the block exceeds `blkszmax` or sending
            fails.
    """
    # Entered outside the `try` so a logger failure is not masked by an
    # UnboundLocalError on `tlogger` in `finally`.
    tlogger = self.logger.enter()
    try:
        try:
            if len(b) > blkszmax:
                raise Exception(f"block exceeds size ({blkszmax})")

            b = (blkszfmt % len(b)) + b
            t0 = time.time()
            self.send(b)
            elapsed = time.time() - t0
            if DEBUG:
                ddsend.writebytes(b)
            # elapsed can be 0 on a coarse clock; the block has already
            # gone out, so that must not be reported as a send failure.
            rate = len(b) / MB / elapsed if elapsed > 0 else float("inf")
            tlogger.lap(f"size ({len(b)}) perf ({rate:.4f} MB/s)")
        except Exception as e:
            traceback.print_exc()
            raise ConnectionError("failed to send block") from e
    finally:
        tlogger.exit()

xsend(b)

Send all bytes.

Source code in penvm/src/lib/penvm/lib/connection.py
273
274
275
276
277
278
def xsend(
    self,
    b: bytes,
):
    """Send all bytes in one call.

    Hands the whole buffer to the socket's `sendall()`.

    Args:
        b: Bytes to send.
    """
    sock = self.sock
    sock.sendall(b)

penvm.lib.connectionmanager

ConnectionManager

Bases: BaseObject

Manage connections set up by Listener.

  • Listen for new connections (running in a thread).
  • Spawn accepted connections (Connection runs its own threads).
Source code in penvm/src/lib/penvm/lib/connectionmanager.py
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
class ConnectionManager(BaseObject):
    """Manage connections set up by Listener.

    * Listen for new connections (running in a thread).
    * Spawn accepted connections (`Connection` runs its own threads).
    """

    def __init__(
        self,
        machine: "Machine",
        host: str,
        port: int,
        sslcontext: Union["ssl.SSLContext", None] = None,
    ):
        """Set up.

        Creates the listener and puts it into listening state. Errors
        during setup are logged as warnings and not propagated.

        Args:
            machine: Machine owning this manager.
            host: Host address to listen on. 0.0.0.0 for all
                interfaces.
            port: Port to listen on. 0 for auto assign.
            sslcontext: SSL context for SSL encrypted connections.
        """
        # Done before the `try`: if either call fails, the real error must
        # surface instead of an UnboundLocalError on `tlogger` in `finally`.
        super().__init__(None, logger)
        tlogger = self.logger.enter()
        try:
            self.machine = machine
            self.host = host
            self.port = port
            self.sslcontext = sslcontext

            # Plain state first, so every attribute exists even if the
            # listener setup below fails and is only logged.
            self.conns = {}
            self.exit = False
            self.ltimeout = 30
            self.th = None
            self.init_conn = None
            self.listener = None

            self.listener = Listener(self.machine, host, port, sslcontext)
            # TODO: should listen() be called here?
            self.listener.listen(100)
        except Exception as e:
            self.logger.warning(f"EXCEPTION ({e})")
        finally:
            tlogger.exit()

    def drop(
        self,
        connectionid: str,
    ):
        """Drop connection by connection id.

        Unknown (or already dropped) ids are ignored.

        Args:
            connectionid: Connection id.
        """
        self.logger.debug(f"DROPPED connection {connectionid=} {self.conns.get(connectionid)=}")
        # Default avoids a KeyError when the id is no longer tracked.
        self.conns.pop(connectionid, None)

    def get(
        self,
        connectionid: str,
    ) -> "Connection":
        """Get connection by connection id.

        Args:
            connectionid: Connection id.

        Returns:
            Connection for connection id, or `None` if the id is not
            tracked.
        """
        return self.conns.get(connectionid)

    def list(self) -> List[str]:
        """Get list of connection ids.

        Returns:
            Connection ids (a new list; safe to modify).
        """
        # Unpacking avoids calling `list` by name inside a method that is
        # itself named `list`.
        return [*self.conns]

    def run(self):
        """Run.

        Listen for connections and add.

        The initial connection must occur within a short amount of
        time (first-wait) and remain up for the lifetime of the
        machine. Once the initial connection drops, all others are
        dropped and the machine will end up shutting down.

        Exceptions are logged rather than propagated. When the loop
        ends, every tracked connection is closed.
        """
        # Listener timeout applied once the first-wait period is over.
        CHECK_TIMEOUT = 10000000

        tlogger = self.logger.enter()
        try:
            try:
                # initial/firstwait timeout
                self.listener.settimeout(self.ltimeout)
                while not self.exit:
                    try:
                        tlogger.debug(
                            f"listening for connection {self.listener.lhost=} {self.listener.lport=} ..."
                        )
                        conn = self.listener.accept()
                        tlogger.debug(f"accepted connection {conn=}")

                        if self.init_conn is None:
                            # set to shutdown on connection close/fail of initial connection
                            self.init_conn = conn
                            conn.onclose = self.shutdown
                        else:
                            # TODO: should be part of accept step?
                            conn.onclose = self.drop

                        self.conns[conn.oid] = conn
                        conn.start()

                        # non-initial/post-firstwait update to timeout
                        if self.ltimeout >= 0 and self.ltimeout != CHECK_TIMEOUT:
                            tlogger.debug("non-intial timeout")
                            self.ltimeout = CHECK_TIMEOUT
                            self.listener.settimeout(self.ltimeout)
                    except (socket.timeout, TimeoutError):
                        # periodic wakeup
                        self.logger.debug(f"listener timed out after {self.ltimeout}")

                        if len(self.conns) == 0 and self.ltimeout == CHECK_TIMEOUT:
                            # cleanup opportunity
                            break

                        # setup check timeout if not already in place
                        if self.ltimeout != CHECK_TIMEOUT:
                            self.ltimeout = CHECK_TIMEOUT
                            self.listener.settimeout(self.ltimeout)
                    except Exception as e:
                        self.logger.debug(f"EXCEPTION ({e})")
            finally:
                # cleanup; iterate over a snapshot because closing a
                # connection may remove it from `self.conns`
                for conn in tuple(self.conns.values()):
                    conn.close()
        except Exception as e:
            self.logger.warning(f"EXCEPTION ({e})")
        finally:
            tlogger.exit()

    def shutdown(
        self,
        connectionid: str,
    ):
        """Force termination.

        Intended to be called by `Connection.onclose` instead of
        `ConnectionManager.drop()` when the initial connection
        (`init_conn`) is closed.

        Args:
            connectionid: Connection id (unused; present so the
                signature matches `drop()`).
        """
        tlogger = self.logger.enter()
        try:
            self.machine.shutdown(now=True)
            # should not reach here!
        finally:
            tlogger.exit()

    def start(self):
        """Start.

        Background thread to handle connections. Does nothing if a
        thread has already been started."""
        try:
            self.logger.debug("starting ...")

            if not self.th:
                try:
                    self.th = threading.Thread(target=self.run)
                    self.th.daemon = True
                    self.th.start()
                except Exception as e:
                    self.logger.critical(f"failed to start ({e})")
                    self.th = None
        except Exception as e:
            self.logger.warning(f"EXCEPTION ({e})")

    def state(self) -> "State":
        """Get object state.

        Returns:
            `State` object, or `None` if building it failed (the
            failure is logged as a warning).
        """
        try:
            return State(
                "connection-manager",
                None,
                {
                    "connection-ids": self.list(),
                    "nconnections": len(self.conns),
                    "listener": {
                        "timeout": self.ltimeout,
                        "addr": self.listener.lhost,
                        "port": self.listener.lport,
                    },
                },
            )
        except Exception as e:
            self.logger.warning(f"EXCEPTION ({e})")

__init__(machine, host, port, sslcontext=None)

Set up.

Parameters:

Name Type Description Default
machine Machine

Machine owning this manager.

required
host str

Host address to listen on. 0.0.0.0 for all interfaces.

required
port int

Port to listen on. 0 for auto assign.

required
sslcontext Union[SSLContext, None]

SSL context for SSL encrypted connections.

None
Source code in penvm/src/lib/penvm/lib/connectionmanager.py
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
def __init__(
    self,
    machine: "Machine",
    host: str,
    port: int,
    sslcontext: Union["ssl.SSLContext", None] = None,
):
    """Set up.

    Creates the listener and puts it into listening state. Errors
    during setup are logged as warnings and not propagated.

    Args:
        machine: Machine owning this manager.
        host: Host address to listen on. 0.0.0.0 for all
            interfaces.
        port: Port to listen on. 0 for auto assign.
        sslcontext: SSL context for SSL encrypted connections.
    """
    # Done before the `try`: if either call fails, the real error must
    # surface instead of an UnboundLocalError on `tlogger` in `finally`.
    super().__init__(None, logger)
    tlogger = self.logger.enter()
    try:
        self.machine = machine
        self.host = host
        self.port = port
        self.sslcontext = sslcontext

        # Plain state first, so every attribute exists even if the
        # listener setup below fails and is only logged.
        self.conns = {}
        self.exit = False
        self.ltimeout = 30
        self.th = None
        self.init_conn = None
        self.listener = None

        self.listener = Listener(self.machine, host, port, sslcontext)
        # TODO: should listen() be called here?
        self.listener.listen(100)
    except Exception as e:
        self.logger.warning(f"EXCEPTION ({e})")
    finally:
        tlogger.exit()

drop(connectionid)

Drop connection by connection id.

Parameters:

Name Type Description Default
connectionid str

Connection id.

required
Source code in penvm/src/lib/penvm/lib/connectionmanager.py
78
79
80
81
82
83
84
85
86
87
88
def drop(
    self,
    connectionid: str,
):
    """Drop connection by connection id.

    Unknown (or already dropped) ids are ignored.

    Args:
        connectionid: Connection id.
    """
    self.logger.debug(f"DROPPED connection {connectionid=} {self.conns.get(connectionid)=}")
    # Default avoids a KeyError when the id is no longer tracked.
    self.conns.pop(connectionid, None)

get(connectionid)

Get connection by connection id.

Parameters:

Name Type Description Default
connectionid str

Connection id.

required

Returns:

Type Description
Connection

Connection for connection id.

Source code in penvm/src/lib/penvm/lib/connectionmanager.py
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
def get(
    self,
    connectionid: str,
) -> "Connection":
    """Look up a tracked connection.

    Args:
        connectionid: Connection id.

    Returns:
        The connection registered under `connectionid`, or `None`
        when there is no such entry.
    """
    conn = self.conns.get(connectionid)
    return conn

list()

Get list of connection ids.

Returns:

Type Description
List[str]

Connection ids.

Source code in penvm/src/lib/penvm/lib/connectionmanager.py
104
105
106
107
108
109
110
def list(self) -> List[str]:
    """Get list of connection ids.

    Returns:
        Connection ids (a new list; safe to modify).
    """
    # Unpacking avoids calling `list` by name inside a function that is
    # itself named `list`.
    return [*self.conns]

run()

Run.

Listen for connections and add.

The initial connection must occur within a short amount of time (first-wait) and remain up for the lifetime of the machine. Once the initial connection drops, all others are dropped and the machine will end up shutting down.

Source code in penvm/src/lib/penvm/lib/connectionmanager.py
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
def run(self):
    """Run.

    Listen for connections and add.

    The initial connection must occur within a short amount of
    time (first-wait) and remain up for the lifetime of the
    machine. Once the initial connection drops, all others are
    dropped and the machine will end up shutting down.

    Exceptions are logged rather than propagated. When the loop
    ends, every tracked connection is closed.
    """
    # Listener timeout applied once the first-wait period is over.
    CHECK_TIMEOUT = 10000000

    # Entered outside the `try` so a logger failure is not masked by an
    # UnboundLocalError on `tlogger` in `finally`.
    tlogger = self.logger.enter()
    try:
        try:
            # initial/firstwait timeout
            self.listener.settimeout(self.ltimeout)
            while not self.exit:
                try:
                    tlogger.debug(
                        f"listening for connection {self.listener.lhost=} {self.listener.lport=} ..."
                    )
                    conn = self.listener.accept()
                    tlogger.debug(f"accepted connection {conn=}")

                    if self.init_conn is None:
                        # set to shutdown on connection close/fail of initial connection
                        self.init_conn = conn
                        conn.onclose = self.shutdown
                    else:
                        # TODO: should be part of accept step?
                        conn.onclose = self.drop

                    self.conns[conn.oid] = conn
                    conn.start()

                    # non-initial/post-firstwait update to timeout
                    if self.ltimeout >= 0 and self.ltimeout != CHECK_TIMEOUT:
                        tlogger.debug("non-intial timeout")
                        self.ltimeout = CHECK_TIMEOUT
                        self.listener.settimeout(self.ltimeout)
                except (socket.timeout, TimeoutError):
                    # periodic wakeup
                    self.logger.debug(f"listener timed out after {self.ltimeout}")

                    if len(self.conns) == 0 and self.ltimeout == CHECK_TIMEOUT:
                        # cleanup opportunity
                        break

                    # setup check timeout if not already in place
                    if self.ltimeout != CHECK_TIMEOUT:
                        self.ltimeout = CHECK_TIMEOUT
                        self.listener.settimeout(self.ltimeout)
                except Exception as e:
                    self.logger.debug(f"EXCEPTION ({e})")
        finally:
            # cleanup; iterate over a snapshot because closing a
            # connection may remove it from `self.conns`
            for conn in tuple(self.conns.values()):
                conn.close()
    except Exception as e:
        self.logger.warning(f"EXCEPTION ({e})")
    finally:
        tlogger.exit()

shutdown(connectionid)

Force termination.

Intended to be called by Connection.onclose instead of ConnectionManager.drop() when the initial connection (init_conn) is closed.

Parameters:

Name Type Description Default
connectionid str

Connection id.

required
Source code in penvm/src/lib/penvm/lib/connectionmanager.py
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
def shutdown(
    self,
    connectionid: str,
):
    """Force termination.

    Intended to be called by `Connection.onclose` instead of
    `ConnectionManager.drop()` when the initial connection
    (`init_conn`) is closed.

    Args:
        connectionid: Connection id (unused; present so the
            signature matches `drop()`).
    """
    # Entered outside the `try` so a logger failure surfaces as itself
    # rather than as an UnboundLocalError on `tlogger` in `finally`.
    tlogger = self.logger.enter()
    try:
        self.machine.shutdown(now=True)
        # should not reach here!
    finally:
        tlogger.exit()

start()

Start.

Background thread to handle connections.

Source code in penvm/src/lib/penvm/lib/connectionmanager.py
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
def start(self):
    """Start.

    Background thread to handle connections. The thread runs
    `self.run` as a daemon and is kept in `self.th`; if a thread is
    already recorded there, nothing happens. Failures are logged, not
    raised."""
    try:
        self.logger.debug("starting ...")

        if not self.th:
            try:
                self.th = threading.Thread(target=self.run)
                self.th.daemon = True
                self.th.start()
            except Exception as e:
                # Closing parenthesis was missing from this message.
                self.logger.critical(f"failed to start ({e})")
                self.th = None
    except Exception as e:
        self.logger.warning(f"EXCEPTION ({e})")

state()

Get object state.

Returns:

Type Description
State

State object.

Source code in penvm/src/lib/penvm/lib/connectionmanager.py
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
def state(self) -> "State":
    """Describe this manager as a `State` object.

    The state carries the tracked connection ids, their count, and
    the listener's timeout, address and port.

    Returns:
        `State` object. If anything fails while it is being built,
        the error is logged as a warning and `None` is returned.
    """
    try:
        ids = self.list()
        count = len(self.conns)
        listener_info = {
            "timeout": self.ltimeout,
            "addr": self.listener.lhost,
            "port": self.listener.lport,
        }
        details = {
            "connection-ids": ids,
            "nconnections": count,
            "listener": listener_info,
        }
        return State("connection-manager", None, details)
    except Exception as e:
        self.logger.warning(f"EXCEPTION ({e})")

penvm.lib.debug

Debugging tools.

DataDumper

Dumps data to a destination (e.g., file) with support for serialization.

Source code in penvm/src/lib/penvm/lib/debug.py
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
class DataDumper:
    """Dumps data to a destination (e.g., file) with support for
    serialization.

    Writes are serialized through a lock, so one dumper can be shared
    between threads."""

    def __init__(self, path: str):
        """Initialize.

        Opens `path` for appending in binary mode.

        Args:
            path: File path to dump data.
        """
        self.path = path
        self.lock = Lock()
        self.f = open(path, "ab")

    def __del__(self):
        # Best effort: `self.f` is missing if open() failed in __init__.
        try:
            self.f.close()
        except Exception:
            pass

    def writebytes(
        self,
        b: bytes,
        flush: bool = True,
    ):
        """Write bytes.

        Errors are printed, not raised.

        Args:
            b: Bytes to write
            flush: Flush stream.
        """
        try:
            # `with` guarantees the lock is released exactly when it was
            # acquired.
            with self.lock:
                self.f.write(b)
                if flush:
                    self.f.flush()
        except Exception as e:
            print(f"EXCEPTION {e}")

    def writetext(self, t: str, flush: bool = False):
        """Write text.

        The text is encoded as UTF-8 and written via `writebytes()`.

        Args:
            t: String to write.
            flush: Flush to stream.
        """
        # str -> bytes is encode(); str has no decode().
        self.writebytes(t.encode("utf-8"), flush)

__init__(path)

Initialize.

Parameters:

Name Type Description Default
path str

File path to dump data.

required
Source code in penvm/src/lib/penvm/lib/debug.py
30
31
32
33
34
35
36
37
38
def __init__(self, path: str):
    """Set up the dumper for one destination file.

    The file is opened right away, in binary append mode, and a lock
    is created alongside it.

    Args:
        path: File path to dump data.
    """
    self.path = path
    self.lock = Lock()
    self.f = open(self.path, "ab")

writebytes(b, flush=True)

Write bytes.

Parameters:

Name Type Description Default
b bytes

Bytes to write

required
flush bool

Flush stream.

True
Source code in penvm/src/lib/penvm/lib/debug.py
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
def writebytes(
    self,
    b: bytes,
    flush: bool = True,
):
    """Write bytes.

    The write (and optional flush) happens while holding `self.lock`.
    Errors are printed, not raised.

    Args:
        b: Bytes to write
        flush: Flush stream.
    """
    try:
        # `with` guarantees the lock is released exactly when it was
        # acquired, so a failing acquire cannot trigger a second error
        # from release().
        with self.lock:
            self.f.write(b)
            if flush:
                self.f.flush()
    except Exception as e:
        print(f"EXCEPTION {e}")

writetext(t, flush=False)

Write text.

Parameters:

Name Type Description Default
t str

String to write.

required
flush bool

Flush to stream.

False
Source code in penvm/src/lib/penvm/lib/debug.py
67
68
69
70
71
72
73
74
def writetext(self, t: str, flush: bool = False):
    """Write text.

    The text is encoded as UTF-8 and written via `writebytes()`.

    Args:
        t: String to write.
        flush: Flush to stream.
    """
    # str -> bytes is encode(); str has no decode().
    self.writebytes(t.encode("utf-8"), flush)

penvm.lib.kvstore

FInfo

File info for penvm.lib.kvstore.FileKVStore.

Tracks value type (text, bytes).

Source code in penvm/src/lib/penvm/lib/kvstore.py
177
178
179
180
181
182
183
184
185
class FInfo:
    """Per-file record kept by [penvm.lib.kvstore.FileKVStore][].

    Holds the kind of value stored in the file (text or bytes) in
    `type`.
    """

    def __init__(self):
        """Create a record whose value type is not yet known."""
        # Assigned later by the store when a value is written.
        self.type = None

__init__()

Initialize.

Source code in penvm/src/lib/penvm/lib/kvstore.py
183
184
185
def __init__(self):
    """Create a record whose value type is not yet known."""
    # Left as None until a value type is assigned.
    self.type = None

FileKVStore

Bases: KVStore

File based penvm.lib.kvstore.KVStore.

Source code in penvm/src/lib/penvm/lib/kvstore.py
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
class FileKVStore(KVStore):
    """File based [penvm.lib.kvstore.KVStore][].

    Each value lives in its own file under `dirpath`; `/` in a key is
    replaced by `__` to form the file name. `self.d` maps stored keys
    to `FInfo` records holding the value type.
    """

    def __init__(
        self,
        dirpath: str,
    ):
        """Initialize.

        Creates `dirpath` (mode 0o700). Failures, including a
        `dirpath` outside `/tmp/`, are logged as warnings and not
        raised.

        Args:
            dirpath: Directory path to store files.

        See [penvm.lib.kvstore.KVStore.__init__][].
        """
        try:
            super().__init__()
            self.oid = "file"

            self.dirpath = dirpath
            # Set before validation so the accessors and `__del__` always
            # find it, even when the checks below fail.
            self.d = {}

            if not self.dirpath.startswith("/tmp/"):
                raise Exception("FileKVStore must be under allowed directory")
            os.mkdir(self.dirpath, 0o700)
        except Exception as e:
            self.logger.warning(f"EXCEPTION ({e})")

    def __del__(self):
        # Attributes may be missing if __init__ failed early.
        dirpath = getattr(self, "dirpath", None)
        if not dirpath or not dirpath.startswith("/tmp/"):
            return

        # Snapshot the keys: dropping an entry may remove it from `self.d`.
        # NOTE(review): `drop` is not defined in this class; presumably
        # inherited from KVStore -- confirm.
        for name in tuple(getattr(self, "d", ())):
            self.drop(name)

        try:
            os.rmdir(dirpath)
        except Exception:
            # Best effort: the directory may be non-empty or already gone.
            pass

    def get(
        self,
        k: str,
        default: Union[str, bytes, None] = None,
    ) -> Union[str, bytes, None]:
        """See [penvm.lib.kvstore.KVStore.get][].

        Returns `default` when the key is unknown, its file is missing,
        or reading fails (failures are logged as warnings).
        """
        v = default
        try:
            k = k.replace("/", "__")
            finfo = self.d.get(k)
            if not finfo:
                return default

            # An unknown type raises KeyError here and ends up as `default`.
            mode = {"text": "r", "binary": "rb"}[finfo.type]
            path = f"{self.dirpath}/{k}"
            if os.path.exists(path):
                with open(path, mode) as f:
                    v = f.read()
        except Exception as e:
            self.logger.warning(f"get EXCEPTION ({e})")

        return v

    def get_path(
        self,
        k: str,
        default: Union[str, None] = None,
    ) -> str:
        """Return the path associated with the key.

        Args:
            k: Key.
            default: Default value.

        Returns:
            File path holding value, or `default` if no such file
            exists.
        """
        try:
            # Same key-to-file-name mapping as put()/get()/pop().
            k = k.replace("/", "__")
            path = f"{self.dirpath}/{k}"
            if not os.path.exists(path):
                return default
            return path
        except Exception:
            return default

    def keys(
        self,
        pattern=None,
    ) -> List[str]:
        """See [penvm.lib.kvstore.KVStore.keys][]."""
        try:
            names = list(self.d)
        except Exception:
            names = []

        if pattern:
            names = fnmatch.filter(names, pattern)
        return names

    def pop(
        self,
        k: str,
    ) -> Union[str, bytes, None]:
        """See [penvm.lib.kvstore.KVStore.pop][].

        Returns `None` (after logging a warning) if removal fails.
        """
        try:
            v = self.get(k)
            k = k.replace("/", "__")
            path = f"{self.dirpath}/{k}"
            os.remove(path)
            self.d.pop(k)
            return v
        except Exception as e:
            self.logger.warning(f"EXCEPTION ({e})")

    def put(
        self,
        k: str,
        v: Union[str, bytes],
    ):
        """See [penvm.lib.kvstore.KVStore.put][].

        `str` values are written in text mode, everything else in
        binary mode. Failures are logged as warnings.
        """
        try:
            k = k.replace("/", "__")
            path = f"{self.dirpath}/{k}"
            self.logger.debug(f"put {k=} {path=}")

            finfo = FInfo()
            if isinstance(v, str):
                finfo.type = "text"
                mode = "w+t"
            else:
                finfo.type = "binary"
                mode = "w+b"
            # Close the file so the data is on disk before any later get().
            with open(path, mode) as f:
                f.write(v)
            self.d[k] = finfo
        except Exception as e:
            self.logger.warning(f"EXCEPTION ({e})")

__init__(dirpath)

Initialize.

Parameters:

Name Type Description Default
dirpath str

Directory path to store files.

required

See penvm.lib.kvstore.KVStore.__init__.

Source code in penvm/src/lib/penvm/lib/kvstore.py
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
def __init__(
    self,
    dirpath: str,
):
    """Initialize.

    Args:
        dirpath: Directory path to store files. Must start with `/tmp/`.

    Failures (a disallowed `dirpath`, or an `os.mkdir` error) are logged
    as warnings and not raised.

    See [penvm.lib.kvstore.KVStore.__init__][].
    """
    try:
        super().__init__()
        self.oid = "file"

        self.dirpath = dirpath
        # Create the index before validating so the other methods always
        # find `self.d`, even when validation or mkdir fails below.
        self.d = {}

        if not self.dirpath.startswith("/tmp/"):
            raise Exception("FileKVStore must be under allowed directory")
        os.mkdir(self.dirpath, 0o700)
    except Exception as e:
        self.logger.warning(f"EXCEPTION ({e})")

get(k, default=None)

See penvm.lib.kvstore.KVStore.get.

Source code in penvm/src/lib/penvm/lib/kvstore.py
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
def get(
    self,
    k: str,
    default: Union[str, bytes, None] = None,
) -> Union[str, bytes, None]:
    """See [penvm.lib.kvstore.KVStore.get][].

    Args:
        k: Key; `/` is replaced by `__` to form the file name.
        default: Value returned when the key is not indexed, its file
            cannot be read, or any other error occurs.

    Returns:
        File contents (`str` for text entries, `bytes` for binary
        entries) or `default`.
    """
    try:
        k = k.replace("/", "__")
        finfo = self.d.get(k)
        if not finfo:
            return default

        # Read-only modes: reading must not require write permission.
        # An unknown type raises KeyError and is handled below.
        mode = {"text": "rt", "binary": "rb"}[finfo.type]
        with open(f"{self.dirpath}/{k}", mode) as f:
            return f.read()
    except Exception as e:
        self.logger.warning(f"get EXCEPTION ({e})")
        return default

get_path(k, default=None)

Return the path associated with the key.

Parameters:

Name Type Description Default
k str

Key.

required
default Union[str, None]

Default value.

None

Returns:

Type Description
str

File path holding value.

Source code in penvm/src/lib/penvm/lib/kvstore.py
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
def get_path(
    self,
    k: str,
    default: Union[str, None] = None,
) -> Union[str, None]:
    """Return the path associated with the key.

    Args:
        k: Key; `/` is replaced by `__`, matching how `put` names files.
        default: Value returned when no file exists for the key.

    Returns:
        File path holding value, or `default` when there is none.
    """
    try:
        path = f"{self.dirpath}/{k.replace('/', '__')}"
        return path if os.path.exists(path) else default
    except Exception as e:
        self.logger.warning(f"get_path EXCEPTION ({e})")
        return default

keys(pattern=None)

See penvm.lib.kvstore.KVStore.keys.

Source code in penvm/src/lib/penvm/lib/kvstore.py
275
276
277
278
279
280
281
282
283
284
285
286
287
288
def keys(
    self,
    pattern=None,
) -> List[str]:
    """See [penvm.lib.kvstore.KVStore.keys][].

    Keys are taken from the in-memory index `self.d`. A falsy `pattern`
    returns every key; otherwise the keys are narrowed with
    `fnmatch.filter`.
    """
    try:
        known = [*self.d]
    except Exception:
        # Index not available: report no keys.
        known = []

    return fnmatch.filter(known, pattern) if pattern else known

pop(k)

See penvm.lib.kvstore.KVStore.pop.

Source code in penvm/src/lib/penvm/lib/kvstore.py
290
291
292
293
294
295
296
297
298
299
300
301
302
303
def pop(
    self,
    k: str,
) -> Union[str, bytes, None]:
    """See [penvm.lib.kvstore.KVStore.pop][].

    Reads the stored value via `self.get`, removes the backing file, then
    drops the key from the index `self.d`. Any failure is logged as a
    warning and `None` is returned.
    """
    try:
        value = self.get(k)
        fname = k.replace("/", "__")
        os.remove(f"{self.dirpath}/{fname}")
        self.d.pop(fname)
    except Exception as e:
        self.logger.warning(f"EXCEPTION ({e})")
    else:
        return value

put(k, v)

See penvm.lib.kvstore.KVStore.put.

Source code in penvm/src/lib/penvm/lib/kvstore.py
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
def put(
    self,
    k: str,
    v: Union[str, bytes],
):
    """See [penvm.lib.kvstore.KVStore.put][].

    Writes `v` to a file under `self.dirpath` named after the key (with
    `/` replaced by `__`) and records in `self.d` whether it was stored
    as text or binary. Failures are logged as warnings and not raised.

    Args:
        k: Key.
        v: Value; `str` is written in text mode, anything else in binary
            mode.
    """
    try:
        k = k.replace("/", "__")
        path = f"{self.dirpath}/{k}"
        self.logger.debug(f"put {k=} {path=}")

        finfo = FInfo()
        if isinstance(v, str):
            finfo.type = "text"
            mode = "w+t"
        else:
            finfo.type = "binary"
            mode = "w+b"
        # Close (and flush) the file deterministically instead of relying
        # on garbage collection of the file object.
        with open(path, mode) as f:
            f.write(v)
        # Index the key only after the write succeeded.
        self.d[k] = finfo
    except Exception as e:
        self.logger.warning(f"EXCEPTION ({e})")

KVStore

Bases: BaseObject

Base key+value store.

Source code in penvm/src/lib/penvm/lib/kvstore.py
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
class KVStore(BaseObject):
    """Common interface for key+value stores.

    The accessors defined here are placeholders: `exists`, `get`, `pop`
    and `put` return `None` and `keys` returns an empty list. Subclasses
    override them with a real backend.
    """

    def __init__(self):
        """Set up the base object with a generated oid and the module logger."""
        super().__init__(oid=None, logger=logger)

    def drop(
        self,
        k: Any,
    ):
        """Discard the value stored for a key.

        Delegates to `pop` and ignores its result.

        Args:
            k: Key.
        """
        self.pop(k)
        return None

    def exists(
        self,
        k: Any,
    ) -> bool:
        """Tell whether a key is present.

        Args:
            k: Key.

        Returns:
            Status of k in store. This placeholder returns `None`.
        """
        return None

    def get(
        self,
        k: Any,
        default: Any = None,
    ) -> Any:
        """Fetch the value for a key.

        Args:
            k: Key.
            default: Default value.

        Returns:
            Value for k. This placeholder returns `None`.
        """
        return None

    def keys(
        self,
        pattern: Union[str, None] = None,
    ) -> List[Any]:
        """List the keys in the store.

        Args:
            pattern: Filter for keys to return.

        Returns:
            Keys matching pattern. This placeholder returns an empty list.
        """
        return list()

    def pop(
        self,
        k: Any,
        default: Any = None,
    ) -> Any:
        """Remove a key and hand back its value.

        Args:
            k: Key.
            default: Default value.

        Returns:
            Value for k or `default` otherwise. This placeholder returns
            `None`.
        """
        return None

    def put(
        self,
        k: Any,
        v: Any,
    ):
        """Store a value under a key.

        Args:
            k: Key.
            v: Value.
        """
        return None

    def state(self) -> "State":
        """Get object state.

        Returns:
            `State` of store, carrying the list of keys under "names".
            On any exception a warning is logged and `None` is returned.
        """
        try:
            return State("kvstore", self.oid, dict(names=list(self.keys())))
        except Exception as e:
            self.logger.warning(f"EXCEPTION ({e})")

__init__()

Initialize.

Source code in penvm/src/lib/penvm/lib/kvstore.py
35
36
37
def __init__(self):
    """Set up the base object with a generated oid and the module logger."""
    super().__init__(oid=None, logger=logger)

drop(k)

Drop value for k.

Parameters:

Name Type Description Default
k Any

Key.

required
Source code in penvm/src/lib/penvm/lib/kvstore.py
39
40
41
42
43
44
45
46
47
48
def drop(
    self,
    k: Any,
):
    """Discard the value stored for a key.

    Delegates to `pop` and ignores its result.

    Args:
        k: Key.
    """
    self.pop(k)
    return None

exists(k)

Indicate if key exists or not.

Parameters:

Name Type Description Default
k Any

Key.

required

Returns:

Type Description
bool

Status of k in store.

Source code in penvm/src/lib/penvm/lib/kvstore.py
50
51
52
53
54
55
56
57
58
59
60
61
62
def exists(
    self,
    k: Any,
) -> bool:
    """Tell whether a key is present.

    Args:
        k: Key.

    Returns:
        Status of k in store. This placeholder returns `None`.
    """
    return None

get(k, default=None)

Get value for k.

Parameters:

Name Type Description Default
k Any

Key.

required
default Any

Default value.

None

Returns:

Type Description
Any

Value for k.

Source code in penvm/src/lib/penvm/lib/kvstore.py
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
def get(
    self,
    k: Any,
    default: Any = None,
) -> Any:
    """Fetch the value for a key.

    Args:
        k: Key.
        default: Default value.

    Returns:
        Value for k. This placeholder returns `None`.
    """
    return None

keys(pattern=None)

List of keys in store.

Parameters:

Name Type Description Default
pattern Union[str, None]

Filter for keys to return.

None

Returns:

Type Description
List[Any]

Keys matching pattern.

Source code in penvm/src/lib/penvm/lib/kvstore.py
80
81
82
83
84
85
86
87
88
89
90
91
92
def keys(
    self,
    pattern: Union[str, None] = None,
) -> List[Any]:
    """List the keys in the store.

    Args:
        pattern: Filter for keys to return.

    Returns:
        Keys matching pattern. This placeholder returns an empty list.
    """
    return list()

pop(k, default=None)

Pop value from store for k.

Parameters:

Name Type Description Default
k Any

Key.

required

Returns:

Type Description
Any

Value for k or default otherwise.

Source code in penvm/src/lib/penvm/lib/kvstore.py
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
def pop(
    self,
    k: Any,
    default: Any = None,
) -> Any:
    """Remove a key and hand back its value.

    Args:
        k: Key.
        default: Default value.

    Returns:
        Value for k or `default` otherwise. This placeholder returns
        `None`.
    """
    return None

put(k, v)

Put value in store.

Parameters:

Name Type Description Default
k Any

Key.

required
v Any

Value.

required
Source code in penvm/src/lib/penvm/lib/kvstore.py
109
110
111
112
113
114
115
116
117
118
119
120
def put(
    self,
    k: Any,
    v: Any,
):
    """Store a value under a key.

    This placeholder does nothing and returns `None`.

    Args:
        k: Key.
        v: Value.
    """
    return None

state()

Get object state.

Returns:

Type Description
State

State of store.

Source code in penvm/src/lib/penvm/lib/kvstore.py
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
def state(self) -> "State":
    """Get object state.

    Returns:
        `State` of store, carrying the list of keys under "names". On any
        exception a warning is logged and `None` is returned.
    """
    try:
        return State("kvstore", self.oid, dict(names=list(self.keys())))
    except Exception as e:
        self.logger.warning(f"EXCEPTION ({e})")

MemoryKVStore

Bases: KVStore

Memory based penvm.lib.kvstore.KVStore.

Source code in penvm/src/lib/penvm/lib/kvstore.py
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
class MemoryKVStore(KVStore):
    """Memory based [penvm.lib.kvstore.KVStore][].

    Values are kept in the plain dict `self.d`.
    """

    def __init__(self):
        """Initialize."""
        super().__init__()
        self.oid = "memory"
        self.d = {}

    def exists(self, k: str) -> bool:
        """See [penvm.lib.kvstore.KVStore.exists][]."""
        return k in self.d

    def get(
        self,
        k: str,
        default: Any = None,
    ) -> Any:
        """See [penvm.lib.kvstore.KVStore.get][]."""
        return self.d.get(k, default)

    def keys(
        self,
        pattern: Union[str, None] = None,
    ) -> List[str]:
        """See [penvm.lib.kvstore.KVStore.keys][].

        Args:
            pattern: `fnmatch` pattern; `None` returns every key.

        Returns:
            Keys matching pattern.
        """
        # fnmatch.filter() cannot take None as the pattern, so the
        # "no filter" case (the default) is handled explicitly.
        if pattern is None:
            return list(self.d)
        return fnmatch.filter(self.d.keys(), pattern)

    def pop(self, k: str, default: Any = None) -> Any:
        """See [penvm.lib.kvstore.KVStore.pop][].

        Args:
            k: Key.
            default: Value returned when the key is absent.

        Returns:
            Value for k or `default` otherwise.
        """
        return self.d.pop(k, default)

    def put(self, k: str, v: Any):
        """See [penvm.lib.kvstore.KVStore.put][]."""
        self.d[k] = v

__init__()

Initialize.

Source code in penvm/src/lib/penvm/lib/kvstore.py
143
144
145
146
147
def __init__(self):
    """Initialize the base store, then tag it "memory" with an empty dict."""
    super().__init__()
    self.oid, self.d = "memory", {}

exists(k)

See penvm.lib.kvstore.KVStore.exists.

Source code in penvm/src/lib/penvm/lib/kvstore.py
149
150
151
def exists(self, k: str) -> bool:
    """See [penvm.lib.kvstore.KVStore.exists][].

    Membership test against the backing dict `self.d`.
    """
    present = k in self.d
    return present

get(k, default=None)

See penvm.lib.kvstore.KVStore.get.

Source code in penvm/src/lib/penvm/lib/kvstore.py
153
154
155
156
157
158
159
def get(
    self,
    k: str,
    default: Any = None,
) -> Any:
    """See [penvm.lib.kvstore.KVStore.get][].

    Looks the key up in `self.d`; an absent key yields `default`.
    """
    try:
        return self.d[k]
    except KeyError:
        return default

keys(pattern=None)

See penvm.lib.kvstore.KVStore.keys.

Source code in penvm/src/lib/penvm/lib/kvstore.py
161
162
163
164
165
166
def keys(
    self,
    pattern: Union[str, None] = None,
) -> List[str]:
    """See [penvm.lib.kvstore.KVStore.keys][].

    Args:
        pattern: `fnmatch` pattern; `None` returns every key.

    Returns:
        Keys matching pattern.
    """
    # fnmatch.filter() cannot take None as the pattern, so the "no
    # filter" case (the default) is handled explicitly.
    if pattern is None:
        return list(self.d)
    return fnmatch.filter(self.d.keys(), pattern)

pop(k)

See penvm.lib.kvstore.KVStore.pop.

Source code in penvm/src/lib/penvm/lib/kvstore.py
168
169
170
def pop(self, k: str, default: Any = None) -> Any:
    """See [penvm.lib.kvstore.KVStore.pop][].

    Args:
        k: Key.
        default: Value returned when the key is absent.

    Returns:
        Value for k or `default` otherwise.
    """
    return self.d.pop(k, default)

put(k, v)

See penvm.lib.kvstore.KVStore.put.

Source code in penvm/src/lib/penvm/lib/kvstore.py
172
173
174
def put(self, k: str, v: Any):
    """See [penvm.lib.kvstore.KVStore.put][].

    Stores `v` under `k` in the backing dict, replacing any earlier value.
    """
    self.d.update({k: v})

penvm.lib.message

Header

Bases: MessagePart

Header.

Source code in penvm/src/lib/penvm/lib/message.py
267
268
269
270
271
272
273
274
275
276
277
278
class Header(MessagePart):
    """Message header part."""

    def __init__(self, d=None):
        """Initialize from an optional mapping.

        An "id" already present in `d` is kept; otherwise a new one is
        generated with `get_uuid()`.

        See [penvm.lib.message.MessagePart][].
        """
        super().__init__(d)
        if "id" in self:
            return
        # Brand-new header: give it its own id.
        self["id"] = get_uuid()

__init__(d=None)

Initialize.

See penvm.lib.message.MessagePart.

Source code in penvm/src/lib/penvm/lib/message.py
270
271
272
273
274
275
276
277
278
def __init__(self, d=None):
    """Initialize from an optional mapping.

    An "id" already present in `d` is kept; otherwise a new one is
    generated with `get_uuid()`.

    See [penvm.lib.message.MessagePart][].
    """
    super().__init__(d)
    if "id" in self:
        return
    # Brand-new header: give it its own id.
    self["id"] = get_uuid()

JSONEncoder

Bases: json.JSONEncoder

Enhanced JSONEncoder with support for non-conforming types:

  • bytes - Bytes.
  • complex - Complex.
  • numpy.* - Python-only.
Source code in penvm/src/lib/penvm/lib/message.py
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
class JSONEncoder(json.JSONEncoder):
    """Enhanced JSONEncoder with support for non-conforming types:

    * `bytes` - Bytes.
    * `complex` - Complex.
    * `numpy.*` - Python-only.

    Each supported value is emitted as a single-key object
    `{NON_CONFORMING: {"type": ..., "value": ...}}`.
    """

    def default(self, o):
        """Return a JSON-serializable wrapper for `o`.

        Args:
            o: Object the stock encoder could not serialize.

        Returns:
            Wrapper dict keyed by `NON_CONFORMING`.

        Raises:
            TypeError: From the base class, for unsupported types.
        """
        otype = type(o)
        if otype is bytes:
            wrapped = {
                "type": "bytes",
                "value": base64.b64encode(o).decode("utf-8"),
            }
        elif otype is complex:
            wrapped = {
                "type": "complex",
                "value": [o.real, o.imag],
            }
        elif otype.__module__ == "numpy":
            try:
                import numpy
            except ImportError:
                # Without numpy the value cannot be serialized; report it
                # like any other unsupported type instead of failing on a
                # None placeholder.
                return super().default(o)

            f = io.BytesIO()
            numpy.save(f, o)
            wrapped = {
                "type": "numpy",
                "class": o.__class__.__name__,
                "value": base64.b64encode(f.getvalue()).decode("utf-8"),
            }
        else:
            return super().default(o)

        return {NON_CONFORMING: wrapped}

Message

Object consisting of header and payload objects.

The header and payload objects are MessagePart objects.

All parts have a -id setting unique to the object.

Message payloads take special fields with - prefix:

  • -type - Message type (e.g., request, response)
  • -status - Status (e.g., ok, error)
  • -message - Status message, usually for "error" status.
Source code in penvm/src/lib/penvm/lib/message.py
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
class Message:
    """Pairing of a header part and a payload part.

    Both parts are `MessagePart` objects.

    All parts have a `-id` setting unique to the object.

    Payload fields prefixed with `-` have special meaning:

    * `-type` - Message type (e.g., request, response)
    * `-status` - Status (e.g., ok, error)
    * `-message` - Status message, usually for "error" status.
    """

    def __init__(
        self,
        header: Union[Header, None] = None,
        payload: Union[Payload, None] = None,
    ):
        """Initialize.

        A falsy argument is replaced by a freshly created part.

        Args:
            header: Header object.
            payload: Payload object.
        """
        self.header = header if header else Header()
        self.payload = payload if payload else Payload()

    def __repr__(self):
        cls_name = self.__class__.__name__
        return f"<{cls_name} header ({self.header}) payload ({self.payload})>"

    @staticmethod
    def decode(b):
        """Split encoded bytes into a header and a payload.

        Layout of `b`:

        * header[HEADER_SZ_LEN] -> sz (encoded as plain text number)
        * header[HEADER_SZ_LEN:HEADER_SZ_LEN+sz] -> bytes
        * payload[PAYLOAD_SZ_LEN] -> sz (encoded as plain text number)
        * payload[PAYLOAD_SZ_LEN:PAYLOAD_SZ_LEN+sz] -> bytes

        Each decoded part gets a "-decode-elapsed" entry holding the time
        spent decoding it, in seconds.

        Returns:
            Tuple of (`Header`, `Payload`).

        See [penvm.lib.message.MessagePart][].
        """
        header_size = int(b[:HEADER_SZ_LEN])
        header_end = HEADER_SZ_LEN + header_size
        started = time.time()
        h = Header(Header.decode(b[HEADER_SZ_LEN:header_end]))
        h["-decode-elapsed"] = time.time() - started

        started = time.time()
        remainder = b[header_end:]
        payload_size = int(remainder[:PAYLOAD_SZ_LEN])
        payload_end = PAYLOAD_SZ_LEN + payload_size
        p = Payload(Payload.decode(remainder[PAYLOAD_SZ_LEN:payload_end]))
        p["-decode-elapsed"] = time.time() - started

        return h, p

    def encode(self):
        """Encode as size-prefixed header followed by size-prefixed payload."""
        header_bytes = self.header.encode()
        payload_bytes = self.payload.encode()
        header_prefix = HEADER_SZ_FMT % len(header_bytes)
        payload_prefix = PAYLOAD_SZ_FMT % len(payload_bytes)
        return header_prefix + header_bytes + payload_prefix + payload_bytes

    def state(self) -> "State":
        """Get object state.

        Returns:
            `State` object holding dict copies of the header and payload.
            On any exception a warning is logged and `None` is returned.
        """
        try:
            return State(
                "message",
                self.header.get("id"),
                dict(header=self.header.dict(), payload=self.payload.dict()),
            )
        except Exception as e:
            logger.warning(f"EXCEPTION ({e})")

__init__(header=None, payload=None)

Initialize.

Parameters:

Name Type Description Default
header Union[Header, None]

Header object.

None
payload Union[Payload, None]

Payload object.

None
Source code in penvm/src/lib/penvm/lib/message.py
309
310
311
312
313
314
315
316
317
318
319
320
321
def __init__(
    self,
    header: Union[Header, None] = None,
    payload: Union[Payload, None] = None,
):
    """Initialize.

    A falsy argument is replaced by a freshly created part.

    Args:
        header: Header object.
        payload: Payload object.
    """
    self.header = header if header else Header()
    self.payload = payload if payload else Payload()

decode(b) staticmethod

Decode bytes according to Message format.

Bytes as:

  • header[HEADER_SZ_LEN] -> sz (encoded as plain text number)
  • header[HEADER_SZ_LEN:HEADER_SZ_LEN+sz] -> bytes
  • payload[PAYLOAD_SZ_LEN] -> sz (encoded as plain text number)
  • payload[PAYLOAD_SZ_LEN:PAYLOAD_SZ_LEN+sz] -> bytes

See penvm.lib.message.MessagePart.

Source code in penvm/src/lib/penvm/lib/message.py
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
@staticmethod
def decode(b):
    """Split encoded bytes into a header and a payload.

    Layout of `b`:

    * header[HEADER_SZ_LEN] -> sz (encoded as plain text number)
    * header[HEADER_SZ_LEN:HEADER_SZ_LEN+sz] -> bytes
    * payload[PAYLOAD_SZ_LEN] -> sz (encoded as plain text number)
    * payload[PAYLOAD_SZ_LEN:PAYLOAD_SZ_LEN+sz] -> bytes

    Each decoded part gets a "-decode-elapsed" entry holding the time
    spent decoding it, in seconds.

    Returns:
        Tuple of (`Header`, `Payload`).

    See [penvm.lib.message.MessagePart][].
    """
    header_size = int(b[:HEADER_SZ_LEN])
    header_end = HEADER_SZ_LEN + header_size
    started = time.time()
    h = Header(Header.decode(b[HEADER_SZ_LEN:header_end]))
    h["-decode-elapsed"] = time.time() - started

    started = time.time()
    remainder = b[header_end:]
    payload_size = int(remainder[:PAYLOAD_SZ_LEN])
    payload_end = PAYLOAD_SZ_LEN + payload_size
    p = Payload(Payload.decode(remainder[PAYLOAD_SZ_LEN:payload_end]))
    p["-decode-elapsed"] = time.time() - started

    return h, p

encode()

Encode.

Source code in penvm/src/lib/penvm/lib/message.py
354
355
356
357
358
def encode(self):
    """Encode as size-prefixed header followed by size-prefixed payload."""
    header_bytes = self.header.encode()
    payload_bytes = self.payload.encode()
    header_prefix = HEADER_SZ_FMT % len(header_bytes)
    payload_prefix = PAYLOAD_SZ_FMT % len(payload_bytes)
    return header_prefix + header_bytes + payload_prefix + payload_bytes

state()

Get object state.

Returns:

Type Description
State

State object.

Source code in penvm/src/lib/penvm/lib/message.py
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
def state(self) -> "State":
    """Get object state.

    Returns:
        `State` object holding dict copies of the header and payload. On
        any exception a warning is logged and `None` is returned.
    """
    try:
        return State(
            "message",
            self.header.get("id"),
            dict(header=self.header.dict(), payload=self.payload.dict()),
        )
    except Exception as e:
        logger.warning(f"EXCEPTION ({e})")

MessagePart

Bases: MutableMapping

Dict-type object with enhanced json codec support for bytes.

Source code in penvm/src/lib/penvm/lib/message.py
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
class MessagePart(MutableMapping):
    """Dict-type object with enhanced json codec support for bytes.

    All mapping operations are forwarded to the wrapped dict `self.d`.
    """

    def __init__(
        self,
        d: dict = None,
    ):
        """Initialize.

        Args:
            d: Initial configuration; its items are copied into the part.
        """
        self.d = {}
        if d is not None:
            self.d.update(d)

    def __delitem__(
        self,
        k: Any,
    ):
        del self.d[k]

    def __getitem__(self, k):
        return self.d[k]

    def __iter__(self):
        return iter(self.d)

    def __len__(self):
        return len(self.d)

    def __repr__(self):
        # this can be expensive for big payloads
        s = str(self.d)
        if len(s) > 256:
            s = s[:253] + "..."
        return f"<{self.__class__.__name__} ({s})>"

    def __setitem__(self, k, v):
        self.d[k] = v

    @staticmethod
    def decode(b: bytes) -> Any:
        """Decode bytes with support for local decodings.

        Args:
            b: UTF-8 encoded JSON bytes to decode.

        Returns:
            Object parsed from the JSON text, with
            `json_decoder_object_hook` applied to every JSON object.
        """
        return json.loads(b.decode("utf-8"), object_hook=json_decoder_object_hook)

    def dict(
        self,
        clean: bool = False,
    ) -> Dict:
        """Return a copy as a dict. Optionally clean of "private"
        top-level items (keys start with "-").

        Args:
            clean: Remove keys starting with "-".

        Returns:
            Deep copy of the contents as a dictionary.
        """
        d = copy.deepcopy(self.d)
        if clean:
            # Only string keys can carry the "-" prefix; other key types
            # are kept instead of failing on a missing startswith().
            d = {
                k: v
                for k, v in d.items()
                if not (isinstance(k, str) and k.startswith("-"))
            }
        return d

    def dumps(
        self,
        indent: Union[int, None] = None,
        sort_keys: Union[bool, None] = None,
    ) -> str:
        """Dump contents as a stringified dict.

        Args:
            indent: Indent size.
            sort_keys: Sort keys.

        Returns:
            Stringified dictionary.
        """
        # TODO: not really a stringified dict. drop in favor of json()?
        kwargs = {}
        if indent is not None:
            kwargs["indent"] = indent
        if sort_keys is not None:
            kwargs["sort_keys"] = sort_keys
        return json.dumps(self.d, **kwargs)

    def encode(self) -> bytes:
        """Encode items as JSON and return the UTF-8 bytes.

        Returns:
            UTF-8 encoded JSON produced with `JSONEncoder`.
        """
        return json.dumps(self.d, cls=JSONEncoder).encode("utf-8")

    def json(
        self,
        indent: Union[int, None] = 2,
        clean: bool = False,
    ) -> str:
        """Return JSON string.

        Args:
            indent: Indent size.
            clean: Remove keys starting with "-".

        Returns:
            JSON string.
        """
        d = self.dict(clean)
        return json.dumps(d, indent=indent)

    def yaml(
        self,
        clean: bool = False,
    ) -> str:
        """Return YAML.

        Args:
            clean: Remove keys starting with "-".

        Returns:
            YAML.
        """
        d = self.dict(clean)
        return yaml.dump(d)

__init__(d=None)

Initialize.

Parameters:

Name Type Description Default
d dict

Initial configuration.

None
Source code in penvm/src/lib/penvm/lib/message.py
134
135
136
137
138
139
140
141
142
143
144
145
def __init__(
    self,
    d: dict = None,
):
    """Initialize.

    Args:
        d: Initial configuration; its items are copied into the part.
    """
    self.d = {}
    # Identity test: `!= None` would invoke the argument's own comparison.
    if d is not None:
        self.d.update(d)

decode(b) staticmethod

Decode bytes with support for local decodings.

Parameters:

Name Type Description Default
b bytes

Bytes to decode.

required

Returns:

Type Description
str

Object decoded from the JSON bytes.

Source code in penvm/src/lib/penvm/lib/message.py
172
173
174
175
176
177
178
179
180
181
182
@staticmethod
def decode(b: bytes) -> Any:
    """Decode bytes with support for local decodings.

    Args:
        b: UTF-8 encoded JSON bytes to decode.

    Returns:
        Object parsed from the JSON text, with `json_decoder_object_hook`
        applied to every JSON object.
    """
    return json.loads(b.decode("utf-8"), object_hook=json_decoder_object_hook)

dict(clean=False)

Return a copy as a dict. Optionally clean of "private" top-level items (keys start with "-").

Parameters:

Name Type Description Default
clean bool

Remove keys starting with "-".

False

Returns:

Type Description
Dict

Dictionary.

Source code in penvm/src/lib/penvm/lib/message.py
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
def dict(
    self,
    clean: bool = False,
) -> Dict:
    """Return a copy as a dict. Optionally clean of "private"
    top-level items (keys start with "-").

    Args:
        clean: Remove keys starting with "-".

    Returns:
        Deep copy of the contents as a dictionary.
    """
    d = copy.deepcopy(self.d)
    if clean:
        # Only string keys can carry the "-" prefix; other key types are
        # kept instead of failing on a missing startswith().
        d = {
            k: v
            for k, v in d.items()
            if not (isinstance(k, str) and k.startswith("-"))
        }
    return d

dumps(indent=None, sort_keys=None)

Dump contents as a stringified dict.

Parameters:

Name Type Description Default
indent Union[int, None]

Indent size.

None
sort_keys Union[bool, None]

Sort keys.

None

Returns:

Type Description
str

Stringified dictionary.

Source code in penvm/src/lib/penvm/lib/message.py
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
def dumps(
    self,
    indent: Union[int, None] = None,
    sort_keys: Union[bool, None] = None,
) -> str:
    """Dump contents as a stringified dict.

    Options left at `None` are not passed on, so `json.dumps` applies its
    own defaults for them.

    Args:
        indent: Indent size.
        sort_keys: Sort keys.

    Returns:
        Stringified dictionary.
    """
    # TODO: not really a stringified dict. drop in favor of json()?
    kwargs = {}
    if indent is not None:
        kwargs["indent"] = indent
    if sort_keys is not None:
        kwargs["sort_keys"] = sort_keys
    return json.dumps(self.d, **kwargs)

encode()

Encode items as JSON and return the UTF-8 encoded bytes.

Returns:

Type Description
str

JSON string.

Source code in penvm/src/lib/penvm/lib/message.py
226
227
228
229
230
231
232
def encode(self) -> bytes:
    """Encode items as JSON and return the UTF-8 bytes.

    Returns:
        UTF-8 encoded JSON produced with `JSONEncoder`.
    """
    return json.dumps(self.d, cls=JSONEncoder).encode("utf-8")

json(indent=2, clean=False)

Return JSON string.

Parameters:

Name Type Description Default
indent Union[int, None]

Indent size.

2
clean bool

Remove keys starting with "-".

False

Returns:

Type Description
str

JSON string.

Source code in penvm/src/lib/penvm/lib/message.py
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
def json(
    self,
    indent: Union[int, None] = 2,
    clean: bool = False,
) -> str:
    """Serialize a dict copy of the contents to JSON text.

    Args:
        indent: Indent size.
        clean: Remove keys starting with "-".

    Returns:
        JSON string.
    """
    return json.dumps(self.dict(clean), indent=indent)

yaml(clean=False)

Return YAML.

Parameters:

Name Type Description Default
clean bool

Remove keys starting with "-".

False

Returns:

Type Description
str

YAML.

Source code in penvm/src/lib/penvm/lib/message.py
251
252
253
254
255
256
257
258
259
260
261
262
263
264
def yaml(
    self,
    clean: bool = False,
) -> str:
    """Serialize a dict copy of the contents to YAML text.

    Args:
        clean: Remove keys starting with "-".

    Returns:
        YAML.
    """
    return yaml.dump(self.dict(clean))

Payload

Bases: MessagePart

Payload.

Source code in penvm/src/lib/penvm/lib/message.py
281
282
283
284
285
286
287
288
289
290
291
292
class Payload(MessagePart):
    """Message payload part."""

    def __init__(self, d=None):
        """Initialize from an optional mapping.

        A "-id" already present in `d` is kept; otherwise a new one is
        generated with `get_uuid()`.

        See [penvm.lib.message.MessagePart][].
        """
        super().__init__(d)
        if "-id" in self:
            return
        # Brand-new payload: give it its own id.
        self["-id"] = get_uuid()

__init__(d=None)

Initialize.

See penvm.lib.message.MessagePart.

Source code in penvm/src/lib/penvm/lib/message.py
284
285
286
287
288
289
290
291
292
def __init__(self, d=None):
    """Initialize from an optional mapping.

    A "-id" already present in `d` is kept; otherwise a new one is
    generated with `get_uuid()`.

    See [penvm.lib.message.MessagePart][].
    """
    super().__init__(d)
    if "-id" in self:
        return
    # Brand-new payload: give it its own id.
    self["-id"] = get_uuid()

ErrorResponse(message, *args, **kwargs)

Return an error response message.

See penvm.lib.message.Message.

Source code in penvm/src/lib/penvm/lib/message.py
407
408
409
410
411
412
413
414
415
416
417
418
419
def ErrorResponse(message, *args, **kwargs):
    """Build a response message flagged as an error.

    The payload gets "-status" set to "error" and "-message" set to
    `message`; remaining arguments are passed on to `Response`.

    See [penvm.lib.message.Message][].
    """
    reply = Response(*args, **kwargs)
    error_fields = {
        "-status": "error",
        "-message": message,
    }
    reply.payload.update(error_fields)
    return reply

OkResponse(*args, **kwargs)

Return an ok response message.

See penvm.lib.message.Message.

Source code in penvm/src/lib/penvm/lib/message.py
422
423
424
425
426
427
428
429
def OkResponse(*args, **kwargs):
    """Build a response message whose payload "-status" is "ok".

    All arguments are passed on to `Response`.

    See [penvm.lib.message.Message][].
    """
    reply = Response(*args, **kwargs)
    reply.payload["-status"] = "ok"
    return reply

Request(*args, **kwargs)

Return a request message.

See penvm.lib.message.Message.

Source code in penvm/src/lib/penvm/lib/message.py
384
385
386
387
388
389
390
391
def Request(*args, **kwargs):
    """Build a message whose payload "-type" is "request".

    All arguments are passed on to `Message`.

    See [penvm.lib.message.Message][].
    """
    request = Message(*args, **kwargs)
    request.payload["-type"] = "request"
    return request

Response(*args, **kwargs)

Return a response message.

See penvm.lib.message.Message.

Source code in penvm/src/lib/penvm/lib/message.py
394
395
396
397
398
399
400
401
402
403
404
def Response(*args, **kwargs):
    """Build a message whose payload "-type" is "response".

    The optional keyword `refmsg` is removed from `kwargs` before the
    rest is passed to `Message`; when it is truthy, its header "id" is
    recorded as "ref-id" in the new header.

    See [penvm.lib.message.Message][].
    """
    referenced = kwargs.pop("refmsg", None)
    reply = Message(*args, **kwargs)
    if referenced:
        reply.header["ref-id"] = referenced.header.get("id")
    reply.payload["-type"] = "response"
    return reply

json_decoder_object_hook(o)

Support for decoding of non-conforming types:

  • bytes - Bytes.
  • complex - Complex.
  • numpy.* - Numpy serialized objects (Python-only).

The encoding is an object as:

NON_CONFORMING: {
    "type": <str>,
    "value": <base64encoding>,
}

Note: Object keys cannot be bytes.

Source code in penvm/src/lib/penvm/lib/message.py
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
def json_decoder_object_hook(o):
    """Support for decoding of non-conforming types:

    * `bytes` - Bytes.
    * `complex` - Complex.
    * `numpy.*` - Numpy serialized objects (Python-only).

    The encoding is an object as:

    ```
    NON_CONFORMING: {
        "type": <str>,
        "value": <encoded value>,
    }
    ```

    For `bytes` and `numpy` the value is a base64 string; for `complex`
    it is a two-element sequence `(real, imag)`.

    Note: Object keys cannot be bytes.

    Args:
        o: Dictionary produced by the JSON decoder.

    Returns:
        The decoded value when `o` carries a recognized non-conforming
        encoding; otherwise `o` itself, unchanged.

    Raises:
        ImportError: If a `numpy` value must be decoded but numpy is not
            installed.
    """
    if NON_CONFORMING not in o:
        return o

    encoded = o[NON_CONFORMING]
    nctype = encoded.get("type")
    ncvalue = encoded.get("value")

    if nctype == "bytes":
        return base64.b64decode(ncvalue.encode("utf-8"))
    if nctype == "complex":
        return complex(ncvalue[0], ncvalue[1])
    if nctype == "numpy":
        # Imported lazily so numpy is only required when actually needed.
        try:
            import numpy
        except ImportError as e:
            raise ImportError("numpy is required to decode a numpy-encoded value") from e

        f = io.BytesIO(base64.b64decode(ncvalue.encode("utf-8")))
        # allow_pickle=False: never unpickle data received in a message.
        return numpy.load(f, allow_pickle=False)

    # Missing or unrecognized type: leave the object as decoded.
    return o

penvm.lib.misc

Collection of miscellaneous code.

LogMark

Provides unique id and time information: t0 (start time) and tlast (lap time).

Source code in penvm/src/lib/penvm/lib/misc.py
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
class LogMark:
    """Provides unique id and time information: t0 (start time) and
    tlast (lap time).

    All times are `time.time()` values (seconds, as floats).
    """

    def __init__(self):
        """Initialize: record the start time and obtain a log id."""
        self.t0 = time.time()  # start ("init") time
        self.tlast = self.t0  # time of the most recent lap
        self.uuid = get_log_uuid()

    def elapsed(self) -> Tuple[float, float]:
        """Return the current time and the elapsed time since `t0`.

        Returns:
            Tuple of (now, elapsed since init) in seconds.
        """
        t1 = time.time()
        return t1, t1 - self.t0

    def lap(self) -> Tuple[float, float, float]:
        """Return triple (now, elapsed since init, elapsed since last lap).

        The lap time `tlast` is updated to now as a side effect.

        Returns:
            Tuple of (now, elapsed since init, elapsed since last lap)
            in seconds.
        """
        tnow = time.time()
        tlast = self.tlast
        self.tlast = tnow
        return tnow, tnow - self.t0, tnow - tlast

    def reset(self):
        """Reset the "init" time (`t0`) to now; `tlast` is left untouched."""
        self.t0 = time.time()

__init__()

Initialize.

Source code in penvm/src/lib/penvm/lib/misc.py
42
43
44
45
46
def __init__(self):
    """Record the creation time and obtain a log id.

    Both `t0` (start time) and `tlast` (last lap time) start out as the
    same `time.time()` value.
    """
    started = time.time()
    self.t0 = started
    self.tlast = started
    self.uuid = get_log_uuid()

elapsed()

Return elapsed time since object initialize.

Returns:

Type Description
Tuple[float, float]

Tuple of (current time, elapsed time since initialization) in seconds.

Source code in penvm/src/lib/penvm/lib/misc.py
48
49
50
51
52
53
54
55
def elapsed(self) -> Tuple[float, float]:
    """Return the current time and the elapsed time since `t0`.

    Returns:
        Tuple of (now, elapsed since init) in seconds.
    """
    t1 = time.time()
    return t1, t1 - self.t0

lap()

Return triple (now, elapsed since init, elapsed since last lap).

Returns:

Type Description
Tuple[float, float, float]

Tuple of (now, elapsed since init, elapsed since last lap) in seconds.

Source code in penvm/src/lib/penvm/lib/misc.py
57
58
59
60
61
62
63
64
65
66
67
def lap(self) -> Tuple[float, float, float]:
    """Take a lap reading and start a new lap.

    Returns:
        Tuple of (now, elapsed since init, elapsed since last lap) in
        seconds. `tlast` is set to now as a side effect.
    """
    now = time.time()
    previous, self.tlast = self.tlast, now
    return (now, now - self.t0, now - previous)

reset()

Reset the "init" time.

Source code in penvm/src/lib/penvm/lib/misc.py
69
70
71
def reset(self):
    """Make the current time the new "init" time (`t0`)."""
    now = time.time()
    self.t0 = now

LoggerAdapter

Bases: _LoggerAdapter

Log adapter which provides "owner" information as a prefix in the log entry.

Source code in penvm/src/lib/penvm/lib/misc.py
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
class LoggerAdapter(_LoggerAdapter):
    """Log adapter which provides "owner" information as a prefix in
    the log entry.

    The prefix is computed once, at construction, from the `extra`
    mapping and stored back into it under the `"prefix"` key.
    """

    def __init__(
        self,
        *args,
        **kwargs,
    ):
        """Initialize and compute the log prefix.

        Arguments are passed through to the base adapter. If `extra`
        holds an owner object under `"self"`, the prefix is
        `ClassName[id]` (with `id` taken from `extra["id"]`, default
        `"-"`); otherwise an existing `extra["prefix"]` is kept, or the
        prefix is empty.
        """
        super().__init__(*args, **kwargs)

        # The base class stores `extra` as given, which may be None;
        # normalize it so the lookups below cannot fail.
        if self.extra is None:
            self.extra = {}

        o = self.extra.get("self")
        if o is not None:
            classname = o.__class__.__name__
            idval = self.extra.get("id", "-")
            self.extra["prefix"] = f"{classname}[{idval}]"
        else:
            self.extra["prefix"] = self.extra.get("prefix", "")

    def process(
        self,
        msg: str,
        kwargs: dict,
    ) -> Tuple[str, dict]:
        """Return processed log message.

        Args:
            msg: Message format string.
            kwargs: Keyword arguments of the logging call.

        Returns:
            Tuple of (message prefixed with the owner prefix, `kwargs`
            unchanged).
        """
        return "%s: %s" % (self.extra.get("prefix"), msg), kwargs

    def enter(
        self,
        *args: List,
        **kwargs: Dict,
    ) -> "TaggedLoggerAdapter":
        """Set up a TaggedLoggerAdapter, call enter(), return new
        adapter.

        Args:
            args: Log message args.
            kwargs: Log message kwargs.

        Returns:
            TaggedLoggerAdapter.
        """
        # NOTE(review): the stacklevel values presumably make the log
        # record point at the caller of this method -- confirm.
        tlogger = TaggedLoggerAdapter(self, stacklevel=4)
        tlogger.enter(*args, stacklevel=5, **kwargs)
        return tlogger

__init__(*args, **kwargs)

Initialize.

Source code in penvm/src/lib/penvm/lib/misc.py
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
def __init__(
    self,
    *args,
    **kwargs,
):
    """Initialize and compute the log prefix.

    Arguments are passed through to the base adapter. If `extra` holds
    an owner object under `"self"`, the prefix is `ClassName[id]` (with
    `id` taken from `extra["id"]`, default `"-"`); otherwise an existing
    `extra["prefix"]` is kept, or the prefix is empty. The result is
    stored in `extra["prefix"]`.
    """
    super().__init__(*args, **kwargs)

    # The base class stores `extra` as given, which may be None;
    # normalize it so the lookups below cannot fail.
    if self.extra is None:
        self.extra = {}

    o = self.extra.get("self")
    if o is not None:
        classname = o.__class__.__name__
        idval = self.extra.get("id", "-")
        self.extra["prefix"] = f"{classname}[{idval}]"
    else:
        self.extra["prefix"] = self.extra.get("prefix", "")

enter(*args, **kwargs)

Set up a TaggedLoggerAdapter, call enter(), return new adapter.

Parameters:

Name Type Description Default
args List

Log message args.

()
kwargs Dict

Log message kwargs.

{}

Returns:

Type Description
TaggedLoggerAdapter

TaggedLoggerAdapter.

Source code in penvm/src/lib/penvm/lib/misc.py
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
def enter(
    self,
    *args: List,
    **kwargs: Dict,
) -> "TaggedLoggerAdapter":
    """Create a TaggedLoggerAdapter wrapping this adapter and log its
    "enter" entry.

    Args:
        args: Positional arguments forwarded to the new adapter's
            `enter()`.
        kwargs: Keyword arguments forwarded to the new adapter's
            `enter()`.

    Returns:
        The new TaggedLoggerAdapter.
    """
    # NOTE(review): the stacklevel values presumably make the log record
    # point at the caller of this method -- confirm.
    tagged = TaggedLoggerAdapter(self, stacklevel=4)
    tagged.enter(*args, stacklevel=5, **kwargs)
    return tagged

process(msg, kwargs)

Return processed log message.

Parameters:

Name Type Description Default
msg str

Message format string.

required
kwargs dict

Dictionary containing items for populating `msg`.

required
Return

Processed message format string.

Source code in penvm/src/lib/penvm/lib/misc.py
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
def process(
    self,
    msg: str,
    kwargs: dict,
) -> Tuple[str, dict]:
    """Return processed log message.

    Args:
        msg: Message format string.
        kwargs: Keyword arguments of the logging call.

    Returns:
        Tuple of (message prefixed with `extra["prefix"]`, `kwargs`
        unchanged).
    """
    return "%s: %s" % (self.extra.get("prefix"), msg), kwargs

MachineConnectionSpec

Machine connection spec.

Information required to establish a connection to a machine.

Source code in penvm/src/lib/penvm/lib/misc.py
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
class MachineConnectionSpec:
    """Machine connection spec.

    Information required to establish a connection to a machine:
    `machid`, `sslprofile`, `host` and `port`.
    """

    def __init__(
        self,
        machconnstr: Union[str, None] = None,
        config: Union[dict, None] = None,
        machine: Union["Machine", None] = None,
    ):
        """
        Initialize.

        One of the arguments is used to configure; the first one that is
        not None wins, in the order `machconnstr`, `config`, `machine`.

        Args:
            machconnstr: Machine connection string: colon-separated
                string consisting of machine id, ssl profile, host,
                port. An empty ssl profile is stored as None.
            config: Dictionary of machine configuration settings
                (`machine-id`, `ssl-profile`, `host`, `port`).
            machine: Machine object.

        Raises:
            ValueError: If no argument is given, or `machconnstr` does
                not have four fields or a numeric port.
        """
        if machconnstr is not None:
            self.machid, self.sslprofile, self.host, self.port = machconnstr.split(":", 3)
            if self.sslprofile == "":
                self.sslprofile = None
            self.port = int(self.port)
        elif config is not None:
            self.machid = config.get("machine-id")
            self.sslprofile = config.get("ssl-profile")
            self.host = config.get("host")
            self.port = config.get("port")
        elif machine is not None:
            self.machid = machine.oid
            self.sslprofile = machine.sslprofile
            self.host = machine.conn.host
            self.port = machine.conn.port
        else:
            # ValueError is still caught by existing `except Exception`.
            raise ValueError("cannot set up MachineConnectionSpec")

    def __str__(self):
        """Return the machine connection string form of this spec."""
        return f"{self.machid}:{self.sslprofile or ''}:{self.host}:{self.port}"

__init__(machconnstr=None, config=None, machine=None)

Initialize.

One of the arguments is used to configure.

Parameters:

Name Type Description Default
machconnstr Union[str, None]

Machine connection string: colon-separated string consisting of machine id, ssl profile, host, port.

None
config Union[dict, None]

Dictionary of machine configuration settings.

None
machine Union[Machine, None]

Machine object.

None
Source code in penvm/src/lib/penvm/lib/misc.py
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
def __init__(
    self,
    machconnstr: Union[str, None] = None,
    config: Union[dict, None] = None,
    machine: Union["Machine", None] = None,
):
    """
    Initialize.

    One of the arguments is used to configure; the first one that is
    not None wins, in the order `machconnstr`, `config`, `machine`.

    Args:
        machconnstr: Machine connection string: colon-separated
            string consisting of machine id, ssl profile, host,
            port. An empty ssl profile is stored as None.
        config: Dictionary of machine configuration settings
            (`machine-id`, `ssl-profile`, `host`, `port`).
        machine: Machine object.

    Raises:
        ValueError: If no argument is given, or `machconnstr` does not
            have four fields or a numeric port.
    """
    if machconnstr is not None:
        self.machid, self.sslprofile, self.host, self.port = machconnstr.split(":", 3)
        if self.sslprofile == "":
            self.sslprofile = None
        self.port = int(self.port)
    elif config is not None:
        self.machid = config.get("machine-id")
        self.sslprofile = config.get("ssl-profile")
        self.host = config.get("host")
        self.port = config.get("port")
    elif machine is not None:
        self.machid = machine.oid
        self.sslprofile = machine.sslprofile
        self.host = machine.conn.host
        self.port = machine.conn.port
    else:
        # ValueError is still caught by existing `except Exception`.
        raise ValueError("cannot set up MachineConnectionSpec")

State

Bases: dict

Object state.

Holds object identifying information and a dictionary containing relevant object state.

Source code in penvm/src/lib/penvm/lib/misc.py
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
class State(dict):
    """Snapshot of an object's state.

    A dictionary with three entries: `timestamp`, `object` (the
    object's type and id) and `state` (the state information itself).
    """

    def __init__(self, otype: str, oid: str, state: dict):
        """Populate the snapshot.

        Args:
            otype: Object type string.
            oid: Object id string.
            state: State information.
        """
        super().__init__()
        identity = {"type": otype, "id": oid}
        self["timestamp"] = get_timestamp()
        self["object"] = identity
        self["state"] = state

__init__(otype, oid, state)

Initialize.

Parameters:

Name Type Description Default
otype str

Object type string.

required
oid str

Object id string.

required
state dict

State information.

required
Source code in penvm/src/lib/penvm/lib/misc.py
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
def __init__(self, otype: str, oid: str, state: dict):
    """Populate the snapshot with `timestamp`, `object` and `state`.

    Args:
        otype: Object type string.
        oid: Object id string.
        state: State information.
    """
    super().__init__()
    identity = {"type": otype, "id": oid}
    self["timestamp"] = get_timestamp()
    self["object"] = identity
    self["state"] = state

TaggedLoggerAdapter

Bases: _LoggerAdapter

Logger adapter which tags each log entry with a unique id to allow for tracking. Also provides a means to track timing performance.

Also provides special methods for local (e.g., function/method) logger: enter, exit, elapsed, lap.

Source code in penvm/src/lib/penvm/lib/misc.py
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
class TaggedLoggerAdapter(_LoggerAdapter):
    """Logger adapter which tags each log entry with a unique id to
    allow for tracking. Also provides a means to track timing
    performance.

    Also provides special methods for local (e.g., function/method)
    logger: enter, exit, elapsed, lap.

    Every method calls `self.log()` directly rather than through a
    shared helper: an extra call frame would shift what `stacklevel`
    refers to.
    """

    def __init__(self, *args, **kwargs):
        """Initialize.

        Args:
            stacklevel: Keyword-only. Default stack level for log
                entries (4 if not given). Consumed here.
            args, kwargs: Passed to the base adapter.
        """
        self.stacklevel = kwargs.pop("stacklevel", 4)
        super().__init__(*args, **kwargs)
        self.default_level = logging.DEBUG
        self.mark = LogMark()

    def critical(self, msg, *args, **kwargs):
        """Make a "critical" log entry."""
        _msg = f"[{self.mark.uuid}] {msg}"
        self.log(logging.CRITICAL, _msg, *args, stacklevel=self.stacklevel, **kwargs)

    def debug(self, msg, *args, **kwargs):
        """Make a "debug" log entry."""
        _msg = f"[{self.mark.uuid}] {msg}"
        self.log(logging.DEBUG, _msg, *args, stacklevel=self.stacklevel, **kwargs)

    def elapsed(
        self,
        msg: Union[str, None] = None,
    ):
        """Make a log entry with special format string containing:
        "ELAPSED", a tag, time, and elapsed time (from start).

        Args:
            msg: Additional message to log.
        """
        t1, elapsed = self.mark.elapsed()
        _msg = f"ELAPSED [{self.mark.uuid}, {t1:.5f}, {elapsed:.5f}]"
        if msg:
            _msg = f"{_msg} {msg}"
        self.log(self.default_level, _msg, stacklevel=self.stacklevel)

    def enter(
        self,
        msg: Union[str, None] = None,
        stacklevel: Union[int, None] = None,
    ) -> "TaggedLoggerAdapter":
        """Make a log entry with special format string containing:
        "ENTER", tag, and time.

        Args:
            msg: Additional message to log.
            stacklevel: Stack level to extract information from.
                Defaults to the adapter's own stack level.

        Returns:
            This adapter.
        """
        _msg = f"ENTER [{self.mark.uuid}, {self.mark.t0:.5f}]"
        if msg:
            _msg = f"{_msg} {msg}"
        stacklevel = stacklevel if stacklevel is not None else self.stacklevel
        self.log(self.default_level, _msg, stacklevel=stacklevel)
        return self

    def error(self, msg, *args, **kwargs):
        """Make an "error" log entry."""
        _msg = f"[{self.mark.uuid}] {msg}"
        self.log(logging.ERROR, _msg, *args, stacklevel=self.stacklevel, **kwargs)

    def exit(
        self,
        msg: Union[str, None] = None,
    ):
        """Make a log entry with special format string containing:
        "EXIT", tag, time, and elapsed (from start).

        Args:
            msg: Additional message to log.
        """
        t1, elapsed = self.mark.elapsed()
        _msg = f"EXIT [{self.mark.uuid}, {t1:.5f}, {elapsed:.5f}]"
        if msg:
            _msg = f"{_msg} {msg}"
        self.log(self.default_level, _msg, stacklevel=self.stacklevel)

    def state(
        self,
        msg,
        *args: List,
        **kwargs: Dict,
    ):
        """Make a tagged log entry at the "info" level.

        Args:
            msg: Message format string.
            args: Log message args.
            kwargs: Log message kwargs.
        """
        _msg = f"[{self.mark.uuid}] {msg}"
        self.log(logging.INFO, _msg, *args, stacklevel=self.stacklevel, **kwargs)

    def lap(
        self,
        msg: Union[str, None] = None,
    ):
        """Make a log entry with special format string containing:
        "LAP", tag, time, elapsed time from start, and elapsed time
        from the previous lap.

        Args:
            msg: Additional message to log.
        """
        t1, elapsed, lap = self.mark.lap()
        _msg = f"LAP [{self.mark.uuid}, {t1:.5f}, {elapsed:.5f}, {lap:.5f}]"
        if msg:
            _msg = f"{_msg} {msg}"
        self.log(self.default_level, _msg, stacklevel=self.stacklevel)

    def warning(self, msg, *args, **kwargs):
        """Make a "warning" log entry."""
        _msg = f"[{self.mark.uuid}] {msg}"
        self.log(logging.WARNING, _msg, *args, stacklevel=self.stacklevel, **kwargs)

__init__(*args, **kwargs)

Initialize.

Source code in penvm/src/lib/penvm/lib/misc.py
214
215
216
217
218
219
def __init__(self, *args, **kwargs):
    """Set up the adapter with its default stack level, level and mark.

    The `stacklevel` keyword (default 4) is consumed here; all other
    arguments go to the base adapter. Entries default to the DEBUG
    level, and a fresh `LogMark` supplies the tag and timing.
    """
    default_stacklevel = kwargs.pop("stacklevel", 4)
    self.stacklevel = default_stacklevel
    super().__init__(*args, **kwargs)
    self.default_level = logging.DEBUG
    self.mark = LogMark()

critical(msg, *args, **kwargs)

Make a "critical" log entry.

Source code in penvm/src/lib/penvm/lib/misc.py
221
222
223
224
def critical(self, msg, *args, **kwargs):
    """Log `msg`, prefixed with this adapter's tag, at the CRITICAL level."""
    tagged = f"[{self.mark.uuid}] {msg}"
    self.log(
        logging.CRITICAL,
        tagged,
        *args,
        stacklevel=self.stacklevel,
        **kwargs,
    )

debug(msg, *args, **kwargs)

Make a "debug" log entry.

Source code in penvm/src/lib/penvm/lib/misc.py
226
227
228
229
def debug(self, msg, *args, **kwargs):
    """Log `msg`, prefixed with this adapter's tag, at the DEBUG level."""
    tagged = f"[{self.mark.uuid}] {msg}"
    self.log(
        logging.DEBUG,
        tagged,
        *args,
        stacklevel=self.stacklevel,
        **kwargs,
    )

elapsed(msg=None)

Make a log entry with special format string containing: "ELAPSED", a tag, time, and elapsed time (from start).

Parameters:

Name Type Description Default
msg Union[str, None]

Additional message to log.

None
Source code in penvm/src/lib/penvm/lib/misc.py
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
def elapsed(
    self,
    msg: Union[str, None] = None,
):
    """Log an "ELAPSED" entry: tag, current time, and time since start.

    The entry is made at the adapter's default level.

    Args:
        msg: Additional message to log.
    """
    now, since_start = self.mark.elapsed()
    header = f"ELAPSED [{self.mark.uuid}, {now:.5f}, {since_start:.5f}]"
    text = f"{header} {msg}" if msg else header
    self.log(self.default_level, text, stacklevel=self.stacklevel)

enter(msg=None, stacklevel=None)

Make a log entry with special format string containing: "ENTER", tag, and time.

Parameters:

Name Type Description Default
msg Union[str, None]

Additional message to log.

None
stacklevel Union[int, None]

Stack level to extract information from.

None
Source code in penvm/src/lib/penvm/lib/misc.py
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
def enter(
    self,
    msg: Union[str, None] = None,
    stacklevel: Union[int, None] = None,
) -> "TaggedLoggerAdapter":
    """Make a log entry with special format string containing:
    "ENTER", tag, and time.

    Args:
        msg: Additional message to log.
        stacklevel: Stack level to extract information from.
            Defaults to the adapter's own stack level.

    Returns:
        This adapter.
    """
    _msg = f"ENTER [{self.mark.uuid}, {self.mark.t0:.5f}]"
    if msg:
        _msg = f"{_msg} {msg}"
    # Identity test: only an omitted stacklevel falls back to the default.
    stacklevel = stacklevel if stacklevel is not None else self.stacklevel
    self.log(self.default_level, _msg, stacklevel=stacklevel)
    return self

error(msg, *args, **kwargs)

Make an "error" log entry.

Source code in penvm/src/lib/penvm/lib/misc.py
266
267
268
269
def error(self, msg, *args, **kwargs):
    """Log `msg`, prefixed with this adapter's tag, at the ERROR level."""
    tagged = f"[{self.mark.uuid}] {msg}"
    self.log(
        logging.ERROR,
        tagged,
        *args,
        stacklevel=self.stacklevel,
        **kwargs,
    )

exit(msg=None)

Make a log entry with special format string containing: "EXIT", tag, time, and elapsed (from start).

Parameters:

Name Type Description Default
msg Union[str, None]

Additional message to log.

None
Source code in penvm/src/lib/penvm/lib/misc.py
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
def exit(
    self,
    msg: Union[str, None] = None,
):
    """Log an "EXIT" entry: tag, current time, and time since start.

    The entry is made at the adapter's default level.

    Args:
        msg: Additional message to log.
    """
    now, since_start = self.mark.elapsed()
    header = f"EXIT [{self.mark.uuid}, {now:.5f}, {since_start:.5f}]"
    text = f"{header} {msg}" if msg else header
    self.log(self.default_level, text, stacklevel=self.stacklevel)

lap(msg=None)

Make a log entry with special format string containing: "LAP", tag, time, elapsed time from start, and elapsed time from the previous lap.

Parameters:

Name Type Description Default
msg Union[str, None]

Additional message to log.

None
Source code in penvm/src/lib/penvm/lib/misc.py
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
def lap(
    self,
    msg: Union[str, None] = None,
):
    """Log a "LAP" entry: tag, current time, time since start, and time
    since the previous lap.

    The entry is made at the adapter's default level.

    Args:
        msg: Additional message to log.
    """
    now, since_start, since_lap = self.mark.lap()
    header = f"LAP [{self.mark.uuid}, {now:.5f}, {since_start:.5f}, {since_lap:.5f}]"
    text = f"{header} {msg}" if msg else header
    self.log(self.default_level, text, stacklevel=self.stacklevel)

warning(msg, *args, **kwargs)

Make a "warning" log entry.

Source code in penvm/src/lib/penvm/lib/misc.py
312
313
314
315
def warning(self, msg, *args, **kwargs):
    """Log `msg`, prefixed with this adapter's tag, at the WARNING level."""
    tagged = f"[{self.mark.uuid}] {msg}"
    self.log(
        logging.WARNING,
        tagged,
        *args,
        stacklevel=self.stacklevel,
        **kwargs,
    )

get_log_mark()

Get log mark of (now, uuid).

Source code in penvm/src/lib/penvm/lib/misc.py
344
345
346
def get_log_mark() -> Tuple[float, str]:
    """Return a log mark: the current time paired with a new log id.

    Returns:
        Tuple of (now, uuid).
    """
    now = time.time()
    return now, get_log_uuid()

get_log_uuid()

Return UUID for logging.

Returns:

Type Description
str

UUID string value.

Source code in penvm/src/lib/penvm/lib/misc.py
323
324
325
326
327
328
329
330
331
332
def get_log_uuid() -> str:
    """Return the next id for logging.

    Increments the module-level `_log_counter` while holding
    `_log_counter_lock`.

    Returns:
        The new counter value as a lowercase hexadecimal string.
    """
    global _log_counter
    with _log_counter_lock:
        current = _log_counter + 1
        _log_counter = current
        return "%x" % current

get_log_uuid1()

Return UUID1 for logging.

Returns:

Type Description
str

UUID1 string value.

Source code in penvm/src/lib/penvm/lib/misc.py
335
336
337
338
339
340
341
def get_log_uuid1() -> str:
    """Return a UUID1 for logging.

    Returns:
        UUID1 string value.
    """
    value = uuid1()
    return str(value)

get_timestamp()

Get timestamp as string.

Source code in penvm/src/lib/penvm/lib/misc.py
349
350
351
def get_timestamp() -> str:
    """Return the current `time.time()` value converted to a string."""
    now = time.time()
    return str(now)

get_uuid()

Alternate get_uuid implementation.

Source code in penvm/src/lib/penvm/lib/misc.py
358
359
360
361
362
363
def get_uuid() -> str:
    """Alternate `get_uuid` implementation.

    Increments the module-level `_counter` while holding `_counter_lock`.

    Returns:
        The new counter value as a lowercase hexadecimal string.
    """
    global _counter
    with _counter_lock:
        current = _counter + 1
        _counter = current
        return "%x" % current

get_uuid1()

Get UUID1 value.

Returns:

Type Description
str

UUID1 string value.

Source code in penvm/src/lib/penvm/lib/misc.py
366
367
368
369
370
371
372
373
def get_uuid1() -> str:
    """Generate a UUID1 and return it in its string form.

    Returns:
        UUID1 string value.
    """
    value = uuid1()
    return str(value)

get_version()

Return PENVM version tuple.

Returns:

Type Description
Tuple

Version tuple.

Source code in penvm/src/lib/penvm/lib/misc.py
377
378
379
380
381
382
def get_version() -> Tuple:
    """Return the module-level PENVM `VERSION` value.

    Returns:
        Version tuple.
    """
    version = VERSION
    return version

get_version_string()

Return PENVM version as a string.

Returns:

Type Description
str

Version string.

Source code in penvm/src/lib/penvm/lib/misc.py
385
386
387
388
389
390
391
def get_version_string() -> str:
    """Format the PENVM `VERSION` as a dotted string.

    Returns:
        Version string made of the three `VERSION` components joined
        by dots.
    """
    template = "%s.%s.%s"
    return template % VERSION

penvm.lib.mqueue

MessageQueue

Bases: BaseObject, Queue

Message queue.

Built on top of penvm.lib.queue.Queue.

Source code in penvm/src/lib/penvm/lib/mqueue.py
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
class MessageQueue(BaseObject, Queue):
    """Message queue.

    Built on top of [penvm.lib.queue.Queue][].
    """

    def __init__(self, qsize: int = 0):
        """Initialize.

        Args:
            qsize: Maximum queue size. 0 for unlimited. Passed on to
                `Queue.__init__`.
        """
        # No oid is supplied (None); the module-level logger is wrapped.
        BaseObject.__init__(self, None, logger)
        Queue.__init__(self, qsize)

    def find(
        self,
        id: str,
    ) -> "Message":
        """Find and return message in queue with given header id.

        Args:
            id: Message header id.

        Returns:
            First matching Message, or None if there is no match.
        """
        for v in self.values():
            if v.header.get("id") == id:
                return v
        return None

    def state(self) -> "State":
        """Get object state.

        Queued values that are not `Message` instances are reported as
        None in the `values` list.

        Returns:
            `State` object, or None if building it raised an exception
            (the exception is logged as a warning).
        """
        try:
            # isinstance (not an exact type match) so that Message
            # subclasses are reported too.
            values = [
                v.state() if isinstance(v, Message) else None
                for v in self.values()
            ]
            return State(
                "mqueue",
                self.oid,
                {
                    "frozen": self.frozen,
                    "npop": self.npop,
                    "nput": self.nput,
                    # the `qsize` attribute, not the current item count
                    "size": self.qsize,
                    "values": values,
                },
            )
        except Exception as e:
            self.logger.warning(f"EXCEPTION ({e})")
            return None

__init__(qsize=0)

Initialize.

Parameters:

Name Type Description Default
qsize int

Maximum queue size. 0 for unlimited.

0
Source code in penvm/src/lib/penvm/lib/mqueue.py
39
40
41
42
43
44
45
46
def __init__(self, qsize: int = 0):
    """Set up both base classes.

    `BaseObject` is initialized without an explicit oid and with the
    module-level logger; `Queue` receives `qsize`.

    Args:
        qsize: Maximum queue size. 0 for unlimited.
    """
    BaseObject.__init__(self, oid=None, logger=logger)
    Queue.__init__(self, qsize=qsize)

find(id)

Find and return message in queue with given header id.

Parameters:

Name Type Description Default
id str

Message header id.

required

Returns:

Type Description
Message

Matching Message.

Source code in penvm/src/lib/penvm/lib/mqueue.py
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
def find(
    self,
    id: str,
) -> "Message":
    """Look up a queued message by its header id.

    Args:
        id: Message header id.

    Returns:
        First queued message whose header `id` equals `id`, or None
        when there is no such message.
    """
    matches = (queued for queued in self.values() if queued.header.get("id") == id)
    return next(matches, None)

state()

Get object state.

Returns:

Type Description
State

State object.

Source code in penvm/src/lib/penvm/lib/mqueue.py
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
def state(self) -> "State":
    """Get object state.

    Queued values that are not `Message` instances are reported as None
    in the `values` list.

    Returns:
        `State` object, or None if building it raised an exception (the
        exception is logged as a warning).
    """
    try:
        # isinstance (not an exact type match) so that Message
        # subclasses are reported too.
        values = [
            v.state() if isinstance(v, Message) else None
            for v in self.values()
        ]
        return State(
            "mqueue",
            self.oid,
            {
                "frozen": self.frozen,
                "npop": self.npop,
                "nput": self.nput,
                # the `qsize` attribute, not the current item count
                "size": self.qsize,
                "values": values,
            },
        )
    except Exception as e:
        self.logger.warning(f"EXCEPTION ({e})")
        return None

penvm.lib.queue

Queue

Implementation allowing for inspection and peeking.

Built on top of [queue.Queue][].

Source code in penvm/src/lib/penvm/lib/queue.py
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
class Queue:
    """Implementation allowing for inspection and peeking.

    Built on top of [queue.Queue][].
    """

    def __init__(
        self,
        qsize: int = 0,
    ):
        """Initialize.

        Args:
            qsize: Maximum queue size. 0 for unlimited.
                NOTE: currently ignored; the size is fixed at 10 (see
                the TODO below).
        """
        self.frozen = False
        # TODO: support sizing of queue
        self.qsize = qsize = 10
        self._queue = queue.Queue(qsize)
        self._tmp = QueueEmpty
        self.npop = 0  # number of successful pops
        self.nput = 0  # number of accepted puts

    def clear(self):
        """Clear all queued objects."""
        try:
            while True:
                self._queue.get(block=False)
        except queue.Empty:
            # Drained: this is the normal way out of the loop.
            pass

    def freeze(
        self,
        state: bool,
    ):
        """Allow/disallow additions.

        Args:
            state: New state of queue. While true, `put()` discards
                objects.
        """
        self.frozen = state

    def get(self) -> Any:
        """Get copy of object at the head of the queue, without
        removing it.

        Returns:
            Copy of the item (made with its `copy()` method), or
            `QueueEmpty` if the queue is empty.
        """
        # TODO: needs some work?
        if self._queue.qsize():
            return self._queue.queue[0].copy()
        return QueueEmpty

    def pop(
        self,
        block: bool = True,
    ) -> Any:
        """Pop object from queue.

        Args:
            block: Wait for object.

        Returns:
            Item, or None when not blocking and the queue is empty.
        """
        try:
            v = self._queue.get(block=block)
            self.npop += 1
            # TODO: what to return if not blocking and no value? None!
        except queue.Empty:
            v = None
        return v

    def put(self, o: Any):
        """Put object on queue.

        The object is silently discarded if the queue is frozen.

        Args:
            o: Object.
        """
        if not self.frozen:
            self.nput += 1
            self._queue.put(o)
        # TODO: raise exception if frozen

    def size(self) -> int:
        """Return queue size.

        Returns:
            Current number of queued items.
        """
        return self._queue.qsize()

    def values(self) -> List[Any]:
        """Return (actual) queued values.

        Returns:
            New list holding the queued objects themselves (not copies).
        """
        return list(self._queue.queue)

__init__(qsize=0)

Initialize.

Parameters:

Name Type Description Default
qsize int

Maximum queue size. 0 for unlimited.

0
Source code in penvm/src/lib/penvm/lib/queue.py
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
def __init__(
    self,
    qsize: int = 0,
):
    """Create an empty, unfrozen queue with zeroed counters.

    Args:
        qsize: Maximum queue size. 0 for unlimited.
            NOTE: currently ignored; the size is fixed at 10 (see the
            TODO below).
    """
    # TODO: support sizing of queue
    qsize = 10
    self.frozen = False
    self.qsize = qsize
    self._queue = queue.Queue(qsize)
    self._tmp = QueueEmpty
    self.npop = 0
    self.nput = 0

clear()

Clear all queued objects.

Source code in penvm/src/lib/penvm/lib/queue.py
52
53
54
55
56
57
58
59
60
def clear(self):
    """Clear all queued objects.

    Items are removed one at a time without blocking until the
    underlying queue reports that it is empty.
    """
    try:
        while True:
            self._queue.get(block=False)
    except queue.Empty:
        # Drained: this is the normal way out of the loop. Any other
        # exception is a real error and is no longer hidden.
        pass

freeze(state)

Allow/disallow additions.

Parameters:

Name Type Description Default
state bool

New state of queue.

required
Source code in penvm/src/lib/penvm/lib/queue.py
62
63
64
65
66
67
68
69
70
71
def freeze(self, state: bool):
    """Set whether additions to the queue are disallowed.

    Args:
        state: Value stored in `frozen`.
    """
    self.frozen = state

get()

Get copy of object from queue.

Returns:

Type Description
Any

Item.

Source code in penvm/src/lib/penvm/lib/queue.py
73
74
75
76
77
78
79
80
81
82
def get(self) -> Any:
    """Peek at the head of the queue without removing it.

    Returns:
        Result of calling `copy()` on the first queued object, or
        `QueueEmpty` when nothing is queued.
    """
    # TODO: needs some work?
    if not self._queue.qsize():
        return QueueEmpty
    head = self._queue.queue[0]
    return head.copy()

pop(block=True)

Pop object from queue.

Parameters:

Name Type Description Default
block bool

Wait for object.

True

Returns:

Type Description
Any

Item.

Source code in penvm/src/lib/penvm/lib/queue.py
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
def pop(
    self,
    block: bool = True,
) -> Any:
    """Remove and return the object at the head of the queue.

    Args:
        block: Wait for an object when the queue is empty.

    Returns:
        The dequeued object, or `None` when not blocking and the queue
        is empty.
    """
    try:
        item = self._queue.get(block=block)
    except queue.Empty:
        # TODO: what to return if not blocking and no value? None!
        return None
    # Only successful pops are counted.
    self.npop += 1
    return item

put(o)

Put object on queue.

Parameters:

Name Type Description Default
o Any

Object.

required
Source code in penvm/src/lib/penvm/lib/queue.py
104
105
106
107
108
109
110
111
112
def put(self, o: Any):
    """Add an object to the queue unless the queue is frozen.

    Args:
        o: Object to enqueue.
    """
    if self.frozen:
        # Frozen queues silently ignore additions.
        return
    self.nput += 1
    self._queue.put(o)

size()

Return queue size.

Returns:

Type Description
int

Queue size.

Source code in penvm/src/lib/penvm/lib/queue.py
115
116
117
118
119
120
121
def size(self) -> int:
    """Report how many objects are currently queued.

    Returns:
        Value of the underlying queue's `qsize()`.
    """
    pending = self._queue.qsize()
    return pending

values()

Return (actual) queued values.

Returns:

Type Description
List[Any]

Queue values.

Source code in penvm/src/lib/penvm/lib/queue.py
123
124
125
126
127
128
129
def values(self) -> List[Any]:
    """Return the queued objects themselves (not copies).

    Returns:
        New list holding every object currently on the queue.
    """
    return list(self._queue.queue)

RoutingQueue

Bases: Queue

Route queue operations elsewhere.

Three functions are registered to handle the different kinds of queueing (and thus routing) operations.

Source code in penvm/src/lib/penvm/lib/queue.py
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
class RoutingQueue(Queue):
    """Route queue operations elsewhere.

    Up to three functions are registered to handle the different kinds
    of queueing (and thus routing) operations. Nothing is stored on this
    queue itself: `size()` is always 0 and `values()` is always empty.
    An operation without a registered function is a no-op.
    """

    def __init__(
        self,
        get: Union[Callable, None] = None,
        pop: Union[Callable, None] = None,
        put: Union[Callable, None] = None,
    ):
        """Initialize.

        Args:
            get: Function to call for `get`.
            pop: Function to call for `pop`.
            put: Function to call for `put`.
        """
        super().__init__()
        self._get = get
        self._pop = pop
        self._put = put

    def get(self) -> Union[Any, None]:
        """Get a copy of an object.

        Returns:
            Result of the registered `get` function, or `None` when no
            function is registered.
        """
        return None if self._get is None else self._get()

    def pop(
        self,
        block: bool = True,
    ) -> Union[Any, None]:
        """Pop an object.

        Args:
            block: Passed through to the registered `pop` function.

        Returns:
            Result of the registered `pop` function, or `None` when no
            function is registered.
        """
        return None if self._pop is None else self._pop(block)

    def put(
        self,
        o: Any,
    ):
        """Put an object.

        Does nothing when no `put` function is registered, matching the
        behavior of `get` and `pop`.

        Args:
            o: Object to queue/route.
        """
        if self._put is not None:
            self._put(o)

    def size(self) -> int:
        """Queue size is always 0.

        Returns:
            0
        """
        return 0

    def values(self) -> List:
        """Never any values on queue.

        Returns:
            Empty list."""
        return []

__init__(get=None, pop=None, put=None)

Initialize.

Parameters:

Name Type Description Default
get Union[Callable, None]

Function to call for get.

None
pop Union[Callable, None]

Function to call for pop.

None
put Union[Callable, None]

Function to call for put.

None
Source code in penvm/src/lib/penvm/lib/queue.py
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
def __init__(
    self,
    get: Union[Callable, None] = None,
    pop: Union[Callable, None] = None,
    put: Union[Callable, None] = None,
):
    """Set up the base queue and remember the routing functions.

    Args:
        get: Function to call for `get`.
        pop: Function to call for `pop`.
        put: Function to call for `put`.
    """
    super().__init__()
    # Handlers are stored as given; `None` means "not registered".
    self._get, self._pop, self._put = get, pop, put

get()

Get a copy of an object.

Returns:

Type Description
Union[Any, None]

Copy of an object.

Source code in penvm/src/lib/penvm/lib/queue.py
157
158
159
160
161
162
163
def get(self) -> Union[Any, None]:
    """Get a copy of an object.

    Returns:
        Result of the registered `get` function, or `None` when no
        function is registered.
    """
    # Identity test: a handler with a permissive `__eq__` must still be called.
    return None if self._get is None else self._get()

pop(block=True)

Pop an object.

Returns:

Type Description
Union[Any, None]

An object.

Source code in penvm/src/lib/penvm/lib/queue.py
165
166
167
168
169
170
171
172
173
174
def pop(
    self,
    block: bool = True,
) -> Union[Any, None]:
    """Pop an object.

    Args:
        block: Passed through to the registered `pop` function.

    Returns:
        Result of the registered `pop` function, or `None` when no
        function is registered.
    """
    # Identity test: a handler with a permissive `__eq__` must still be called.
    return None if self._pop is None else self._pop(block)

put(o)

Put an object.

Parameters:

Name Type Description Default
o Any

Object to queue/route.

required
Source code in penvm/src/lib/penvm/lib/queue.py
176
177
178
179
180
181
182
183
184
185
def put(
    self,
    o: Any,
):
    """Put an object.

    Does nothing when no `put` function is registered (the constructor
    default), instead of failing on a call to `None`.

    Args:
        o: Object to queue/route.
    """
    if self._put is not None:
        self._put(o)

size()

Queue size is always 0.

Returns:

Type Description
int

0

Source code in penvm/src/lib/penvm/lib/queue.py
187
188
189
190
191
192
193
def size(self) -> int:
    """Report the queue size, which is always zero here.

    Returns:
        0
    """
    queued = 0
    return queued

values()

Never any values on queue.

Returns:

Type Description
List

Empty list.

Source code in penvm/src/lib/penvm/lib/queue.py
195
196
197
198
199
200
def values(self) -> List:
    """Report the queued values, of which there are never any.

    Returns:
        A new empty list.
    """
    return list()

penvm.lib.semaphore

AdjustableSemaphore

Semaphore with support for adjusting the limit n while in use.

Source code in penvm/src/lib/penvm/lib/semaphore.py
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
class AdjustableSemaphore:
    """Semaphore with support for adjusting the limit `n` while in
    use."""

    def __init__(
        self,
        n: int = 1,
    ):
        """Initialize.

        Args:
            n: Count.
        """
        self._max = n

        self._curr = 0

        # `_lock` guards `_curr`/`_max`; `_waitlock` is what blocked
        # acquirers wait on and what `release()`/`adjust()` release.
        self._lock = Lock()
        self._waitlock = Lock()

    def acquire(self):
        """Acquire a semaphore.

        Increase current count. Loops until the current count is below
        the maximum.
        """
        while True:
            if self._lock.acquire(blocking=False):
                try:
                    if self._curr < self._max:
                        self._curr += 1
                        return
                finally:
                    # Always drop the state lock, whether or not a slot
                    # was taken.
                    self._lock.release()
            # Take the wait lock (blocking if it is already held) before
            # retrying.
            self._waitlock.acquire()

    def adjust(self, n: int = 1):
        """Adjust count.

        Args:
            n: New maximum count. Values below 1 are clamped to 1.
        """
        # Clamp before taking the lock so a bad `n` cannot leave it held.
        n = n if n > 0 else 1
        with self._lock:
            self._max = n
        # wake up waiter that might *now* acquire a lock
        try:
            self._waitlock.release()
        except RuntimeError:
            # ignore if already unlocked!
            pass

    def count(self) -> int:
        """Return current count.

        Returns:
            Current count.
        """
        return self._curr

    def max(self) -> int:
        """Return maximum count allowed.

        Returns:
            Maximum count.
        """
        return self._max

    def release(self):
        """Release a semaphore.

        Decreases current count, never below 0.
        """
        with self._lock:
            # `max` here is the builtin; the method of the same name is
            # not visible from inside a method body.
            self._curr = max(self._curr - 1, 0)

        try:
            # wake up waiters
            self._waitlock.release()
        except RuntimeError:
            # Releasing an unlocked lock: nobody was waiting.
            pass

__init__(n=1)

Initialize.

Parameters:

Name Type Description Default
n int

Count.

1
Source code in penvm/src/lib/penvm/lib/semaphore.py
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
def __init__(
    self,
    n: int = 1,
):
    """Set the limit, zero the current count and create both locks.

    Args:
        n: Count.
    """
    # Two separate locks are created; both start out unlocked.
    self._lock = Lock()
    self._waitlock = Lock()

    self._curr = 0
    self._max = n

acquire()

Acquire a semaphore.

Increase current count.

Source code in penvm/src/lib/penvm/lib/semaphore.py
49
50
51
52
53
54
55
56
57
58
59
60
61
def acquire(self):
    """Acquire a semaphore.

    Increase current count.

    Loops until the current count is below the maximum, then increments
    the count and returns. Returns nothing.
    """
    while True:
        # Non-blocking attempt on the state lock; if it is held by someone
        # else, fall through to the wait lock instead of blocking here.
        if self._lock.acquire(blocking=False):
            if self._curr < self._max:
                # Capacity available: take a slot and return.
                self._curr += 1
                self._lock.release()
                return
            # At the limit: drop the state lock before waiting.
            self._lock.release()
        # Take the wait lock (blocking if it is already held) before
        # retrying. NOTE(review): nothing in this method releases
        # `_waitlock`; presumably `release()`/`adjust()` do so to wake a
        # waiter -- confirm.
        self._waitlock.acquire()

adjust(n=1)

Adjust count.

Parameters:

Name Type Description Default
n int

New count.

1
Source code in penvm/src/lib/penvm/lib/semaphore.py
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
def adjust(self, n: int = 1):
    """Adjust count.

    Args:
        n: New maximum count. Values below 1 are clamped to 1.
    """
    # Clamp before taking the lock so a bad `n` cannot leave it held.
    n = n if n > 0 else 1
    with self._lock:
        self._max = n
    # wake up waiter that might *now* acquire a lock
    try:
        self._waitlock.release()
    except RuntimeError:
        # ignore if already unlocked!
        pass

count()

Return current count.

Returns:

Type Description
int

Current count.

Source code in penvm/src/lib/penvm/lib/semaphore.py
80
81
82
83
84
85
86
def count(self) -> int:
    """Report how many acquisitions are currently outstanding.

    Returns:
        Current count.
    """
    current = self._curr
    return current

max()

Return maximum count allowed.

Returns:

Type Description
int

Maximum count.

Source code in penvm/src/lib/penvm/lib/semaphore.py
88
89
90
91
92
93
94
def max(self) -> int:
    """Report the limit on the current count.

    Returns:
        Maximum count.
    """
    limit = self._max
    return limit

release()

Release a semaphore.

Decreases current count.

Source code in penvm/src/lib/penvm/lib/semaphore.py
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
def release(self):
    """Release a semaphore.

    Decreases current count, never below 0.
    """
    with self._lock:
        self._curr = max(self._curr - 1, 0)

    try:
        # wake up waiters
        self._waitlock.release()
    except RuntimeError:
        # Releasing an unlocked lock: nobody was waiting. Any other
        # error is unexpected and is allowed to propagate.
        pass

penvm.lib.thread

Provides modified Thread to support thread termination.

Thread

Bases: BaseObject, threading.Thread

Thread with specific settings and functionality:

* Will die on main thread exit.
* Support for raising an exception in the thread.
* Terminatable.

See https://code.activestate.com/recipes/496960-thread2-killable-threads/.

Source code in penvm/src/lib/penvm/lib/thread.py
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
class Thread(BaseObject, threading.Thread):
    """Thread with specific settings and functionality:
    * Will die on main thread exit.
    * Support for exception to thread.
    * Terminatable.

    See https://code.activestate.com/recipes/496960-thread2-killable-threads/.
    """

    def __init__(
        self,
        *args: List,
        **kwargs: Dict,
    ):
        """Initialize.

        Arguments are passed through to `threading.Thread`. The thread
        is marked as a daemon and its `oid` is set to the thread name.

        See [threading.Thread][].
        """
        try:
            BaseObject.__init__(self, None, logger)
            threading.Thread.__init__(self, *args, **kwargs)
            # Replace the oid assigned by BaseObject with the thread name.
            self.oid = self.name
            self.daemon = True
        except Exception as e:
            self.logger.warning(f"EXCEPTION ({e})")

    def get_id(self) -> int:
        """Return thread id.

        Returns:
            Thread id, or `None` when the thread is not found among the
            active threads.
        """
        if hasattr(self, "_thread_id"):
            return self._thread_id

        for tid, thread in threading._active.items():
            if thread is self:
                return tid

    def raise_exception(self, exc: Exception):
        """Raise a specific exception.

        Uses `PyThreadState_SetAsyncExc` to schedule `exc` in this
        thread. Nothing is done when the thread id cannot be determined.

        Args:
            exc: Exception to raise.
        """
        self.logger.debug(f"raising exception ({exc}) ...")

        # TODO: should this be repeated until it is effective?

        threadid = self.get_id()
        if threadid is not None:
            res = ctypes.pythonapi.PyThreadState_SetAsyncExc(
                ctypes.c_long(threadid),
                ctypes.py_object(exc),
            )
            self.logger.info(f"raise exception result ({res})")
            if res > 1:
                # More than one thread state was modified: call again
                # with 0 to clear the pending exception.
                ctypes.pythonapi.PyThreadState_SetAsyncExc(
                    ctypes.c_long(threadid),
                    0,
                )
                # exception raise failure

    def state(self) -> "State":
        """Get object state.

        Returns:
            `State` object, or `None` when building it fails (the error
            is logged as a warning).
        """
        try:
            return State(
                "thread",
                self.oid,
                {
                    "args": [str(arg) for arg in self._args],
                    "kwargs": [str(arg) for arg in self._kwargs.items()],
                    "name": self._name,
                    "native-id": self.native_id,
                    "target": str(self._target),
                },
            )
        except Exception as e:
            self.logger.warning(f"EXCEPTION ({e})")

    def terminate(self):
        """Terminate. Raises exception."""
        # TODO: will this ultimately work, e.g., even when returning
        # from C code/extension? should it repeat until effective?
        self.logger.debug("terminating ...")
        self.raise_exception(ThreadInterrupt)

__init__(*args, **kwargs)

Initialize.

See threading.Thread.

Source code in penvm/src/lib/penvm/lib/thread.py
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
def __init__(
    self,
    *args: List,
    **kwargs: Dict,
):
    """Set up the base object and the underlying thread.

    Arguments are passed through to `threading.Thread`. The thread is
    marked as a daemon and its `oid` is set to the thread name.

    See [threading.Thread][].
    """
    try:
        BaseObject.__init__(self, None, logger)
        threading.Thread.__init__(self, *args, **kwargs)
        # Replace the oid assigned by BaseObject with the thread name.
        self.oid = self.name
        self.daemon = True
    except Exception as e:
        self.logger.warning(f"EXCEPTION ({e})")

get_id()

Return thread id.

Returns:

Type Description
int

Thread id.

Source code in penvm/src/lib/penvm/lib/thread.py
70
71
72
73
74
75
76
77
78
79
80
81
def get_id(self) -> int:
    """Look up the id of this thread.

    Returns:
        The `_thread_id` attribute when present; otherwise the key under
        which this thread is registered in `threading._active`, or
        `None` when it is not registered.
    """
    if hasattr(self, "_thread_id"):
        return self._thread_id

    matches = (tid for tid, candidate in threading._active.items() if candidate is self)
    return next(matches, None)

raise_exception(exc)

Raise a specific exception.

Parameters:

Name Type Description Default
exc Exception

Exception to raise.

required
Source code in penvm/src/lib/penvm/lib/thread.py
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
def raise_exception(self, exc: Exception):
    """Raise a specific exception.

    Uses `PyThreadState_SetAsyncExc` to schedule `exc` in this thread.
    Nothing is done when the thread id cannot be determined.

    Args:
        exc: Exception to raise.
    """
    self.logger.debug(f"raising exception ({exc}) ...")

    # TODO: should this be repeated until it is effective?

    threadid = self.get_id()
    if threadid is not None:
        res = ctypes.pythonapi.PyThreadState_SetAsyncExc(
            ctypes.c_long(threadid),
            ctypes.py_object(exc),
        )
        self.logger.info(f"raise exception result ({res})")
        if res > 1:
            # More than one thread state was modified: call again with 0
            # to clear the pending exception.
            ctypes.pythonapi.PyThreadState_SetAsyncExc(
                ctypes.c_long(threadid),
                0,
            )

state()

Get object state.

Returns:

Type Description
State

State object.

Source code in penvm/src/lib/penvm/lib/thread.py
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
def state(self) -> "State":
    """Describe this thread as a `State` object.

    Returns:
        `State` object, or `None` when building it fails (the error is
        logged as a warning).
    """
    try:
        details = {
            "args": list(map(str, self._args)),
            "kwargs": list(map(str, self._kwargs.items())),
            "name": self._name,
            "native-id": self.native_id,
            "target": str(self._target),
        }
        return State("thread", self.oid, details)
    except Exception as e:
        self.logger.warning(f"EXCEPTION ({e})")

terminate()

Terminate. Raises exception.

Source code in penvm/src/lib/penvm/lib/thread.py
128
129
130
131
132
133
def terminate(self):
    """Request termination by raising `ThreadInterrupt` in the thread."""
    # TODO: will this ultimately work, e.g., even when returning
    # from C code/extension? should it repeat until effective?
    log = self.logger
    log.debug("terminating ...")
    self.raise_exception(ThreadInterrupt)

ThreadInterrupt

Bases: Exception

Special exception for augmented Thread.

Source code in penvm/src/lib/penvm/lib/thread.py
34
35
36
37
38
39
class ThreadInterrupt(Exception):
    """Exception used to interrupt the augmented Thread."""

    def __init__(self):
        """Create the exception with its fixed message."""
        super(ThreadInterrupt, self).__init__("interrupted thread")

__init__()

Initialize.

Source code in penvm/src/lib/penvm/lib/thread.py
37
38
39
def __init__(self):
    """Create the exception with its fixed message."""
    message = "interrupted thread"
    super().__init__(message)
You are on penvm.dev