Skip to content

Ext

penvm.ext.workers

A Worker is a convenience tool to help parcel out the work to the network of machines. The steps are:

  1. partition the problem space
  2. spawn the work to the network
  3. collect the results
  4. process the results
  5. combine (consolidate) the results into one
  6. return the final result

Workers may be tailored to suit specific needs. This is espectially required for the paritition and combine steps, where both implementations cannot be known in advance.

ExecWorker

Bases: Worker

Worker to call executable.

Source code in penvm/src/ext/penvm/ext/workers/__init__.py
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
class ExecWorker(Worker):
    """Worker to call executable."""

    def __init__(
        self,
        cwd: str,
        network: "Network",
        nworkers: Union[int, None] = None,
        auto_collect: bool = True,
        text: bool = True,
        env: dict = None,
    ):
        """Initialize.

        Args:
            cwd: Working directory to use.
            network: See [Worker.__init__][penvm.ext.workers.Worker.__init__].
            nworkers: See [Worker.__init__][penvm.ext.workers.Worker.__init__].
            auto_collect: See [Worker.__init__][penvm.ext.workers.Worker.__init__].
            text: Work in text mode.
            env: Environment variable settings.
        """
        super().__init__(network, nworkers, auto_collect)
        self.cwd = cwd
        self.text = text
        self.env = env

    def call_one(
        self,
        idx: int,
        session: "Session",
        *args: List,
        **kwargs: Dict,
    ):
        """Call one.

        Args:
            args (str): `[0]`: Command path.
            args (List[Any]): `[1:]`: Command arguments.

        See [Worker.call_one][penvm.ext.workers.Worker.call_one].
        """
        try:
            tlogger = self.logger.enter()

            session.kernel.run_exec(
                args[0],
                args[1:],
                capture_output=True,
                text=self.text,
                cwd=self.cwd,
                env=self.env,
            )
        except Exception as e:
            self.logger.warning(f"EXCEPTION ({e})")
        finally:
            tlogger.exit()

    def collect_one(
        self,
        idx: int,
        session: "Session",
    ) -> Dict:
        """Collect one result.

        Extract information from the payload:

        * `status`: Response status.
        * `returncode`: Return code from execution.
        * `stderr`: Stderr output.
        * `stdout`: Stdout output.

        See [Worker.collect_one][penvm.ext.workers.Worker.collect_one].
        """
        try:
            tlogger = self.logger.enter()

            resp = session.get_response()
            payload = resp.payload
            return {
                "status": payload.get("-status"),
                "returncode": payload.get("returncode"),
                "stderr": payload.get("stderr", ""),
                "stdout": payload.get("stdout", ""),
            }

        except Exception as e:
            self.logger.warning(f"EXCEPTION ({e})")
        finally:
            tlogger.exit()

__init__(cwd, network, nworkers=None, auto_collect=True, text=True, env=None)

Initialize.

Parameters:

Name Type Description Default
cwd str

Working directory to use.

required
network Network

See Worker.init.

required
nworkers Union[int, None]

See Worker.init.

None
auto_collect bool

See Worker.init.

True
text bool

Work in text mode.

True
env dict

Environment variable settings.

None
Source code in penvm/src/ext/penvm/ext/workers/__init__.py
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
def __init__(
    self,
    cwd: str,
    network: "Network",
    nworkers: Union[int, None] = None,
    auto_collect: bool = True,
    text: bool = True,
    env: dict = None,
):
    """Initialize.

    Args:
        cwd: Working directory to use.
        network: See [Worker.__init__][penvm.ext.workers.Worker.__init__].
        nworkers: See [Worker.__init__][penvm.ext.workers.Worker.__init__].
        auto_collect: See [Worker.__init__][penvm.ext.workers.Worker.__init__].
        text: Work in text mode.
        env: Environment variable settings.
    """
    super().__init__(network, nworkers, auto_collect)
    self.cwd = cwd
    self.text = text
    self.env = env

call_one(idx, session, *args, **kwargs)

Call one.

Parameters:

Name Type Description Default
args str

[0]: Command path.

()
args List[Any]

[1:]: Command arguments.

()

See Worker.call_one.

Source code in penvm/src/ext/penvm/ext/workers/__init__.py
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
def call_one(
    self,
    idx: int,
    session: "Session",
    *args: List,
    **kwargs: Dict,
):
    """Call one.

    Args:
        args (str): `[0]`: Command path.
        args (List[Any]): `[1:]`: Command arguments.

    See [Worker.call_one][penvm.ext.workers.Worker.call_one].
    """
    try:
        tlogger = self.logger.enter()

        session.kernel.run_exec(
            args[0],
            args[1:],
            capture_output=True,
            text=self.text,
            cwd=self.cwd,
            env=self.env,
        )
    except Exception as e:
        self.logger.warning(f"EXCEPTION ({e})")
    finally:
        tlogger.exit()

collect_one(idx, session)

Collect one result.

Extract information from the payload:

  • status: Response status.
  • returncode: Return code from execution.
  • stderr: Stderr output.
  • stdout: Stdout output.

See Worker.collect_one.

Source code in penvm/src/ext/penvm/ext/workers/__init__.py
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
def collect_one(
    self,
    idx: int,
    session: "Session",
) -> Dict:
    """Collect one result.

    Extract information from the payload:

    * `status`: Response status.
    * `returncode`: Return code from execution.
    * `stderr`: Stderr output.
    * `stdout`: Stdout output.

    See [Worker.collect_one][penvm.ext.workers.Worker.collect_one].
    """
    try:
        tlogger = self.logger.enter()

        resp = session.get_response()
        payload = resp.payload
        return {
            "status": payload.get("-status"),
            "returncode": payload.get("returncode"),
            "stderr": payload.get("stderr", ""),
            "stdout": payload.get("stdout", ""),
        }

    except Exception as e:
        self.logger.warning(f"EXCEPTION ({e})")
    finally:
        tlogger.exit()

InputExecWorker

Bases: InputMixin, ExecWorker

Combined penvm.ext.workers.InputMixin and penvm.ext.workers.ExecWorker.

Source code in penvm/src/ext/penvm/ext/workers/__init__.py
835
836
837
838
839
class InputExecWorker(InputMixin, ExecWorker):
    """Combined [penvm.ext.workers.InputMixin][] and
    [penvm.ext.workers.ExecWorker][]."""

    pass

InputMixin

Mixin to partition the problem space into an input-defined number of partitions.

Source code in penvm/src/ext/penvm/ext/workers/__init__.py
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
class InputMixin:
    """Mixin to partition the problem space into an input-defined
    number of partitions.
    """

    def combine(
        self,
        results: List[Any],
    ) -> Dict[int, Any]:
        """Return dictionary with results given under respective
        partition index.

        Args:
            results: Collected results.

        Returns:
            Combined results.
        """
        d = {}
        for i, result in enumerate(results):
            d[i] = result
        return d

    def partition(
        self,
        *args: List,
        **kwargs: Dict,
    ) -> Tuple[List, Dict]:
        """Partition input stream.

        Keyword Args:
            `_` (dict): Input directives.
            `*` (Any): All others for the call.

        Keyword Args: Keyword Input Directives:
            `fieldsname` (str): Name of fields list referenced with
                `transform`.Defaults to "f".
            `file` (file): Input stream.
            `fsep` (str): Field separator. Defaults to whitespace.
            `data` (str|bytes): Input data.
            `rsep` (str): Record separator. Defaults to newline.
            `striprsep` (bool): Strip record separator. Defaults to
                `False`.
            `transform` (str): Python code to transform data fields
                (accessible via list named `fieldsname`). Defaults to
                identity.

        Yields:
            `args` partitioned according to directives.
        """
        idirectives = kwargs.get("_", {})
        fieldsname = idirectives.get("fieldsname", "f")
        file = idirectives.get("file")
        fsep = idirectives.get("fsep", None)
        data = idirectives.get("data")
        rsep = idirectives.get("rsep", "\n")
        striprsep = idirectives.get("striprsep", False)
        transformstr = idirectives.get("transform", None)

        if transformstr:
            try:
                transformcode = compile(transformstr, filename="transform", mode="exec")
            except Exception as e:
                raise Exception(f"transform string compilation failed ({e})")
        else:
            transformcode = None

        # TODO: handle text and bytes lsep and fsep defaults
        if data != None:
            if type(data) == str:
                f = io.StringIO(data)
            else:
                f = io.BytesIO(data)
                if type(rsep) != bytes:
                    rsep = rsep.encode()
        elif file != None:
            f = file
        else:
            raise Exception("no input stream/data provided")

        while True:
            # TODO: update to support non-newline line separator
            rec = f.readline()
            if rec == "":
                break
            if striprsep == True:
                rec = rec.rstrip(rsep)
            fields = rec.split(fsep)
            if transformcode:
                import os.path

                exec(transformcode, {fieldsname: fields, "os": os})
            _args = [arg.format(*fields) for arg in args]
            yield (_args, {})

combine(results)

Return dictionary with results given under respective partition index.

Parameters:

Name Type Description Default
results List[Any]

Collected results.

required

Returns:

Type Description
Dict[int, Any]

Combined results.

Source code in penvm/src/ext/penvm/ext/workers/__init__.py
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
def combine(
    self,
    results: List[Any],
) -> Dict[int, Any]:
    """Return dictionary with results given under respective
    partition index.

    Args:
        results: Collected results.

    Returns:
        Combined results.
    """
    d = {}
    for i, result in enumerate(results):
        d[i] = result
    return d

partition(*args, **kwargs)

Partition input stream.

Other Parameters:

Name Type Description
`_` dict

Input directives.

`*` Any

All others for the call.

Keyword Input Directives:

Name Type Description
`fieldsname` str

Name of fields list referenced with transform.Defaults to "f".

`file` file

Input stream.

`fsep` str

Field separator. Defaults to whitespace.

`data` str | bytes

Input data.

`rsep` str

Record separator. Defaults to newline.

`striprsep` bool

Strip record separator. Defaults to False.

`transform` str

Python code to transform data fields (accessible via list named fieldsname). Defaults to identity.

Yields:

Type Description
Tuple[List, Dict]

args partitioned according to directives.

Source code in penvm/src/ext/penvm/ext/workers/__init__.py
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
def partition(
    self,
    *args: List,
    **kwargs: Dict,
) -> Tuple[List, Dict]:
    """Partition input stream.

    Keyword Args:
        `_` (dict): Input directives.
        `*` (Any): All others for the call.

    Keyword Args: Keyword Input Directives:
        `fieldsname` (str): Name of fields list referenced with
            `transform`.Defaults to "f".
        `file` (file): Input stream.
        `fsep` (str): Field separator. Defaults to whitespace.
        `data` (str|bytes): Input data.
        `rsep` (str): Record separator. Defaults to newline.
        `striprsep` (bool): Strip record separator. Defaults to
            `False`.
        `transform` (str): Python code to transform data fields
            (accessible via list named `fieldsname`). Defaults to
            identity.

    Yields:
        `args` partitioned according to directives.
    """
    idirectives = kwargs.get("_", {})
    fieldsname = idirectives.get("fieldsname", "f")
    file = idirectives.get("file")
    fsep = idirectives.get("fsep", None)
    data = idirectives.get("data")
    rsep = idirectives.get("rsep", "\n")
    striprsep = idirectives.get("striprsep", False)
    transformstr = idirectives.get("transform", None)

    if transformstr:
        try:
            transformcode = compile(transformstr, filename="transform", mode="exec")
        except Exception as e:
            raise Exception(f"transform string compilation failed ({e})")
    else:
        transformcode = None

    # TODO: handle text and bytes lsep and fsep defaults
    if data != None:
        if type(data) == str:
            f = io.StringIO(data)
        else:
            f = io.BytesIO(data)
            if type(rsep) != bytes:
                rsep = rsep.encode()
    elif file != None:
        f = file
    else:
        raise Exception("no input stream/data provided")

    while True:
        # TODO: update to support non-newline line separator
        rec = f.readline()
        if rec == "":
            break
        if striprsep == True:
            rec = rec.rstrip(rsep)
        fields = rec.split(fsep)
        if transformcode:
            import os.path

            exec(transformcode, {fieldsname: fields, "os": os})
        _args = [arg.format(*fields) for arg in args]
        yield (_args, {})

InputPythonCodeWorker

Bases: InputMixin, PythonCodeWorker

Combined penvm.ext.workers.InputMixin and penvm.ext.workers.PythonCodeWorker.

Source code in penvm/src/ext/penvm/ext/workers/__init__.py
842
843
844
845
846
class InputPythonCodeWorker(InputMixin, PythonCodeWorker):
    """Combined [penvm.ext.workers.InputMixin][] and
    [penvm.ext.workers.PythonCodeWorker][]."""

    pass

MirrorExecWorker

Bases: MirrorMixin, ExecWorker

Combined penvm.ext.workers.MirrorMixin and penvm.ext.workers.ExecWorker.

Source code in penvm/src/ext/penvm/ext/workers/__init__.py
856
857
858
859
860
class MirrorExecWorker(MirrorMixin, ExecWorker):
    """Combined [penvm.ext.workers.MirrorMixin][] and
    [penvm.ext.workers.ExecWorker][]."""

    pass

MirrorMixin

Mixin to provide mirror support for Worker.

All workers are given the same parameters to use (see partition).

Source code in penvm/src/ext/penvm/ext/workers/__init__.py
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
class MirrorMixin:
    """Mixin to provide mirror support for Worker.

    *All* workers are given the same parameters to use (see
    `partition`).
    """

    def combine(
        self,
        results: List[Any],
    ) -> Dict[str, Any]:
        """Return dictionary with results given under respective
        machine id keys.

        Args:
            results: Collected results.

        Returns:
            Combined results.
        """
        machines = self.machines
        d = {}
        for i, result in enumerate(results):
            d[machines[i].oid] = result
        return d

    def partition(
        self,
        *args: List,
        **kwargs: Dict,
    ):
        """Same args and kwargs (untouched) for each.

        Args:
            *args: Positional arguments.
            **kwargs: Keyword arguments.

        Yields:
            (Tuple[List, Dict]): `args` and `kwargs` unchanged.
        """
        for idx, machine in enumerate(self.machines):
            yield (args, kwargs)

combine(results)

Return dictionary with results given under respective machine id keys.

Parameters:

Name Type Description Default
results List[Any]

Collected results.

required

Returns:

Type Description
Dict[str, Any]

Combined results.

Source code in penvm/src/ext/penvm/ext/workers/__init__.py
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
def combine(
    self,
    results: List[Any],
) -> Dict[str, Any]:
    """Return dictionary with results given under respective
    machine id keys.

    Args:
        results: Collected results.

    Returns:
        Combined results.
    """
    machines = self.machines
    d = {}
    for i, result in enumerate(results):
        d[machines[i].oid] = result
    return d

partition(*args, **kwargs)

Same args and kwargs (untouched) for each.

Parameters:

Name Type Description Default
*args List

Positional arguments.

()
**kwargs Dict

Keyword arguments.

{}

Yields:

Type Description
Tuple[List, Dict]

args and kwargs unchanged.

Source code in penvm/src/ext/penvm/ext/workers/__init__.py
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
def partition(
    self,
    *args: List,
    **kwargs: Dict,
):
    """Same args and kwargs (untouched) for each.

    Args:
        *args: Positional arguments.
        **kwargs: Keyword arguments.

    Yields:
        (Tuple[List, Dict]): `args` and `kwargs` unchanged.
    """
    for idx, machine in enumerate(self.machines):
        yield (args, kwargs)

MirrorOpWorker

Bases: MirrorMixin, OpWorker

Combined penvm.ext.workers.MirrorMixin and penvm.ext.workers.OpWorker.

Source code in penvm/src/ext/penvm/ext/workers/__init__.py
849
850
851
852
853
class MirrorOpWorker(MirrorMixin, OpWorker):
    """Combined [penvm.ext.workers.MirrorMixin][] and
    [penvm.ext.workers.OpWorker][]."""

    pass

MirrorPythonCodeWorker

Bases: MirrorMixin, PythonCodeWorker

Combined penvm.ext.workers.MirrorMixin and penvm.ext.workers.PythonCodeWorker.

Source code in penvm/src/ext/penvm/ext/workers/__init__.py
863
864
865
866
867
class MirrorPythonCodeWorker(MirrorMixin, PythonCodeWorker):
    """Combined [penvm.ext.workers.MirrorMixin][] and
    [penvm.ext.workers.PythonCodeWorker][]."""

    pass

OpWorker

Bases: Worker

Worker to call an op.

Note

auto_collect is False by default because of the nature of operations (many do not send responses.)

Source code in penvm/src/ext/penvm/ext/workers/__init__.py
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
class OpWorker(Worker):
    """Worker to call an op.

    Note:
        `auto_collect` is `False` by default because of the
        nature of operations (many do *not* send responses.)
    """

    def __init__(
        self,
        kernelname: str,
        network: "Network",
        nworkers: Union[int, None] = None,
        auto_collect: bool = False,
    ):
        """Initialize.

        Args:
            kernelname: Name of kernel to use.
            network: See [Worker.__init__][penvm.ext.workers.Worker.__init__].
            nworkers: See [Worker.__init__][penvm.ext.workers.Worker.__init__].
            auto_collect: See [Worker.__init__][penvm.ext.workers.Worker.__init__].
        """
        super().__init__(network, nworkers, auto_collect)
        self.kernelname = kernelname

    def call_one(
        self,
        idx: int,
        session: "Session",
        *args: List,
        **kwargs: Dict,
    ) -> Any:
        """Call one.

        Args:
            args (str): `[0]`: Kernel client op method name.
            args (List[Any]): `[1:]`: Arguments for the method call.

        See [Worker.call_one][penvm.ext.workers.Worker.call_one].
        """
        try:
            tlogger = self.logger.enter()

            opname = args[0]
            fn = getattr(session.kernel, opname)
            fn(*args[1:], **kwargs)
        except Exception as e:
            self.logger.warning(f"EXCEPTION ({e})")
        finally:
            tlogger.exit()

    def collect_one(
        self,
        idx: int,
        session: "Session",
    ) -> Dict:
        """Collect one result.

        See [Worker.collect_one][penvm.ext.workers.Worker.collect_one].
        """
        try:
            tlogger = self.logger.enter()
            resp = session.get_response()
            payload = resp.payload
            return payload.dict()
        except Exception as e:
            tlogger.error(f"EXCEPTION ({e})")
        finally:
            tlogger.exit()

__init__(kernelname, network, nworkers=None, auto_collect=False)

Initialize.

Parameters:

Name Type Description Default
kernelname str

Name of kernel to use.

required
network Network

See Worker.init.

required
nworkers Union[int, None]

See Worker.init.

None
auto_collect bool

See Worker.init.

False
Source code in penvm/src/ext/penvm/ext/workers/__init__.py
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
def __init__(
    self,
    kernelname: str,
    network: "Network",
    nworkers: Union[int, None] = None,
    auto_collect: bool = False,
):
    """Initialize.

    Args:
        kernelname: Name of kernel to use.
        network: See [Worker.__init__][penvm.ext.workers.Worker.__init__].
        nworkers: See [Worker.__init__][penvm.ext.workers.Worker.__init__].
        auto_collect: See [Worker.__init__][penvm.ext.workers.Worker.__init__].
    """
    super().__init__(network, nworkers, auto_collect)
    self.kernelname = kernelname

call_one(idx, session, *args, **kwargs)

Call one.

Parameters:

Name Type Description Default
args str

[0]: Kernel client op method name.

()
args List[Any]

[1:]: Arguments for the method call.

()

See Worker.call_one.

Source code in penvm/src/ext/penvm/ext/workers/__init__.py
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
def call_one(
    self,
    idx: int,
    session: "Session",
    *args: List,
    **kwargs: Dict,
) -> Any:
    """Call one.

    Args:
        args (str): `[0]`: Kernel client op method name.
        args (List[Any]): `[1:]`: Arguments for the method call.

    See [Worker.call_one][penvm.ext.workers.Worker.call_one].
    """
    try:
        tlogger = self.logger.enter()

        opname = args[0]
        fn = getattr(session.kernel, opname)
        fn(*args[1:], **kwargs)
    except Exception as e:
        self.logger.warning(f"EXCEPTION ({e})")
    finally:
        tlogger.exit()

collect_one(idx, session)

Collect one result.

See Worker.collect_one.

Source code in penvm/src/ext/penvm/ext/workers/__init__.py
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
def collect_one(
    self,
    idx: int,
    session: "Session",
) -> Dict:
    """Collect one result.

    See [Worker.collect_one][penvm.ext.workers.Worker.collect_one].
    """
    try:
        tlogger = self.logger.enter()
        resp = session.get_response()
        payload = resp.payload
        return payload.dict()
    except Exception as e:
        tlogger.error(f"EXCEPTION ({e})")
    finally:
        tlogger.exit()

PartitionByFunctionWorker

Bases: Worker

No partitioning is done, but the args is extended with nworkers and index. This is suitable for cases in which the receiving function does the partition based on nworkers and the index.

Source code in penvm/src/ext/penvm/ext/workers/__init__.py
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
class PartitionByFunctionWorker(Worker):
    """No partitioning is done, but the `args` is extended with
    `nworkers` and `index`. This is suitable for cases in which the
    receiving function does the partition based on `nworkers` and the
    `index`.
    """

    def partition(
        self,
        *args: List,
        **kwargs: Dict,
    ):
        nworkers = self.nworkers if self.nworkers != None else len(self.machines)
        for i in range(nworkers):
            yield (list(args) + [self.nworkers, i], kwargs)

PythonCodeWorker

Bases: Worker

Worker to call Python code.

Source code in penvm/src/ext/penvm/ext/workers/__init__.py
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
class PythonCodeWorker(Worker):
    """Worker to call Python code."""

    def __init__(
        self,
        fallback: str,
        code: str,
        network: "Network",
        nworkers: Union[int, None] = None,
        auto_collect: bool = True,
    ):
        """Initialize.

        Args:
            fallback: Fallback function/method to call.
            code: Python code to run.
            network: See [Worker.__init__][penvm.ext.workers.Worker.__init__].
            nworkers: See [Worker.__init__][penvm.ext.workers.Worker.__init__].
            auto_collect: See [Worker.__init__][penvm.ext.workers.Worker.__init__].
        """
        super().__init__(network, nworkers, auto_collect)
        self.fallback = fallback
        self.code = code

    def call_one(
        self,
        idx: int,
        session: "Session",
        *args: List,
        **kwargs: Dict,
    ) -> Any:
        """Call one.

        Args:
            args (str): `[0]`: Function name to call in provided code.<br>
            args (List[Any]): `[1:]`: Arguments to call function with.
            kwargs: Keyword arguments to call function with.

        See [Worker.call_one][penvm.ext.workers.Worker.call_one].
        """
        try:
            tlogger = self.logger.enter()
            session.kernel.run_python_function(
                self.code,
                args[0],
                args[1:],
                kwargs,
            )
        except Exception as e:
            self.logger.warning(f"EXCEPTION ({e})")
        finally:
            tlogger.exit()

    def collect_one(
        self,
        idx: int,
        session: "Session",
    ) -> Any:
        """Collect one result.

        Extract information from the payload:

        * `-status`: Execution status.
        * `return-value`: Function return value.

        See [Worker.collect_one][penvm.ext.workers.Worker.collect_one].
        """
        try:
            tlogger = self.logger.enter()

            resp = session.get_response()
            payload = resp.payload
            if payload.get("-status") == "error":
                # TODO: what to do?
                pass
            return payload.get("return-value")
        except Exception as e:
            self.logger.warning(f"EXCEPTION ({e})")
        finally:
            tlogger.exit()

    def run(
        self,
        *args: List,
        **kwargs: Dict,
    ) -> Any:
        """Run fallback or worker run.

        See [Worker.run][penvm.ext.workers.Worker.run].
        """
        try:
            tlogger = self.logger.enter()

            if self.use_fallback(*args, **kwargs) or not self.enabled:
                return self.fallback(*args, **kwargs)
            else:
                return super().run(*args, **kwargs)
        except Exception as e:
            self.logger.warning(f"EXCEPTION ({e})")
        finally:
            tlogger.exit()

    def use_fallback(
        self,
        *args: List,
        **kwargs: Dict,
    ):
        """Call fallback by default, if available."""
        return self.fallback != None

    def wrun(
        self,
        fnname: str,
    ) -> Callable:
        """Wrap run call to provide `fnname` with `args` and `kwargs`.

        This allows for the function name to *not* have to be in the
        `args` and better mirror what may have been the original call
        signature.

        Args:
            fnname: Function name to call.

        Returns:
            Wrapped function call.
        """

        def _run(*args, **kwargs) -> Any:
            return self.run(fnname, *args, **kwargs)

        return _run

__init__(fallback, code, network, nworkers=None, auto_collect=True)

Initialize.

Parameters:

Name Type Description Default
fallback str

Fallback function/method to call.

required
code str

Python code to run.

required
network Network

See Worker.init.

required
nworkers Union[int, None]

See Worker.init.

None
auto_collect bool

See Worker.init.

True
Source code in penvm/src/ext/penvm/ext/workers/__init__.py
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
def __init__(
    self,
    fallback: str,
    code: str,
    network: "Network",
    nworkers: Union[int, None] = None,
    auto_collect: bool = True,
):
    """Initialize.

    Args:
        fallback: Fallback function/method to call.
        code: Python code to run.
        network: See [Worker.__init__][penvm.ext.workers.Worker.__init__].
        nworkers: See [Worker.__init__][penvm.ext.workers.Worker.__init__].
        auto_collect: See [Worker.__init__][penvm.ext.workers.Worker.__init__].
    """
    super().__init__(network, nworkers, auto_collect)
    self.fallback = fallback
    self.code = code

call_one(idx, session, *args, **kwargs)

Call one.

Parameters:

Name Type Description Default
args str

[0]: Function name to call in provided code.

()
args List[Any]

[1:]: Arguments to call function with.

()
kwargs Dict

Keyword arguments to call function with.

{}

See Worker.call_one.

Source code in penvm/src/ext/penvm/ext/workers/__init__.py
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
def call_one(
    self,
    idx: int,
    session: "Session",
    *args: List,
    **kwargs: Dict,
) -> Any:
    """Call one.

    Args:
        args (str): `[0]`: Function name to call in provided code.<br>
        args (List[Any]): `[1:]`: Arguments to call function with.
        kwargs: Keyword arguments to call function with.

    See [Worker.call_one][penvm.ext.workers.Worker.call_one].
    """
    try:
        tlogger = self.logger.enter()
        session.kernel.run_python_function(
            self.code,
            args[0],
            args[1:],
            kwargs,
        )
    except Exception as e:
        self.logger.warning(f"EXCEPTION ({e})")
    finally:
        tlogger.exit()

collect_one(idx, session)

Collect one result.

Extract information from the payload:

  • -status: Execution status.
  • return-value: Function return value.

See Worker.collect_one.

Source code in penvm/src/ext/penvm/ext/workers/__init__.py
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
def collect_one(
    self,
    idx: int,
    session: "Session",
) -> Any:
    """Collect one result.

    Extract information from the payload:

    * `-status`: Execution status.
    * `return-value`: Function return value.

    See [Worker.collect_one][penvm.ext.workers.Worker.collect_one].
    """
    try:
        tlogger = self.logger.enter()

        resp = session.get_response()
        payload = resp.payload
        if payload.get("-status") == "error":
            # TODO: what to do?
            pass
        return payload.get("return-value")
    except Exception as e:
        self.logger.warning(f"EXCEPTION ({e})")
    finally:
        tlogger.exit()

run(*args, **kwargs)

Run fallback or worker run.

See Worker.run.

Source code in penvm/src/ext/penvm/ext/workers/__init__.py
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
def run(
    self,
    *args: List,
    **kwargs: Dict,
) -> Any:
    """Run fallback or worker run.

    See [Worker.run][penvm.ext.workers.Worker.run].
    """
    try:
        tlogger = self.logger.enter()

        if self.use_fallback(*args, **kwargs) or not self.enabled:
            return self.fallback(*args, **kwargs)
        else:
            return super().run(*args, **kwargs)
    except Exception as e:
        self.logger.warning(f"EXCEPTION ({e})")
    finally:
        tlogger.exit()

use_fallback(*args, **kwargs)

Call fallback by default, if available.

Source code in penvm/src/ext/penvm/ext/workers/__init__.py
804
805
806
807
808
809
810
def use_fallback(
    self,
    *args: List,
    **kwargs: Dict,
):
    """Call fallback by default, if available."""
    return self.fallback != None

wrun(fnname)

Wrap run call to provide fnname with args and kwargs.

This allows for the function name to not have to be in the args and better mirror what may have been the original call signature.

Parameters:

Name Type Description Default
fnname str

Function name to call.

required

Returns:

Type Description
Callable

Wrapped function call.

Source code in penvm/src/ext/penvm/ext/workers/__init__.py
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
def wrun(
    self,
    fnname: str,
) -> Callable:
    """Wrap run call to provide `fnname` with `args` and `kwargs`.

    This allows for the function name to *not* have to be in the
    `args` and better mirror what may have been the original call
    signature.

    Args:
        fnname: Function name to call.

    Returns:
        Wrapped function call.
    """

    def _run(*args, **kwargs) -> Any:
        return self.run(fnname, *args, **kwargs)

    return _run

Worker

Bases: BaseObject

Base, mostly abstract, class.

Source code in penvm/src/ext/penvm/ext/workers/__init__.py
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
class Worker(BaseObject):
    """Base, mostly abstract, class."""

    def __init__(
        self,
        network: "Network",
        nworkers: Union[int, None] = None,
        auto_collect: bool = True,
    ):
        """Initialize.

        Args:
            network: Network object.
            nworkers: (Maximum) Number of workers to run with.
            auto_collect: Collect response objects."""
        try:
            super().__init__(None, logger)
            tlogger = self.logger.enter()

            self.network = network
            self._nworkers = nworkers
            self.enabled = True
            self.auto_collect = auto_collect
        except Exception as e:
            self.logger.warning(f"EXCEPTION ({e})")
        finally:
            tlogger.exit()

    def call_one(
        self,
        idx: int,
        session: "Session",
        *args: List,
        **kwargs: Dict,
    ):
        """Call one.

        Args:
            idx: Call index.
            session: Session for call.
            *args: Positional arguments for call.
            **kwargs: Keyword arguments for call.

        Note:
            Override.
        """
        pass

    def clean(
        self,
        sessions: List["Session"],
    ):
        """Clean/release all sessions.

        Args:
            sessions: List of sessions to clean.

        Note:
            Do not override.
        """
        try:
            tlogger = self.logger.enter()

            for idx, session in enumerate(sessions):
                session.kernel.machine_drop_session()
        except Exception as e:
            self.logger.warning(f"EXCEPTION ({e})")
        finally:
            tlogger.exit()

    def collect(
        self,
        sessions: List["Session"],
    ) -> List[Any]:
        """Collect all results.

        Results are collected using
        [Worker.collect_one][penvm.ext.workers.Worker.collect_one].

        Args:
            sessions: List of sessions to collect for.

        Returns:
            Results from calls.

        Note:
            Do not override.
        """
        try:
            tlogger = self.logger.enter()

            results = []
            if self.auto_collect:
                for idx, session in enumerate(sessions):
                    results.append(self.collect_one(idx, session))
            return results
        except Exception as e:
            self.logger.warning(f"EXCEPTION ({e})")
        finally:
            tlogger.exit()

    def collect_one(
        self,
        idx: int,
        session: "Session",
    ) -> Any:
        """Collect one result.

        Args:
            idx: Collect index.
            session: Session to collect for.

        Returns:
            Collected result.

        Note:
            Override.
        """
        pass

    def combine(
        self,
        results: List[Any],
    ) -> Any:
        """Combine all results into one and return it.

        Args:
            results: Collected results.

        Returns:
            Combined results.

        Note:
            Override.
        """
        pass

    def disable(self):
        """Disable worker.

        Will use fallback if available.
        """
        self.enabled = False

    def enable(self):
        """Enable worker.

        Will not force use of fallback.
        """
        self.enabled = True

    @property
    def machines(self) -> List["Machine"]:
        """Return the machines available for use, subject to the
        number of workers.

        Returns:
            List of machines available for use.
        """
        machines = self.network.get_machines()
        return machines[: self.nworkers]

    @property
    def nworkers(self) -> int:
        """Return the number of workers to use.

        Returns:
            Number of workers to use.
        """
        if self._nworkers != None:
            return self._nworkers
        else:
            return len(self.network.get_machines())

    def partition(
        self,
        *args: List,
        **kwargs: Dict,
    ):
        """Partition problem space. Yield for each.

        Args:
            *args: Positional arguments for call.
            **kwargs: Keyword arguments for call.

        Yields:
            (Tuple[List, Dict]): `args` and `kwargs` unchanged.

        Note:
            Override.
        """
        yield (args, kwargs)

    def process(
        self,
        results: List[Any],
    ) -> List[Any]:
        """Process and return all results.

        Args:
            results: Collected results.

        Returns:
            Results unchanged.

        Note:
            Override as needed.
        """
        return results

    def run(
        self,
        *args: List,
        **kwargs: Dict,
    ) -> Any:
        """Run work across network and return result.

        Args:
            *args: Positional arguments.
            **kwargs: Keyword arguments.

        Returns:
            Final result.

        Note:
            Do not override.
        """
        # ensure network is set up
        try:
            tlogger = self.logger.enter()

            try:
                self.network.boot()
            except Exception as e:
                raise WorkerException("cannot boot network")

            sessions = self.spawn(*args, **kwargs)
            results = self.collect(sessions)
            results = self.process(results)
            result = self.combine(results)
            self.clean(sessions)
            return result
        except WorkerException as e:
            raise
        except Exception as e:
            self.logger.warning(f"EXCEPTION ({e})")
        finally:
            tlogger.exit()

    def spawn(
        self,
        *args: List,
        **kwargs: Dict,
    ) -> List["Session"]:
        """Spawn partitioned work across machines.

        Args:
            *args: Positional arguments.
            **kwargs: Keyword arguments.

        Returns:
            List of sessions used.

        Note:
            Do not override.
        """
        try:
            tlogger = self.logger.enter()

            sessions = []
            machines = self.machines
            nmachines = len(machines)
            for idx, part in enumerate(self.partition(*args, **kwargs)):
                _args, _kwargs = part
                _args, _kwargs = self.transform(*_args, **_kwargs)
                machine = machines[idx % nmachines]
                session = machine.get_session()
                sessions.append(session)
                self.call_one(idx, session, *_args, **_kwargs)

            return sessions
        except Exception as e:
            self.logger.warning(f"EXCEPTION ({e})")
        finally:
            tlogger.exit()

    def test(
        self,
        *args: List,
        **kwargs: Dict,
    ):
        """Test run.

        No actual calls or processing are made.

        Args:
            *args: Positional arguments.
            **kwargs: Keyword arguments.

        Yields:
            (Tuple[List, Dict]): `args` and `kwargs` after partitioning and transformation.
        """
        try:
            tlogger = self.logger.enter()

            # see spawn for calling pattern as below
            for idx, part in enumerate(self.partition(*args, **kwargs)):
                _args, _kwargs = part
                _args, _kwargs = self.transform(*_args, **_kwargs)
                yield (_args, _kwargs)
        except Exception as e:
            self.logger.warning(f"EXCEPTION ({e})")
        finally:
            tlogger.exit()

    def transform(
        self,
        *args: List,
        **kwargs: Dict,
    ) -> Tuple[List, Dict]:
        """Transform args and kwargs after partitioning and prior to
        running.

        Args:
            *args: Positional arguments.
            **kwargs: Keyword arguments.

        Returns:
            `args` and `kwargs` unchanged.
        """
        return (args, kwargs)

machines: List[Machine] property

Return the machines available for use, subject to the number of workers.

Returns:

Type Description
List[Machine]

List of machines available for use.

nworkers: int property

Return the number of workers to use.

Returns:

Type Description
int

Number of workers to use.

__init__(network, nworkers=None, auto_collect=True)

Initialize.

Parameters:

Name Type Description Default
network Network

Network object.

required
nworkers Union[int, None]

(Maximum) Number of workers to run with.

None
auto_collect bool

Collect response objects.

True
Source code in penvm/src/ext/penvm/ext/workers/__init__.py
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
def __init__(
    self,
    network: "Network",
    nworkers: Union[int, None] = None,
    auto_collect: bool = True,
):
    """Initialize.

    Args:
        network: Network object.
        nworkers: (Maximum) Number of workers to run with.
        auto_collect: Collect response objects."""
    try:
        super().__init__(None, logger)
        tlogger = self.logger.enter()

        self.network = network
        self._nworkers = nworkers
        self.enabled = True
        self.auto_collect = auto_collect
    except Exception as e:
        self.logger.warning(f"EXCEPTION ({e})")
    finally:
        tlogger.exit()

call_one(idx, session, *args, **kwargs)

Call one.

Parameters:

Name Type Description Default
idx int

Call index.

required
session Session

Session for call.

required
*args List

Positional arguments for call.

()
**kwargs Dict

Keyword arguments for call.

{}
Note

Override.

Source code in penvm/src/ext/penvm/ext/workers/__init__.py
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
def call_one(
    self,
    idx: int,
    session: "Session",
    *args: List,
    **kwargs: Dict,
):
    """Call one.

    Args:
        idx: Call index.
        session: Session for call.
        *args: Positional arguments for call.
        **kwargs: Keyword arguments for call.

    Note:
        Override.
    """
    pass

clean(sessions)

Clean/release all sessions.

Parameters:

Name Type Description Default
sessions List[Session]

List of sessions to clean.

required
Note

Do not override.

Source code in penvm/src/ext/penvm/ext/workers/__init__.py
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
def clean(
    self,
    sessions: List["Session"],
):
    """Clean/release all sessions.

    Args:
        sessions: List of sessions to clean.

    Note:
        Do not override.
    """
    try:
        tlogger = self.logger.enter()

        for idx, session in enumerate(sessions):
            session.kernel.machine_drop_session()
    except Exception as e:
        self.logger.warning(f"EXCEPTION ({e})")
    finally:
        tlogger.exit()

collect(sessions)

Collect all results.

Results are collected using Worker.collect_one.

Parameters:

Name Type Description Default
sessions List[Session]

List of sessions to collect for.

required

Returns:

Type Description
List[Any]

Results from calls.

Note

Do not override.

Source code in penvm/src/ext/penvm/ext/workers/__init__.py
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
def collect(
    self,
    sessions: List["Session"],
) -> List[Any]:
    """Collect all results.

    Results are collected using
    [Worker.collect_one][penvm.ext.workers.Worker.collect_one].

    Args:
        sessions: List of sessions to collect for.

    Returns:
        Results from calls.

    Note:
        Do not override.
    """
    try:
        tlogger = self.logger.enter()

        results = []
        if self.auto_collect:
            for idx, session in enumerate(sessions):
                results.append(self.collect_one(idx, session))
        return results
    except Exception as e:
        self.logger.warning(f"EXCEPTION ({e})")
    finally:
        tlogger.exit()

collect_one(idx, session)

Collect one result.

Parameters:

Name Type Description Default
idx int

Collect index.

required
session Session

Session to collect for.

required

Returns:

Type Description
Any

Collected result.

Note

Override.

Source code in penvm/src/ext/penvm/ext/workers/__init__.py
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
def collect_one(
    self,
    idx: int,
    session: "Session",
) -> Any:
    """Collect one result.

    Args:
        idx: Collect index.
        session: Session to collect for.

    Returns:
        Collected result.

    Note:
        Override.
    """
    pass

combine(results)

Combine all results into one and return it.

Parameters:

Name Type Description Default
results List[Any]

Collected results.

required

Returns:

Type Description
Any

Combined results.

Note

Override.

Source code in penvm/src/ext/penvm/ext/workers/__init__.py
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
def combine(
    self,
    results: List[Any],
) -> Any:
    """Combine all results into one and return it.

    Args:
        results: Collected results.

    Returns:
        Combined results.

    Note:
        Override.
    """
    pass

disable()

Disable worker.

Will use fallback if available.

Source code in penvm/src/ext/penvm/ext/workers/__init__.py
185
186
187
188
189
190
def disable(self):
    """Disable worker.

    Will use fallback if available.
    """
    self.enabled = False

enable()

Enable worker.

Will not force use of fallback.

Source code in penvm/src/ext/penvm/ext/workers/__init__.py
192
193
194
195
196
197
def enable(self):
    """Enable worker.

    Will not force use of fallback.
    """
    self.enabled = True

partition(*args, **kwargs)

Partition problem space. Yield for each.

Parameters:

Name Type Description Default
*args List

Positional arguments for call.

()
**kwargs Dict

Keyword arguments for call.

{}

Yields:

Type Description
Tuple[List, Dict]

args and kwargs unchanged.

Note

Override.

Source code in penvm/src/ext/penvm/ext/workers/__init__.py
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
def partition(
    self,
    *args: List,
    **kwargs: Dict,
):
    """Partition problem space. Yield for each.

    Args:
        *args: Positional arguments for call.
        **kwargs: Keyword arguments for call.

    Yields:
        (Tuple[List, Dict]): `args` and `kwargs` unchanged.

    Note:
        Override.
    """
    yield (args, kwargs)

process(results)

Process and return all results.

Parameters:

Name Type Description Default
results List[Any]

Collected results.

required

Returns:

Type Description
List[Any]

Results unchanged.

Note

Override as needed.

Source code in penvm/src/ext/penvm/ext/workers/__init__.py
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
def process(
    self,
    results: List[Any],
) -> List[Any]:
    """Process and return all results.

    Args:
        results: Collected results.

    Returns:
        Results unchanged.

    Note:
        Override as needed.
    """
    return results

run(*args, **kwargs)

Run work across network and return result.

Parameters:

Name Type Description Default
*args List

Positional arguments.

()
**kwargs Dict

Keyword arguments.

{}

Returns:

Type Description
Any

Final result.

Note

Do not override.

Source code in penvm/src/ext/penvm/ext/workers/__init__.py
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
def run(
    self,
    *args: List,
    **kwargs: Dict,
) -> Any:
    """Run work across network and return result.

    Args:
        *args: Positional arguments.
        **kwargs: Keyword arguments.

    Returns:
        Final result.

    Note:
        Do not override.
    """
    # ensure network is set up
    try:
        tlogger = self.logger.enter()

        try:
            self.network.boot()
        except Exception as e:
            raise WorkerException("cannot boot network")

        sessions = self.spawn(*args, **kwargs)
        results = self.collect(sessions)
        results = self.process(results)
        result = self.combine(results)
        self.clean(sessions)
        return result
    except WorkerException as e:
        raise
    except Exception as e:
        self.logger.warning(f"EXCEPTION ({e})")
    finally:
        tlogger.exit()

spawn(*args, **kwargs)

Spawn partitioned work across machines.

Parameters:

Name Type Description Default
*args List

Positional arguments.

()
**kwargs Dict

Keyword arguments.

{}

Returns:

Type Description
List[Session]

List of sessions used.

Note

Do not override.

Source code in penvm/src/ext/penvm/ext/workers/__init__.py
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
def spawn(
    self,
    *args: List,
    **kwargs: Dict,
) -> List["Session"]:
    """Spawn partitioned work across machines.

    Args:
        *args: Positional arguments.
        **kwargs: Keyword arguments.

    Returns:
        List of sessions used.

    Note:
        Do not override.
    """
    try:
        tlogger = self.logger.enter()

        sessions = []
        machines = self.machines
        nmachines = len(machines)
        for idx, part in enumerate(self.partition(*args, **kwargs)):
            _args, _kwargs = part
            _args, _kwargs = self.transform(*_args, **_kwargs)
            machine = machines[idx % nmachines]
            session = machine.get_session()
            sessions.append(session)
            self.call_one(idx, session, *_args, **_kwargs)

        return sessions
    except Exception as e:
        self.logger.warning(f"EXCEPTION ({e})")
    finally:
        tlogger.exit()

test(*args, **kwargs)

Test run.

No actual calls or processing are made.

Parameters:

Name Type Description Default
*args List

Positional arguments.

()
**kwargs Dict

Keyword arguments.

{}

Yields:

Type Description
Tuple[List, Dict]

args and kwargs after partitioning and transformation.

Source code in penvm/src/ext/penvm/ext/workers/__init__.py
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
def test(
    self,
    *args: List,
    **kwargs: Dict,
):
    """Test run.

    No actual calls or processing are made.

    Args:
        *args: Positional arguments.
        **kwargs: Keyword arguments.

    Yields:
        (Tuple[List, Dict]): `args` and `kwargs` after partitioning and transformation.
    """
    try:
        tlogger = self.logger.enter()

        # see spawn for calling pattern as below
        for idx, part in enumerate(self.partition(*args, **kwargs)):
            _args, _kwargs = part
            _args, _kwargs = self.transform(*_args, **_kwargs)
            yield (_args, _kwargs)
    except Exception as e:
        self.logger.warning(f"EXCEPTION ({e})")
    finally:
        tlogger.exit()

transform(*args, **kwargs)

Transform args and kwargs after partitioning and prior to running.

Parameters:

Name Type Description Default
*args List

Positional arguments.

()
**kwargs Dict

Keyword arguments.

{}

Returns:

Type Description
Tuple[List, Dict]

args and kwargs unchanged.

Source code in penvm/src/ext/penvm/ext/workers/__init__.py
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
def transform(
    self,
    *args: List,
    **kwargs: Dict,
) -> Tuple[List, Dict]:
    """Transform args and kwargs after partitioning and prior to
    running.

    Args:
        *args: Positional arguments.
        **kwargs: Keyword arguments.

    Returns:
        `args` and `kwargs` unchanged.
    """
    return (args, kwargs)
You are on penvm.dev