Modules

Top-level package for Action Rules.

ActionRules

A class used to generate action rules for a given dataset.

Attributes:

Name Type Description
min_stable_attributes int

The minimum number of stable attributes required.

min_flexible_attributes int

The minimum number of flexible attributes required.

min_undesired_support int

The minimum support for the undesired state.

min_undesired_confidence float

The minimum confidence for the undesired state.

min_desired_support int

The minimum support for the desired state.

min_desired_confidence float

The minimum confidence for the desired state.

verbose (bool, optional)

If True, enables verbose output.

rules (Optional[Rules], optional)

Stores the generated rules.

output (Optional[Output], optional)

Stores the generated action rules.

np (Optional[ModuleType], optional)

The numpy or cupy module used for array operations.

pd (Optional[ModuleType], optional)

The pandas or cudf module used for DataFrame operations.

is_gpu_np bool

Indicates whether GPU-accelerated numpy (cupy) is used.

is_gpu_pd bool

Indicates whether GPU-accelerated pandas (cudf) is used.

intrinsic_utility_table (dict, optional)

(attribute, value) -> float A lookup table for the intrinsic utility of each attribute-value pair. If None, no intrinsic utility is considered.

transition_utility_table (dict, optional)

(attribute, from_value, to_value) -> float A lookup table for cost/gain of transitions between values. If None, no transition utility is considered.
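
Both utility tables are plain dictionaries keyed by tuples, as the descriptions above state. The sketch below only illustrates that shape. The attribute name `Salary`, its values, and the numbers are made up, and the helper `net_change` is hypothetical arithmetic for illustration, not the package's scoring formula.

```python
# Hypothetical attribute names and utility values, for illustration only.
intrinsic_utility_table = {
    ("Salary", "low"): -300.0,   # (attribute, value) -> float
    ("Salary", "high"): 500.0,
}
transition_utility_table = {
    ("Salary", "low", "high"): -1.5,  # (attribute, from_value, to_value) -> float
}


def net_change(attribute, from_value, to_value):
    """Illustrative helper (not part of the package).

    Adds the difference in intrinsic utility to the transition cost/gain.
    Missing keys are treated as 0.0, which is an assumption of this sketch.
    """
    before = intrinsic_utility_table.get((attribute, from_value), 0.0)
    after = intrinsic_utility_table.get((attribute, to_value), 0.0)
    transition = transition_utility_table.get((attribute, from_value, to_value), 0.0)
    return (after - before) + transition


print(net_change("Salary", "low", "high"))
```

Tuple keys keep each lookup to a single dictionary access, and passing `None` for either table simply means that kind of utility is not considered.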

Methods:

Name Description
fit

Generates action rules based on the provided dataset and parameters.

get_bindings

Binds attributes to corresponding columns in the dataset.

get_stop_list

Generates a stop list to prevent certain combinations of attributes.

get_split_tables

Splits the dataset into tables based on target item bindings.

get_rules

Returns the generated action rules if available.

predict

Predicts recommended actions based on the provided row of data.
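
To make two of the helpers more concrete, here is a standalone sketch of the logic behind `get_stop_list` and `count_max_nodes` (the latter is defined in the class source but not listed in the table). It follows the class source but runs without the package. The bindings (`Sex`, `Class`, and their column indices) are made-up examples, and `math.prod` is swapped in for `numpy.prod` to avoid a dependency.

```python
import itertools
import math


def get_stop_list(stable_items_binding, flexible_items_binding):
    """Every ordered pair of column indices within one stable attribute,
    plus one (attribute, attribute) pair per flexible attribute."""
    stop_list = []
    for items in stable_items_binding.values():
        for stop_couple in itertools.product(items, repeat=2):
            stop_list.append(tuple(stop_couple))
    for attribute in flexible_items_binding.keys():
        stop_list.append((attribute, attribute))
    return stop_list


def count_max_nodes(stable_items_binding, flexible_items_binding):
    """Sum, over every non-empty subset of attributes, of the product of
    the number of values each attribute in the subset has."""
    sizes = [
        len(items)
        for items in list(stable_items_binding.values()) + list(flexible_items_binding.values())
    ]
    total = 0
    for r in range(1, len(sizes) + 1):
        for comb in itertools.combinations(sizes, r):
            total += math.prod(comb)
    return total


# Hypothetical bindings: attribute name -> one-hot column indices.
stable = {"Sex": [0, 1]}
flexible = {"Class": [2, 3, 4]}

print(get_stop_list(stable, flexible))
print(count_max_nodes(stable, flexible))
```

For these bindings the node count is 2 + 3 + 2 * 3 = 11, which is the figure printed in verbose mode as the maximum number of nodes to check for support.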

Source code in src/action_rules/action_rules.py
class ActionRules:
    """
    A class used to generate action rules for a given dataset.

    Attributes
    ----------
    min_stable_attributes : int
        The minimum number of stable attributes required.
    min_flexible_attributes : int
        The minimum number of flexible attributes required.
    min_undesired_support : int
        The minimum support for the undesired state.
    min_undesired_confidence : float
        The minimum confidence for the undesired state.
    min_desired_support : int
        The minimum support for the desired state.
    min_desired_confidence : float
        The minimum confidence for the desired state.
    verbose : bool, optional
        If True, enables verbose output.
    rules : Optional[Rules], optional
        Stores the generated rules.
    output : Optional[Output], optional
        Stores the generated action rules.
    np : Optional[ModuleType], optional
        The numpy or cupy module used for array operations.
    pd : Optional[ModuleType], optional
        The pandas or cudf module used for DataFrame operations.
    is_gpu_np : bool
        Indicates whether GPU-accelerated numpy (cupy) is used.
    is_gpu_pd : bool
        Indicates whether GPU-accelerated pandas (cudf) is used.
    intrinsic_utility_table : dict, optional
        (attribute, value) -> float
        A lookup table for the intrinsic utility of each attribute-value pair.
        If None, no intrinsic utility is considered.
    transition_utility_table : dict, optional
        (attribute, from_value, to_value) -> float
        A lookup table for cost/gain of transitions between values.
        If None, no transition utility is considered.

    Methods
    -------
    fit(data, stable_attributes, flexible_attributes, target, target_undesired_state, target_desired_state,
    use_sparse_matrix=False, use_gpu=False)
        Generates action rules based on the provided dataset and parameters.
    get_bindings(columns, stable_attributes, flexible_attributes, target)
        Binds attributes to corresponding columns in the dataset.
    get_stop_list(stable_items_binding, flexible_items_binding)
        Generates a stop list to prevent certain combinations of attributes.
    get_split_tables(data, target_items_binding, target, use_sparse_matrix=False)
        Splits the dataset into tables based on target item bindings.
    get_rules()
        Returns the generated action rules if available.
    predict(frame_row)
        Predicts recommended actions based on the provided row of data.
    """

    def __init__(
        self,
        min_stable_attributes: int,
        min_flexible_attributes: int,
        min_undesired_support: int,
        min_undesired_confidence: float,
        min_desired_support: int,
        min_desired_confidence: float,
        verbose=False,
        intrinsic_utility_table: Optional[dict] = None,
        transition_utility_table: Optional[dict] = None,
    ):
        """
        Initialize the ActionRules class with the specified parameters.

        Parameters
        ----------
        min_stable_attributes : int
            The minimum number of stable attributes required.
        min_flexible_attributes : int
            The minimum number of flexible attributes required.
        min_undesired_support : int
            The minimum support for the undesired state.
        min_undesired_confidence : float
            The minimum confidence for the undesired state.
        min_desired_support : int
            The minimum support for the desired state.
        min_desired_confidence : float
            The minimum confidence for the desired state.
        verbose : bool, optional
            If True, enables verbose output. Default is False.
        intrinsic_utility_table : dict, optional
            (attribute, value) -> float
            A lookup table for the intrinsic utility of each attribute-value pair.
            If None, no intrinsic utility is considered.
        transition_utility_table : dict, optional
            (attribute, from_value, to_value) -> float
            A lookup table for cost/gain of transitions between values.
            If None, no transition utility is considered.

        Notes
        -----
        The `verbose` parameter can be used to enable detailed output during the rule generation process.
        """
        self.min_stable_attributes = min_stable_attributes
        self.min_flexible_attributes = min_flexible_attributes
        self.min_undesired_support = min_undesired_support
        self.min_desired_support = min_desired_support
        self.min_undesired_confidence = min_undesired_confidence
        self.min_desired_confidence = min_desired_confidence
        self.verbose = verbose
        self.rules = None  # type: Optional[Rules]
        self.output = None  # type: Optional[Output]
        self.np = None  # type: Optional[ModuleType]
        self.pd = None  # type: Optional[ModuleType]
        self.is_gpu_np = False
        self.is_gpu_pd = False
        self.is_onehot = False
        self.intrinsic_utility_table = intrinsic_utility_table or {}
        self.transition_utility_table = transition_utility_table or {}

    def count_max_nodes(self, stable_items_binding: dict, flexible_items_binding: dict) -> int:
        """
        Calculate the maximum number of nodes based on the given item bindings.

        This function takes two dictionaries, `stable_items_binding` and `flexible_items_binding`,
        which map attributes to lists of items. It calculates the total number of nodes by considering
        all possible combinations of the lengths of these item lists and summing the product of each combination.

        Parameters
        ----------
        stable_items_binding : dict
            A dictionary where keys are attributes and values are lists of stable items.
        flexible_items_binding : dict
            A dictionary where keys are attributes and values are lists of flexible items.

        Returns
        -------
        int
            The total number of nodes calculated by summing the product of lengths of all combinations of item lists.

        Notes
        -----
        - The function first combines the lengths of item lists from both dictionaries.
        - It then calculates the sum of the products of all possible combinations of these lengths.
        """
        import numpy

        values_in_attribute = []
        for items in list(stable_items_binding.values()) + list(flexible_items_binding.values()):
            values_in_attribute.append(len(items))

        sum_nodes = 0
        for i in range(len(values_in_attribute)):
            for comb in itertools.combinations(values_in_attribute, i + 1):
                sum_nodes += int(numpy.prod(comb))
        return sum_nodes

    def set_array_library(self, use_gpu: bool, df: Union['cudf.DataFrame', 'pandas.DataFrame']):
        """
        Select the array library (NumPy or CuPy) and detect the DataFrame library (pandas or cuDF).

        Parameters
        ----------
        use_gpu : bool
            If True, CuPy is used for array operations when it can be imported; otherwise NumPy is used.
        df : Union[cudf.DataFrame, pandas.DataFrame]
            The input DataFrame. Its type determines whether pandas or cuDF is used.

        Raises
        ------
        ImportError
            If `df` is not recognized as a pandas or cuDF DataFrame, including the case where the matching
            library cannot be imported.

        Warnings
        --------
        UserWarning
            If `use_gpu` is True but CuPy is not available, a warning is issued and NumPy is used instead.

        Notes
        -----
        The array library follows `use_gpu`, while the DataFrame library follows the type of `df`. The results are
        stored in `self.np`, `self.pd`, `self.is_gpu_np`, and `self.is_gpu_pd`.
        """
        if use_gpu:
            try:
                import cupy as np

                is_gpu_np = True
            except ImportError:
                warnings.warn("CuPy is not available. Falling back to Numpy.")
                import numpy as np

                is_gpu_np = False
        else:
            import numpy as np

            is_gpu_np = False

        df_library_imported = False
        try:
            import pandas as pd

            if isinstance(df, pd.DataFrame):
                is_gpu_pd = False
                df_library_imported = True
        except ImportError:
            df_library_imported = False

        if not df_library_imported:
            try:
                import cudf as pd

                if isinstance(df, pd.DataFrame):
                    is_gpu_pd = True
                    df_library_imported = True
            except ImportError:
                df_library_imported = False

        if not df_library_imported:
            raise ImportError('Just Pandas or cuDF dataframes are supported.')

        self.np = np
        self.pd = pd
        self.is_gpu_np = is_gpu_np
        self.is_gpu_pd = is_gpu_pd

    def df_to_array(self, df: Union['cudf.DataFrame', 'pandas.DataFrame'], use_sparse_matrix: bool = False) -> tuple:
        """
        Convert a DataFrame to a numpy or CuPy array.

        Parameters
        ----------
        df : Union[cudf.DataFrame, pandas.DataFrame]
            The DataFrame to convert.
        use_sparse_matrix : bool, optional
            If True, a sparse matrix is used. Default is False.

        Returns
        -------
        tuple
            A tuple containing the transposed array and the DataFrame columns.

        Notes
        -----
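        Dense data is generally converted to an unsigned 8-bit integer array (`np.uint8`). When CuPy is the active
        array library (`self.is_gpu_np`), the result is a CuPy array. If `use_sparse_matrix` is True, a CSR matrix
        is built instead (SciPy on the CPU, `cupyx` on the GPU).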
        """
        columns = list(df.columns)
        # cuDF and CuPy
        if self.is_gpu_np and self.is_gpu_pd:
            if use_sparse_matrix:
                from cupyx.scipy.sparse import csr_matrix

                data = csr_matrix(df.values, dtype=self.np.float32).T  # type: ignore
            else:
                data = self.np.asarray(df.values, dtype=self.np.uint8).T  # type: ignore
        # Pandas and CuPy
        elif self.is_gpu_np and not self.is_gpu_pd:
            if use_sparse_matrix:
                from cupyx.scipy.sparse import csr_matrix
                from scipy.sparse import csr_matrix as scipy_csr_matrix

                scipy_matrix = scipy_csr_matrix(df.values).T
                data = csr_matrix(scipy_matrix, dtype=float)
            else:
                data = self.np.asarray(df.values, dtype=self.np.uint8).T  # type: ignore
        # cuDF and Numpy
        elif not self.is_gpu_np and self.is_gpu_pd:
            if use_sparse_matrix:
                from scipy.sparse import csr_matrix

                data = csr_matrix(df.to_numpy(), dtype=self.np.uint8).T  # type: ignore
            else:
                data = df.to_numpy().T  # type: ignore
        # Pandas and Numpy
        else:
            if use_sparse_matrix:
                from scipy.sparse import csr_matrix

                data = csr_matrix(df.values, dtype=self.np.uint8).T  # type: ignore
            else:
                data = df.to_numpy(dtype=self.np.uint8).T  # type: ignore
        return data, columns

    def one_hot_encode(
        self,
        data: Union['cudf.DataFrame', 'pandas.DataFrame'],
        stable_attributes: list,
        flexible_attributes: list,
        target: str,
    ) -> Union['cudf.DataFrame', 'pandas.DataFrame']:
        """
        Perform one-hot encoding on the specified stable, flexible, and target attributes of the DataFrame.

        Parameters
        ----------
        data : Union[cudf.DataFrame, pandas.DataFrame]
            The input DataFrame containing the data to be encoded.
        stable_attributes : list
            List of stable attributes to be one-hot encoded.
        flexible_attributes : list
            List of flexible attributes to be one-hot encoded.
        target : str
            The target attribute to be one-hot encoded.

        Returns
        -------
        Union[cudf.DataFrame, pandas.DataFrame]
            A DataFrame with the specified attributes one-hot encoded.

        Notes
        -----
        The input data is first converted to string type to ensure consistent encoding. The stable attributes,
        flexible attributes, and target attribute are then one-hot encoded separately and concatenated into a
        single DataFrame.
        """
        data = data.astype(str)
        to_concat = []
        if len(stable_attributes) > 0:
            data_stable = self.pd.get_dummies(  # type: ignore
                data[stable_attributes], sparse=False, prefix_sep='_<item_stable>_'
            )
            to_concat.append(data_stable)
        if len(flexible_attributes) > 0:
            data_flexible = self.pd.get_dummies(  # type: ignore
                data[flexible_attributes], sparse=False, prefix_sep='_<item_flexible>_'
            )
            to_concat.append(data_flexible)
        data_target = self.pd.get_dummies(data[[target]], sparse=False, prefix_sep='_<item_target>_')  # type: ignore
        to_concat.append(data_target)
        data = self.pd.concat(to_concat, axis=1)  # type: ignore
        return data

    def fit_onehot(
        self,
        data: Union['cudf.DataFrame', 'pandas.DataFrame'],
        stable_attributes: dict,
        flexible_attributes: dict,
        target: dict,
        target_undesired_state: str,
        target_desired_state: str,
        use_sparse_matrix: bool = False,
        use_gpu: bool = False,
    ):
        """
        Fit the model on a dataset that is already one-hot encoded.

        This method relabels the one-hot encoded columns so that each one is
        tied to its stable, flexible, or target attribute, drops the columns
        that belong to none of them, and then passes the resulting dataset
        to the `fit` method.

        Parameters
        ----------
        data : Union[cudf.DataFrame, pandas.DataFrame]
            The dataset to be processed and used for fitting the model.
        stable_attributes : dict
            A dictionary mapping stable attribute names to lists of column
            names corresponding to those attributes.
        flexible_attributes : dict
            A dictionary mapping flexible attribute names to lists of column
            names corresponding to those attributes.
        target : dict
            A dictionary mapping the target attribute name to a list of
            column names corresponding to that attribute.
        target_undesired_state : str
            The undesired state of the target attribute, used in action rule generation.
        target_desired_state : str
            The desired state of the target attribute, used in action rule generation.
        use_sparse_matrix : bool, optional
            If True, a sparse matrix is used in the fitting process. Default is False.
        use_gpu : bool, optional
            If True, CuPy is used for array operations if available.
            Default is False.

        Notes
        -----
        The method modifies the dataset by:
        1. Renaming columns according to the stable, flexible, and target attributes.
        2. Removing columns that are not associated with any of these attributes.
        3. Passing the processed dataset and relevant attribute lists to the `fit` method
           to generate action rules.

        This method ensures that the dataset is correctly preprocessed for rule
        generation, focusing on the specified attributes and their one-hot encoded forms.
        """
        self.is_onehot = True
        data = data.copy()
        data = data.astype('bool')
        new_labels = []
        attributes_stable = set()
        attributes_flexible = set()
        attribute_target = ''
        remove_cols = []
        for label in data.columns:
            to_remove = True
            for attribute, columns in stable_attributes.items():
                if label in columns:
                    new_labels.append(attribute + '_<item_stable>_' + label)
                    attributes_stable.add(attribute)
                    to_remove = False
            for attribute, columns in flexible_attributes.items():
                if label in columns:
                    new_labels.append(attribute + '_<item_flexible>_' + label)
                    attributes_flexible.add(attribute)
                    to_remove = False
            for attribute, columns in target.items():
                if label in columns:
                    new_labels.append(attribute + '_<item_target>_' + label)
                    attribute_target = attribute
                    to_remove = False
            if to_remove:
                new_labels.append(label)
                remove_cols.append(label)
        data.columns = new_labels
        data = data.drop(columns=remove_cols)
        self.fit(
            data,
            list(attributes_stable),
            list(attributes_flexible),
            attribute_target,
            target_undesired_state,
            target_desired_state,
            use_sparse_matrix,
            use_gpu,
        )

    def fit(
        self,
        data: Union['cudf.DataFrame', 'pandas.DataFrame'],
        stable_attributes: list,
        flexible_attributes: list,
        target: str,
        target_undesired_state: str,
        target_desired_state: str,
        use_sparse_matrix: bool = False,
        use_gpu: bool = False,
    ):
        """
        Generate action rules based on the provided dataset and parameters.

        Parameters
        ----------
        data : Union[cudf.DataFrame, pandas.DataFrame]
            The dataset to generate action rules from.
        stable_attributes : list
            List of stable attributes.
        flexible_attributes : list
            List of flexible attributes.
        target : str
            The target attribute.
        target_undesired_state : str
            The undesired state of the target attribute.
        target_desired_state : str
            The desired state of the target attribute.
        use_sparse_matrix : bool, optional
            If True, a sparse matrix is used. Default is False.
        use_gpu : bool, optional
            If True, CuPy is used for array operations if available. Default is False.

        Raises
        ------
        RuntimeError
            If the model has already been fitted.

        Notes
        -----
        Unless the data was supplied through `fit_onehot`, this method one-hot encodes the specified attributes.
        It then converts the DataFrame to an array and generates action rules by iterating over candidate rules
        and pruning them based on the given parameters.
        """
        if self.output is not None:
            raise RuntimeError("The model is already fit.")
        self.set_array_library(use_gpu, data)
        if not self.is_onehot:
            data = self.one_hot_encode(data, stable_attributes, flexible_attributes, target)
        data, columns = self.df_to_array(data, use_sparse_matrix)

        stable_items_binding, flexible_items_binding, target_items_binding, column_values = self.get_bindings(
            columns, stable_attributes, flexible_attributes, target
        )

        self.intrinsic_utility_table, self.transition_utility_table = self.remap_utility_tables(column_values)

        if self.verbose:
            print('Maximum number of nodes to check for support:')
            print('_____________________________________________')
            print(self.count_max_nodes(stable_items_binding, flexible_items_binding))
            print('')
        stop_list = self.get_stop_list(stable_items_binding, flexible_items_binding)
        frames = self.get_split_tables(data, target_items_binding, target, use_sparse_matrix)
        undesired_state = columns.index(target + '_<item_target>_' + str(target_undesired_state))
        desired_state = columns.index(target + '_<item_target>_' + str(target_desired_state))

        stop_list_itemset = []  # type: list

        candidates_queue = [
            {
                'ar_prefix': tuple(),
                'itemset_prefix': tuple(),
                'stable_items_binding': stable_items_binding,
                'flexible_items_binding': flexible_items_binding,
                'undesired_mask': None,
                'desired_mask': None,
                'actionable_attributes': 0,
            }
        ]
        k = 0
        self.rules = Rules(
            undesired_state,
            desired_state,
            columns,
            data.shape[1],
            self.intrinsic_utility_table,
            self.transition_utility_table,
        )
        candidate_generator = CandidateGenerator(
            frames,
            self.min_stable_attributes,
            self.min_flexible_attributes,
            self.min_undesired_support,
            self.min_desired_support,
            self.min_undesired_confidence,
            self.min_desired_confidence,
            undesired_state,
            desired_state,
            self.rules,
            use_sparse_matrix,
        )
        while len(candidates_queue) > 0:
            candidate = candidates_queue.pop(0)
            if len(candidate['ar_prefix']) > k:
                k += 1
                self.rules.prune_classification_rules(k, stop_list)
            new_candidates = candidate_generator.generate_candidates(
                **candidate,
                stop_list=stop_list,
                stop_list_itemset=stop_list_itemset,
                undesired_state=undesired_state,
                desired_state=desired_state,
                verbose=self.verbose,
            )
            candidates_queue += new_candidates
        self.rules.generate_action_rules()
        self.output = Output(
            self.rules.action_rules, target, stable_items_binding, flexible_items_binding, column_values
        )
        del data
        if self.is_gpu_np:
            self.np.get_default_memory_pool().free_all_blocks()  # type: ignore

    def get_bindings(
        self,
        columns: list,
        stable_attributes: list,
        flexible_attributes: list,
        target: str,
    ) -> tuple:
        """
        Bind attributes to corresponding columns in the dataset.

        Parameters
        ----------
        columns : list
            List of column names in the dataset.
        stable_attributes : list
            List of stable attributes.
        flexible_attributes : list
            List of flexible attributes.
        target : str
            The target attribute.

        Returns
        -------
        tuple
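            A tuple of four items: the bindings for stable attributes, flexible attributes, and target items,
            followed by a dictionary that maps each column index to its `(attribute, value)` pair.

        Notes
        -----
        Each binding maps an attribute name to the list of column indices of its one-hot encoded values. Columns
        are matched by the `_<item_stable>_`, `_<item_flexible>_`, and `_<item_target>_` separators.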
        """
        stable_items_binding = defaultdict(lambda: [])
        flexible_items_binding = defaultdict(lambda: [])
        target_items_binding = defaultdict(lambda: [])
        column_values = {}

        for i, col in enumerate(columns):
            is_continue = False
            # stable
            for attribute in stable_attributes:
                if col.startswith(attribute + '_<item_stable>_'):
                    stable_items_binding[attribute].append(i)
                    column_values[i] = (attribute, col.split('_<item_stable>_', 1)[1])
                    is_continue = True
                    break
            if is_continue is True:
                continue
            # flexible
            for attribute in flexible_attributes:
                if col.startswith(attribute + '_<item_flexible>_'):
                    flexible_items_binding[attribute].append(i)
                    column_values[i] = (attribute, col.split('_<item_flexible>_', 1)[1])
                    is_continue = True
                    break
            if is_continue is True:
                continue
            # target
            if col.startswith(target + '_<item_target>_'):
                target_items_binding[target].append(i)
                column_values[i] = (target, col.split('_<item_target>_', 1)[1])
        return stable_items_binding, flexible_items_binding, target_items_binding, column_values

    def get_stop_list(self, stable_items_binding: dict, flexible_items_binding: dict) -> list:
        """
        Generate a stop list to prevent certain combinations of attributes.

        Parameters
        ----------
        stable_items_binding : dict
            Dictionary containing bindings for stable items.
        flexible_items_binding : dict
            Dictionary containing bindings for flexible items.

        Returns
        -------
        list
            A list of stop combinations.

        Notes
        -----
        The stop list contains every ordered pair of column indices that belong to the same stable attribute,
        plus one `(attribute, attribute)` pair for each flexible attribute.
        """
        stop_list = []
        for items in stable_items_binding.values():
            for stop_couple in itertools.product(items, repeat=2):
                stop_list.append(tuple(stop_couple))
        for item in flexible_items_binding.keys():
            stop_list.append(tuple([item, item]))
        return stop_list

    def get_split_tables(
        self,
        data: Union['numpy.ndarray', 'cupy.ndarray', 'cupyx.scipy.sparse.csr_matrix', 'scipy.sparse.csr_matrix'],
        target_items_binding: dict,
        target: str,
        use_sparse_matrix: bool = False,
    ) -> dict:
        """
        Split the dataset into tables based on target item bindings.

        Parameters
        ----------
        data : Union['numpy.ndarray', 'cupy.ndarray', 'cupyx.scipy.sparse.csr_matrix', 'scipy.sparse.csr_matrix']
            The dataset to be split.
        target_items_binding : dict
            Dictionary containing bindings for target items.
        target : str
            The target attribute.
        use_sparse_matrix : bool, optional
            If True, a sparse matrix is used. Default is False.

        Returns
        -------
        dict
            A dictionary containing the split tables.

        Notes
        -----
        The method creates masks for the target items and splits the data accordingly.
        """
        frames = {}
        for item in target_items_binding[target]:
            mask = data[item] == 1
            if use_sparse_matrix:
                frames[item] = data.multiply(mask)  # type: ignore
            else:
                frames[item] = data[:, mask]
        return frames

    def get_rules(self) -> Output:
        """
        Return the generated action rules if available.

        Raises
        ------
        RuntimeError
            If the model has not been fitted.

        Returns
        -------
        Output
            The generated action rules.

        Notes
        -----
        This method returns the `Output` object containing the generated action rules.
        """
        if self.output is None:
            raise RuntimeError("The model is not fit.")
        return self.output

    def predict(self, frame_row: Union['cudf.Series', 'pandas.Series']) -> Union['cudf.DataFrame', 'pandas.DataFrame']:
        """
        Predict recommended actions based on the provided row of data.

        This method applies the fitted action rules to the given row of data and generates
        a DataFrame with recommended actions if any of the action rules are triggered.

        Parameters
        ----------
        frame_row : Union['cudf.Series', 'pandas.Series']
            A row of data in the form of a cuDF or pandas Series. The Series should
            contain the features required by the action rules.

        Returns
        -------
        Union['cudf.DataFrame', 'pandas.DataFrame']
            A DataFrame with the recommended actions. The DataFrame includes the following columns:
            - The original attributes, plus an '<attribute> (Recommended)' column for each recommended change.
            - 'ActionRules_RuleIndex': Index of the action rule applied.
            - 'ActionRules_UndesiredSupport': Support of the undesired part of the rule.
            - 'ActionRules_DesiredSupport': Support of the desired part of the rule.
            - 'ActionRules_UndesiredConfidence': Confidence of the undesired part of the rule.
            - 'ActionRules_DesiredConfidence': Confidence of the desired part of the rule.
            - 'ActionRules_Uplift': Uplift value of the rule.

        Raises
        ------
        RuntimeError
            If the model has not been fitted.

        Notes
        -----
        The method compares the given row of data against the undesired itemsets of the action rules.
        If a match is found, it applies the desired itemset changes and records the action rule's
        metadata. The result is a DataFrame with one or more rows representing the recommended actions
        for the given data.
        """
        if self.output is None:
            raise RuntimeError("The model is not fit.")
        index_value_tuples = list(zip(frame_row.index, frame_row))
        values = []
        column_values = self.output.column_values
        for index_value_tuple in index_value_tuples:
            values.append(list(column_values.keys())[list(column_values.values()).index(index_value_tuple)])
        new_values = tuple(values)
        predicted = []
        for i, action_rule in enumerate(self.output.action_rules):
            if set(action_rule['undesired']['itemset']) <= set(new_values):
                predicted_row = frame_row.copy()
                for recommended in set(action_rule['desired']['itemset']) - set(new_values):
                    attribute, value = column_values[recommended]
                    predicted_row[attribute + ' (Recommended)'] = value
                predicted_row['ActionRules_RuleIndex'] = i
                predicted_row['ActionRules_UndesiredSupport'] = action_rule['undesired']['support']
                predicted_row['ActionRules_DesiredSupport'] = action_rule['desired']['support']
                predicted_row['ActionRules_UndesiredConfidence'] = action_rule['undesired']['confidence']
                predicted_row['ActionRules_DesiredConfidence'] = action_rule['desired']['confidence']
                predicted_row['ActionRules_Uplift'] = action_rule['uplift']
                predicted.append(predicted_row)
        return self.pd.DataFrame(predicted)  # type: ignore

    def remap_utility_tables(self, column_values):
        """
        Remap the keys of intrinsic and transition utility tables using the provided column mapping.

        The function uses `column_values`, a dictionary mapping internal column indices to
        (attribute, value) tuples, to invert the mapping so that utility table keys are replaced
        with the corresponding integer index (for intrinsic utilities) or a tuple of integer indices
        (for transition utilities).

        Parameters
        ----------
        column_values : dict
            Dictionary mapping integer column indices to (attribute, value) pairs.
            Example: {0: ('Age', 'O'), 1: ('Age', 'Y'), 2: ('Sex', 'F'), ...}

        Returns
        -------
        tuple
            A tuple (remapped_intrinsic, remapped_transition) where:
              - remapped_intrinsic is a dict mapping integer column index to utility value.
              - remapped_transition is a dict mapping (from_index, to_index) to utility value.

        Notes
        -----
        - The method performs case-insensitive matching by converting attribute names and values to lowercase.
        - If a key in a utility table does not have a corresponding entry in column_values, it is skipped.
        """
        # Invert column_values to map (attribute.lower(), value.lower()) -> column index.
        inv_map = {(attr.lower(), val.lower()): idx for idx, (attr, val) in column_values.items()}

        remapped_intrinsic = {}
        # Remap intrinsic utility table keys: ('Attribute', 'Value') -> utility
        for key, utility in self.intrinsic_utility_table.items():
            # Normalize key to lowercase
            attr, val = key
            lookup_key = (attr.lower(), val.lower())
            # Look up the corresponding column index; if not found, skip this key.
            if lookup_key in inv_map:
                col_index = inv_map[lookup_key]
                remapped_intrinsic[col_index] = utility
            # Else: optionally, one could log or warn about a missing mapping.

        remapped_transition = {}
        # Remap transition utility table keys: ('Attribute', from_value, to_value) -> utility
        for key, utility in self.transition_utility_table.items():
            attr, from_val, to_val = key
            lookup_from = (attr.lower(), from_val.lower())
            lookup_to = (attr.lower(), to_val.lower())
            # Only remap if both the from and to values exist in inv_map.
            if lookup_from in inv_map and lookup_to in inv_map:
                from_index = inv_map[lookup_from]
                to_index = inv_map[lookup_to]
                remapped_transition[(from_index, to_index)] = utility
            # Else: skip or log missing mapping.

        return remapped_intrinsic, remapped_transition
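
The remapping described in `remap_utility_tables` can be illustrated outside the class. The following is a minimal sketch, not the library's code path: the column mapping follows the docstring example, while the utility tables and their numbers are hypothetical values chosen for illustration.

```python
# Column mapping in the shape described above: index -> (attribute, value).
column_values = {0: ('Age', 'O'), 1: ('Age', 'Y'), 2: ('Sex', 'F')}

# Hypothetical utility tables; the numbers are made up for illustration.
intrinsic = {('age', 'y'): 2.0, ('Sex', 'F'): 0.5, ('Height', 'Tall'): 9.9}
transition = {('AGE', 'O', 'Y'): -1.5, ('Sex', 'F', 'M'): 3.0}

# Invert the mapping with lowercased keys so that lookups are case-insensitive.
inv_map = {(attr.lower(), val.lower()): idx for idx, (attr, val) in column_values.items()}

# Intrinsic utilities: keep only keys that have a matching column.
remapped_intrinsic = {
    inv_map[(attr.lower(), val.lower())]: utility
    for (attr, val), utility in intrinsic.items()
    if (attr.lower(), val.lower()) in inv_map
}

# Transition utilities: both the source and the destination value must exist.
remapped_transition = {}
for (attr, from_val, to_val), utility in transition.items():
    from_key = (attr.lower(), from_val.lower())
    to_key = (attr.lower(), to_val.lower())
    if from_key in inv_map and to_key in inv_map:
        remapped_transition[(inv_map[from_key], inv_map[to_key])] = utility

print(remapped_intrinsic)   # {1: 2.0, 2: 0.5}
print(remapped_transition)  # {(0, 1): -1.5}
```

Entries without a matching column, such as `('Height', 'Tall')` or the transition to `('Sex', 'M')`, are dropped silently, which matches the skipping behaviour noted in the docstring.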

__init__(min_stable_attributes, min_flexible_attributes, min_undesired_support, min_undesired_confidence, min_desired_support, min_desired_confidence, verbose=False, intrinsic_utility_table=None, transition_utility_table=None)

Initialize the ActionRules class with the specified parameters.

Parameters:

Name Type Description Default
min_stable_attributes int

The minimum number of stable attributes required.

required
min_flexible_attributes int

The minimum number of flexible attributes required.

required
min_undesired_support int

The minimum support for the undesired state.

required
min_undesired_confidence float

The minimum confidence for the undesired state.

required
min_desired_support int

The minimum support for the desired state.

required
min_desired_confidence float

The minimum confidence for the desired state.

required
verbose bool

If True, enables verbose output. Default is False.

False
intrinsic_utility_table dict

(attribute, value) -> float A lookup table for the intrinsic utility of each attribute-value pair. If None, no intrinsic utility is considered.

None
transition_utility_table dict

(attribute, from_value, to_value) -> float A lookup table for cost/gain of transitions between values. If None, no transition utility is considered.

None
Notes

The verbose parameter can be used to enable detailed output during the rule generation process.

Source code in src/action_rules/action_rules.py
def __init__(
    self,
    min_stable_attributes: int,
    min_flexible_attributes: int,
    min_undesired_support: int,
    min_undesired_confidence: float,
    min_desired_support: int,
    min_desired_confidence: float,
    verbose=False,
    intrinsic_utility_table: Optional[dict] = None,
    transition_utility_table: Optional[dict] = None,
):
    """
    Initialize the ActionRules class with the specified parameters.

    Parameters
    ----------
    min_stable_attributes : int
        The minimum number of stable attributes required.
    min_flexible_attributes : int
        The minimum number of flexible attributes required.
    min_undesired_support : int
        The minimum support for the undesired state.
    min_undesired_confidence : float
        The minimum confidence for the undesired state.
    min_desired_support : int
        The minimum support for the desired state.
    min_desired_confidence : float
        The minimum confidence for the desired state.
    verbose : bool, optional
        If True, enables verbose output. Default is False.
    intrinsic_utility_table : dict, optional
        (attribute, value) -> float
        A lookup table for the intrinsic utility of each attribute-value pair.
        If None, no intrinsic utility is considered.
    transition_utility_table : dict, optional
        (attribute, from_value, to_value) -> float
        A lookup table for cost/gain of transitions between values.
        If None, no transition utility is considered.

    Notes
    -----
    The `verbose` parameter can be used to enable detailed output during the rule generation process.
    """
    self.min_stable_attributes = min_stable_attributes
    self.min_flexible_attributes = min_flexible_attributes
    self.min_undesired_support = min_undesired_support
    self.min_desired_support = min_desired_support
    self.min_undesired_confidence = min_undesired_confidence
    self.min_desired_confidence = min_desired_confidence
    self.verbose = verbose
    self.rules = None  # type: Optional[Rules]
    self.output = None  # type: Optional[Output]
    self.np = None  # type: Optional[ModuleType]
    self.pd = None  # type: Optional[ModuleType]
    self.is_gpu_np = False
    self.is_gpu_pd = False
    self.is_onehot = False
    self.intrinsic_utility_table = intrinsic_utility_table or {}
    self.transition_utility_table = transition_utility_table or {}

count_max_nodes(stable_items_binding, flexible_items_binding)

Calculate the maximum number of nodes based on the given item bindings.

This function takes two dictionaries, stable_items_binding and flexible_items_binding, which map attributes to lists of items. It calculates the total number of nodes by considering all possible combinations of the lengths of these item lists and summing the product of each combination.

Parameters:

Name Type Description Default
stable_items_binding dict

A dictionary where keys are attributes and values are lists of stable items.

required
flexible_items_binding dict

A dictionary where keys are attributes and values are lists of flexible items.

required

Returns:

Type Description
int

The total number of nodes calculated by summing the product of lengths of all combinations of item lists.

Notes
  • The function first combines the lengths of item lists from both dictionaries.
  • It then calculates the sum of the products of all possible combinations of these lengths.
Source code in src/action_rules/action_rules.py
def count_max_nodes(self, stable_items_binding: dict, flexible_items_binding: dict) -> int:
    """
    Calculate the maximum number of nodes based on the given item bindings.

    This function takes two dictionaries, `stable_items_binding` and `flexible_items_binding`,
    which map attributes to lists of items. It calculates the total number of nodes by considering
    all possible combinations of the lengths of these item lists and summing the product of each combination.

    Parameters
    ----------
    stable_items_binding : dict
        A dictionary where keys are attributes and values are lists of stable items.
    flexible_items_binding : dict
        A dictionary where keys are attributes and values are lists of flexible items.

    Returns
    -------
    int
        The total number of nodes calculated by summing the product of lengths of all combinations of item lists.

    Notes
    -----
    - The function first combines the lengths of item lists from both dictionaries.
    - It then calculates the sum of the products of all possible combinations of these lengths.
    """
    import numpy

    values_in_attribute = []
    for items in list(stable_items_binding.values()) + list(flexible_items_binding.values()):
        values_in_attribute.append(len(items))

    sum_nodes = 0
    for i in range(len(values_in_attribute)):
        for comb in itertools.combinations(values_in_attribute, i + 1):
            sum_nodes += int(numpy.prod(comb))
    return sum_nodes
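
A small worked example may make the count concrete. This sketch reproduces the same arithmetic with `math.prod` from the standard library in place of `numpy.prod`; the bindings are hypothetical.

```python
import itertools
import math

# Hypothetical bindings: attribute -> list of one-hot column indices.
stable_items_binding = {'Age': [0, 1], 'Sex': [2, 3]}
flexible_items_binding = {'Class': [4, 5, 6]}

# Number of values per attribute: [2, 2, 3].
lengths = [
    len(items)
    for items in list(stable_items_binding.values()) + list(flexible_items_binding.values())
]

total = 0
for size in range(1, len(lengths) + 1):
    for comb in itertools.combinations(lengths, size):
        total += math.prod(comb)

# Size 1: 2 + 2 + 3 = 7; size 2: 4 + 6 + 6 = 16; size 3: 12.
print(total)  # 35
```

The same total follows from the closed form (1 + 2)(1 + 2)(1 + 3) - 1 = 35, since each attribute either contributes one of its values or is left out, and the empty combination is excluded.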

df_to_array(df, use_sparse_matrix=False)

Convert a DataFrame to a numpy or CuPy array.

Parameters:

Name Type Description Default
df Union[cudf.DataFrame, pandas.DataFrame]

The DataFrame to convert.

required
use_sparse_matrix bool

If True, a sparse matrix is used. Default is False.

False

Returns:

Type Description
tuple

A tuple containing the transposed array and the DataFrame columns.

Notes

In most paths the data is converted to an unsigned 8-bit integer array (np.uint8); the CuPy sparse paths use a floating-point dtype. The result is a CuPy array when self.is_gpu_np is set (this method has no use_gpu parameter).

Source code in src/action_rules/action_rules.py
def df_to_array(self, df: Union['cudf.DataFrame', 'pandas.DataFrame'], use_sparse_matrix: bool = False) -> tuple:
    """
    Convert a DataFrame to a numpy or CuPy array.

    Parameters
    ----------
    df : Union[cudf.DataFrame, pandas.DataFrame]
        The DataFrame to convert.
    use_sparse_matrix : bool, optional
        If True, a sparse matrix is used. Default is False.

    Returns
    -------
    tuple
        A tuple containing the transposed array and the DataFrame columns.

    Notes
    -----
    In most paths the data is converted to an unsigned 8-bit integer array (`np.uint8`); the CuPy sparse
    paths use a floating-point dtype. The result is a CuPy array when `self.is_gpu_np` is set (this method
    has no `use_gpu` parameter).
    """
    columns = list(df.columns)
    # cuDF and CuPy
    if self.is_gpu_np and self.is_gpu_pd:
        if use_sparse_matrix:
            from cupyx.scipy.sparse import csr_matrix

            data = csr_matrix(df.values, dtype=self.np.float32).T  # type: ignore
        else:
            data = self.np.asarray(df.values, dtype=self.np.uint8).T  # type: ignore
    # Pandas and CuPy
    elif self.is_gpu_np and not self.is_gpu_pd:
        if use_sparse_matrix:
            from cupyx.scipy.sparse import csr_matrix
            from scipy.sparse import csr_matrix as scipy_csr_matrix

            scipy_matrix = scipy_csr_matrix(df.values).T
            data = csr_matrix(scipy_matrix, dtype=float)
        else:
            data = self.np.asarray(df.values, dtype=self.np.uint8).T  # type: ignore
    # cuDF and Numpy
    elif not self.is_gpu_np and self.is_gpu_pd:
        if use_sparse_matrix:
            from scipy.sparse import csr_matrix

            data = csr_matrix(df.to_numpy(), dtype=self.np.uint8).T  # type: ignore
        else:
            data = df.to_numpy().T  # type: ignore
    # Pandas and Numpy
    else:
        if use_sparse_matrix:
            from scipy.sparse import csr_matrix

            data = csr_matrix(df.values, dtype=self.np.uint8).T  # type: ignore
        else:
            data = df.to_numpy(dtype=self.np.uint8).T  # type: ignore
    return data, columns

fit(data, stable_attributes, flexible_attributes, target, target_undesired_state, target_desired_state, use_sparse_matrix=False, use_gpu=False)

Generate action rules based on the provided dataset and parameters.

Parameters:

Name Type Description Default
data Union[cudf.DataFrame, pandas.DataFrame]

The dataset to generate action rules from.

required
stable_attributes list

List of stable attributes.

required
flexible_attributes list

List of flexible attributes.

required
target str

The target attribute.

required
target_undesired_state str

The undesired state of the target attribute.

required
target_desired_state str

The desired state of the target attribute.

required
use_sparse_matrix bool

If True, a sparse matrix is used. Default is False.

False
use_gpu bool

Use GPU (cuDF) for data processing if available. Default is False.

False

Raises:

Type Description
RuntimeError

If the model has already been fitted.

Notes

This method performs one-hot encoding on the specified attributes, converts the DataFrame to an array, and generates action rules by iterating over candidate rules and pruning them based on the given parameters.

Source code in src/action_rules/action_rules.py
def fit(
    self,
    data: Union['cudf.DataFrame', 'pandas.DataFrame'],
    stable_attributes: list,
    flexible_attributes: list,
    target: str,
    target_undesired_state: str,
    target_desired_state: str,
    use_sparse_matrix: bool = False,
    use_gpu: bool = False,
):
    """
    Generate action rules based on the provided dataset and parameters.

    Parameters
    ----------
    data : Union[cudf.DataFrame, pandas.DataFrame]
        The dataset to generate action rules from.
    stable_attributes : list
        List of stable attributes.
    flexible_attributes : list
        List of flexible attributes.
    target : str
        The target attribute.
    target_undesired_state : str
        The undesired state of the target attribute.
    target_desired_state : str
        The desired state of the target attribute.
    use_sparse_matrix : bool, optional
        If True, a sparse matrix is used. Default is False.
    use_gpu : bool, optional
        Use GPU (cuDF) for data processing if available. Default is False.

    Raises
    ------
    RuntimeError
        If the model has already been fitted.

    Notes
    -----
    This method performs one-hot encoding on the specified attributes, converts the DataFrame to an array,
    and generates action rules by iterating over candidate rules and pruning them based on the given parameters.
    """
    if self.output is not None:
        raise RuntimeError("The model is already fit.")
    self.set_array_library(use_gpu, data)
    if not self.is_onehot:
        data = self.one_hot_encode(data, stable_attributes, flexible_attributes, target)
    data, columns = self.df_to_array(data, use_sparse_matrix)

    stable_items_binding, flexible_items_binding, target_items_binding, column_values = self.get_bindings(
        columns, stable_attributes, flexible_attributes, target
    )

    self.intrinsic_utility_table, self.transition_utility_table = self.remap_utility_tables(column_values)

    if self.verbose:
        print('Maximum number of nodes to check for support:')
        print('_____________________________________________')
        print(self.count_max_nodes(stable_items_binding, flexible_items_binding))
        print('')
    stop_list = self.get_stop_list(stable_items_binding, flexible_items_binding)
    frames = self.get_split_tables(data, target_items_binding, target, use_sparse_matrix)
    undesired_state = columns.index(target + '_<item_target>_' + str(target_undesired_state))
    desired_state = columns.index(target + '_<item_target>_' + str(target_desired_state))

    stop_list_itemset = []  # type: list

    candidates_queue = [
        {
            'ar_prefix': tuple(),
            'itemset_prefix': tuple(),
            'stable_items_binding': stable_items_binding,
            'flexible_items_binding': flexible_items_binding,
            'undesired_mask': None,
            'desired_mask': None,
            'actionable_attributes': 0,
        }
    ]
    k = 0
    self.rules = Rules(
        undesired_state,
        desired_state,
        columns,
        data.shape[1],
        self.intrinsic_utility_table,
        self.transition_utility_table,
    )
    candidate_generator = CandidateGenerator(
        frames,
        self.min_stable_attributes,
        self.min_flexible_attributes,
        self.min_undesired_support,
        self.min_desired_support,
        self.min_undesired_confidence,
        self.min_desired_confidence,
        undesired_state,
        desired_state,
        self.rules,
        use_sparse_matrix,
    )
    while len(candidates_queue) > 0:
        candidate = candidates_queue.pop(0)
        if len(candidate['ar_prefix']) > k:
            k += 1
            self.rules.prune_classification_rules(k, stop_list)
        new_candidates = candidate_generator.generate_candidates(
            **candidate,
            stop_list=stop_list,
            stop_list_itemset=stop_list_itemset,
            undesired_state=undesired_state,
            desired_state=desired_state,
            verbose=self.verbose,
        )
        candidates_queue += new_candidates
    self.rules.generate_action_rules()
    self.output = Output(
        self.rules.action_rules, target, stable_items_binding, flexible_items_binding, column_values
    )
    del data
    if self.is_gpu_np:
        self.np.get_default_memory_pool().free_all_blocks()  # type: ignore
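
The control flow of the main loop in `fit` can be sketched in isolation. The toy below is only an illustration of the first-in, first-out queue and the level counter `k`: `generate_candidates` here is a hypothetical stand-in that extends a prefix with larger item indices, and it does not model support or confidence checks.

```python
# Toy item universe; the real method works on one-hot column indices.
ITEMS = [0, 1, 2]


def generate_candidates(prefix):
    """Hypothetical stand-in: extend the prefix with every larger item."""
    start = prefix[-1] + 1 if prefix else 0
    return [prefix + (item,) for item in ITEMS if item >= start]


queue = [tuple()]
k = 0
levels_started = []  # values of k at the moments the level counter advanced
visited = []

while queue:
    prefix = queue.pop(0)  # FIFO order: shorter prefixes are always handled first
    if len(prefix) > k:
        k += 1
        levels_started.append(k)  # fit() prunes classification rules at this point
    visited.append(prefix)
    queue += generate_candidates(prefix)

print(levels_started)  # [1, 2, 3]
print(len(visited))    # 8
```

Because new candidates are appended to the end of the queue, all prefixes of length k are processed before any prefix of length k + 1, so the pruning step runs once per level.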

fit_onehot(data, stable_attributes, flexible_attributes, target, target_undesired_state, target_desired_state, use_sparse_matrix=False, use_gpu=False)

Preprocess and fit the model using one-hot encoded attributes.

This method prepares an already one-hot encoded dataset for generating action rules: it renames the columns of the specified stable, flexible, and target attributes to the internal naming scheme and then fits the model using the fit method.

Parameters:

Name Type Description Default
data Union[cudf.DataFrame, pandas.DataFrame]

The dataset to be processed and used for fitting the model.

required
stable_attributes dict

A dictionary mapping stable attribute names to lists of column names corresponding to those attributes.

required
flexible_attributes dict

A dictionary mapping flexible attribute names to lists of column names corresponding to those attributes.

required
target dict

A dictionary mapping the target attribute name to a list of column names corresponding to that attribute.

required
target_undesired_state str

The undesired state of the target attribute, used in action rule generation.

required
target_desired_state str

The desired state of the target attribute, used in action rule generation.

required
use_sparse_matrix bool

If True, a sparse matrix is used in the fitting process. Default is False.

False
use_gpu bool

If True, the GPU (cuDF) is used for data processing if available. Default is False.

False
Notes

The method modifies the dataset by:

  1. Renaming columns according to the stable, flexible, and target attributes.
  2. Removing columns that are not associated with any of these attributes.
  3. Passing the processed dataset and relevant attribute lists to the fit method to generate action rules.

This method ensures that the dataset is correctly preprocessed for rule generation, focusing on the specified attributes and their one-hot encoded forms.

Source code in src/action_rules/action_rules.py
def fit_onehot(
    self,
    data: Union['cudf.DataFrame', 'pandas.DataFrame'],
    stable_attributes: dict,
    flexible_attributes: dict,
    target: dict,
    target_undesired_state: str,
    target_desired_state: str,
    use_sparse_matrix: bool = False,
    use_gpu: bool = False,
):
    """
    Preprocess and fit the model using one-hot encoded attributes.

    This method prepares an already one-hot encoded dataset for generating
    action rules: it renames the columns of the specified stable, flexible,
    and target attributes to the internal naming scheme and then fits
    the model using the `fit` method.

    Parameters
    ----------
    data : Union[cudf.DataFrame, pandas.DataFrame]
        The dataset to be processed and used for fitting the model.
    stable_attributes : dict
        A dictionary mapping stable attribute names to lists of column
        names corresponding to those attributes.
    flexible_attributes : dict
        A dictionary mapping flexible attribute names to lists of column
        names corresponding to those attributes.
    target : dict
        A dictionary mapping the target attribute name to a list of
        column names corresponding to that attribute.
    target_undesired_state : str
        The undesired state of the target attribute, used in action rule generation.
    target_desired_state : str
        The desired state of the target attribute, used in action rule generation.
    use_sparse_matrix : bool, optional
        If True, a sparse matrix is used in the fitting process. Default is False.
    use_gpu : bool, optional
        If True, the GPU (cuDF) is used for data processing if available.
        Default is False.

    Notes
    -----
    The method modifies the dataset by:
    1. Renaming columns according to the stable, flexible, and target attributes.
    2. Removing columns that are not associated with any of these attributes.
    3. Passing the processed dataset and relevant attribute lists to the `fit` method
       to generate action rules.

    This method ensures that the dataset is correctly preprocessed for rule
    generation, focusing on the specified attributes and their one-hot encoded forms.
    """
    self.is_onehot = True
    data = data.copy()
    data = data.astype('bool')
    new_labels = []
    attributes_stable = set()
    attributes_flexible = set()
    attribute_target = ''
    remove_cols = []
    for label in data.columns:
        to_remove = True
        for attribute, columns in stable_attributes.items():
            if label in columns:
                new_labels.append(attribute + '_<item_stable>_' + label)
                attributes_stable.add(attribute)
                to_remove = False
        for attribute, columns in flexible_attributes.items():
            if label in columns:
                new_labels.append(attribute + '_<item_flexible>_' + label)
                attributes_flexible.add(attribute)
                to_remove = False
        for attribute, columns in target.items():
            if label in columns:
                new_labels.append(attribute + '_<item_target>_' + label)
                attribute_target = attribute
                to_remove = False
        if to_remove:
            new_labels.append(label)
            remove_cols.append(label)
    data.columns = new_labels
    data = data.drop(columns=remove_cols)
    self.fit(
        data,
        list(attributes_stable),
        list(attributes_flexible),
        attribute_target,
        target_undesired_state,
        target_desired_state,
        use_sparse_matrix,
        use_gpu,
    )
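
The renaming step can be tried on plain lists. This is a simplified sketch under stated assumptions: the column labels and attribute groupings are hypothetical, and, unlike the method above, the loop stops at the first group that claims a label.

```python
# Hypothetical one-hot column labels and attribute groupings.
columns = ['Age_O', 'Age_Y', 'Class_1', 'Class_2', 'Survived_0', 'Survived_1', 'Ticket']
stable_attributes = {'Age': ['Age_O', 'Age_Y']}
flexible_attributes = {'Class': ['Class_1', 'Class_2']}
target = {'Survived': ['Survived_0', 'Survived_1']}

# Marker strings as used in the source above.
groups = [
    ('_<item_stable>_', stable_attributes),
    ('_<item_flexible>_', flexible_attributes),
    ('_<item_target>_', target),
]

renamed, dropped = [], []
for label in columns:
    for marker, mapping in groups:
        owner = next((attr for attr, cols in mapping.items() if label in cols), None)
        if owner is not None:
            renamed.append(owner + marker + label)
            break
    else:
        dropped.append(label)  # not tied to any attribute, so it would be removed

print(renamed[0])  # Age_<item_stable>_Age_O
print(dropped)     # ['Ticket']
```

Because `fit` looks up `target + '_<item_target>_' + str(state)` among the column names, the undesired and desired states passed to `fit_onehot` presumably need to match the original target column labels (here `'Survived_0'` and `'Survived_1'`).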

get_bindings(columns, stable_attributes, flexible_attributes, target)

Bind attributes to corresponding columns in the dataset.

Parameters:

Name Type Description Default
columns list

List of column names in the dataset.

required
stable_attributes list

List of stable attributes.

required
flexible_attributes list

List of flexible attributes.

required
target str

The target attribute.

required

Returns:

Type Description
tuple

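A tuple (stable_items_binding, flexible_items_binding, target_items_binding, column_values): the bindings for stable attributes, flexible attributes, and target items, followed by a mapping from column index to (attribute, value).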

Notes

The method generates mappings from column indices to attribute values for stable, flexible, and target attributes.

Source code in src/action_rules/action_rules.py
def get_bindings(
    self,
    columns: list,
    stable_attributes: list,
    flexible_attributes: list,
    target: str,
) -> tuple:
    """
    Bind attributes to corresponding columns in the dataset.

    Parameters
    ----------
    columns : list
        List of column names in the dataset.
    stable_attributes : list
        List of stable attributes.
    flexible_attributes : list
        List of flexible attributes.
    target : str
        The target attribute.

    Returns
    -------
    tuple
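        A tuple (stable_items_binding, flexible_items_binding, target_items_binding, column_values):
        the bindings for stable attributes, flexible attributes, and target items, followed by a mapping
        from column index to (attribute, value).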

    Notes
    -----
    The method generates mappings from column indices to attribute values for stable, flexible, and target
    attributes.
    """
    stable_items_binding = defaultdict(lambda: [])
    flexible_items_binding = defaultdict(lambda: [])
    target_items_binding = defaultdict(lambda: [])
    column_values = {}

    for i, col in enumerate(columns):
        is_continue = False
        # stable
        for attribute in stable_attributes:
            if col.startswith(attribute + '_<item_stable>_'):
                stable_items_binding[attribute].append(i)
                column_values[i] = (attribute, col.split('_<item_stable>_', 1)[1])
                is_continue = True
                break
        if is_continue is True:
            continue
        # flexible
        for attribute in flexible_attributes:
            if col.startswith(attribute + '_<item_flexible>_'):
                flexible_items_binding[attribute].append(i)
                column_values[i] = (attribute, col.split('_<item_flexible>_', 1)[1])
                is_continue = True
                break
        if is_continue is True:
            continue
        # target
        if col.startswith(target + '_<item_target>_'):
            target_items_binding[target].append(i)
            column_values[i] = (target, col.split('_<item_target>_', 1)[1])
    return stable_items_binding, flexible_items_binding, target_items_binding, column_values
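
To see what the bindings look like, the parsing can be reproduced on a short column list. This sketch assumes hypothetical column names and, for brevity, collects all attributes in one dictionary instead of the three separate bindings returned above.

```python
from collections import defaultdict

# Hypothetical column names following the '<attribute>_<item_...>_<value>' convention.
columns = [
    'Age_<item_stable>_O',
    'Age_<item_stable>_Y',
    'Class_<item_flexible>_1',
    'Class_<item_flexible>_2',
    'Survived_<item_target>_0',
    'Survived_<item_target>_1',
]
# Which marker each attribute uses (stable, flexible, or target).
markers = {'Age': '_<item_stable>_', 'Class': '_<item_flexible>_', 'Survived': '_<item_target>_'}

bindings = defaultdict(list)  # attribute -> column indices
column_values = {}            # column index -> (attribute, value)
for i, col in enumerate(columns):
    for attribute, marker in markers.items():
        if col.startswith(attribute + marker):
            bindings[attribute].append(i)
            column_values[i] = (attribute, col.split(marker, 1)[1])
            break

print(dict(bindings))    # {'Age': [0, 1], 'Class': [2, 3], 'Survived': [4, 5]}
print(column_values[3])  # ('Class', '2')
```

The values stay strings because they are cut out of the column names, which is why `fit` converts the target states with `str(...)` before looking them up.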

get_rules()

Return the generated action rules if available.

Raises:

Type Description
RuntimeError

If the model has not been fitted.

Returns:

Type Description
Output

The generated action rules.

Notes

This method returns the Output object containing the generated action rules.

Source code in src/action_rules/action_rules.py
def get_rules(self) -> Output:
    """
    Return the generated action rules if available.

    Raises
    ------
    RuntimeError
        If the model has not been fitted.

    Returns
    -------
    Output
        The generated action rules.

    Notes
    -----
    This method returns the `Output` object containing the generated action rules.
    """
    if self.output is None:
        raise RuntimeError("The model is not fit.")
    return self.output

get_split_tables(data, target_items_binding, target, use_sparse_matrix=False)

Split the dataset into tables based on target item bindings.

Parameters:

Name Type Description Default
data Union[numpy.ndarray, cupy.ndarray, cupyx.scipy.sparse.csr_matrix, scipy.sparse.csr_matrix]

The dataset to be split.

required
target_items_binding dict

Dictionary containing bindings for target items.

required
target str

The target attribute.

required
use_sparse_matrix bool

If True, a sparse matrix is used. Default is False.

False

Returns:

Type Description
dict

A dictionary containing the split tables.

Notes

The method creates masks for the target items and splits the data accordingly.
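
A small NumPy sketch of the dense (non-sparse) branch may make the masking step concrete. It assumes, as the indexing in the source listing suggests, that rows are one-hot items and columns are transactions; the array contents and the 'churn' attribute are made up for illustration.

```python
import numpy as np

# Assumed layout: one row per one-hot item, one column per transaction.
data = np.array([
    [1, 0, 1, 0],  # row 0: some attribute value
    [1, 1, 0, 0],  # row 1: target value 'yes' (illustrative)
    [0, 0, 1, 1],  # row 2: target value 'no' (illustrative)
])
target_items_binding = {'churn': [1, 2]}

frames = {}
for item in target_items_binding['churn']:
    # Transactions in which this target item is present.
    mask = data[item] == 1
    # Keep only those transactions (columns) for this target item.
    frames[item] = data[:, mask]
```

Each entry of `frames` keeps all item rows but only the transactions that carry the corresponding target value.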

Source code in src/action_rules/action_rules.py
def get_split_tables(
    self,
    data: Union['numpy.ndarray', 'cupy.ndarray', 'cupyx.scipy.sparse.csr_matrix', 'scipy.sparse.csr_matrix'],
    target_items_binding: dict,
    target: str,
    use_sparse_matrix: bool = False,
) -> dict:
    """
    Split the dataset into tables based on target item bindings.

    Parameters
    ----------
    data : Union['numpy.ndarray', 'cupy.ndarray', 'cupyx.scipy.sparse.csr_matrix', 'scipy.sparse.csr_matrix']
        The dataset to be split.
    target_items_binding : dict
        Dictionary containing bindings for target items.
    target : str
        The target attribute.
    use_sparse_matrix : bool, optional
        If True, a sparse matrix is used. Default is False.

    Returns
    -------
    dict
        A dictionary containing the split tables.

    Notes
    -----
    The method creates masks for the target items and splits the data accordingly.
    """
    frames = {}
    for item in target_items_binding[target]:
        mask = data[item] == 1
        if use_sparse_matrix:
            frames[item] = data.multiply(mask)  # type: ignore
        else:
            frames[item] = data[:, mask]
    return frames

get_stop_list(stable_items_binding, flexible_items_binding)

Generate a stop list to prevent certain combinations of attributes.

Parameters:

Name Type Description Default
stable_items_binding dict

Dictionary containing bindings for stable items.

required
flexible_items_binding dict

Dictionary containing bindings for flexible items.

required

Returns:

Type Description
list

A list of stop combinations.

Notes

The stop list is generated by creating pairs of stable item indices and ensuring flexible items do not repeat.
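
As a rough illustration of the pairing described above, the sketch below builds a stop list for two stable attributes and one flexible attribute. The bindings are invented sample data; the loop mirrors the source listing, in which flexible entries are keyed by attribute name rather than by column index.

```python
import itertools

# Illustrative bindings: attribute name -> one-hot column indices.
stable_items_binding = {'age': [0, 1], 'sex': [2, 3]}
flexible_items_binding = {'plan': [4, 5]}

stop_list = []
for items in stable_items_binding.values():
    # Every ordered pair of values of the same stable attribute is blocked.
    stop_list.extend(itertools.product(items, repeat=2))
for attribute in flexible_items_binding:
    # A flexible attribute paired with itself is blocked as well.
    stop_list.append((attribute, attribute))
```

With this input, the two stable attributes contribute four pairs each and the flexible attribute contributes one, for nine entries in total.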

Source code in src/action_rules/action_rules.py
def get_stop_list(self, stable_items_binding: dict, flexible_items_binding: dict) -> list:
    """
    Generate a stop list to prevent certain combinations of attributes.

    Parameters
    ----------
    stable_items_binding : dict
        Dictionary containing bindings for stable items.
    flexible_items_binding : dict
        Dictionary containing bindings for flexible items.

    Returns
    -------
    list
        A list of stop combinations.

    Notes
    -----
    The stop list is generated by creating pairs of stable item indices and ensuring flexible items do not repeat.
    """
    stop_list = []
    for items in stable_items_binding.values():
        for stop_couple in itertools.product(items, repeat=2):
            stop_list.append(tuple(stop_couple))
    for item in flexible_items_binding.keys():
        stop_list.append(tuple([item, item]))
    return stop_list

one_hot_encode(data, stable_attributes, flexible_attributes, target)

Perform one-hot encoding on the specified stable, flexible, and target attributes of the DataFrame.

Parameters:

Name Type Description Default
data Union[cudf.DataFrame, pandas.DataFrame]

The input DataFrame containing the data to be encoded.

required
stable_attributes list

List of stable attributes to be one-hot encoded.

required
flexible_attributes list

List of flexible attributes to be one-hot encoded.

required
target str

The target attribute to be one-hot encoded.

required

Returns:

Type Description
Union[cudf.DataFrame, pandas.DataFrame]

A DataFrame with the specified attributes one-hot encoded.

Notes

The input data is first converted to string type to ensure consistent encoding. The stable attributes, flexible attributes, and target attribute are then one-hot encoded separately and concatenated into a single DataFrame.
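
The effect of the separate encodings is easiest to see in the resulting column names. This is a minimal pandas-only sketch with invented sample data; it calls `pandas.get_dummies` directly rather than going through the class, and uses the same `prefix_sep` values as the source listing.

```python
import pandas as pd

# Illustrative data; every value is cast to string first, as in the method.
data = pd.DataFrame({
    'age': ['young', 'old', 'young'],
    'plan': ['basic', 'premium', 'basic'],
    'churn': ['yes', 'no', 'yes'],
}).astype(str)

# Each group gets its own separator so the column role can be recovered later.
parts = [
    pd.get_dummies(data[['age']], sparse=False, prefix_sep='_<item_stable>_'),
    pd.get_dummies(data[['plan']], sparse=False, prefix_sep='_<item_flexible>_'),
    pd.get_dummies(data[['churn']], sparse=False, prefix_sep='_<item_target>_'),
]
encoded = pd.concat(parts, axis=1)
print(sorted(encoded.columns))
```

The separator embedded in each column name is what later lets the bindings step tell stable, flexible, and target columns apart.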

Source code in src/action_rules/action_rules.py
def one_hot_encode(
    self,
    data: Union['cudf.DataFrame', 'pandas.DataFrame'],
    stable_attributes: list,
    flexible_attributes: list,
    target: str,
) -> Union['cudf.DataFrame', 'pandas.DataFrame']:
    """
    Perform one-hot encoding on the specified stable, flexible, and target attributes of the DataFrame.

    Parameters
    ----------
    data : Union[cudf.DataFrame, pandas.DataFrame]
        The input DataFrame containing the data to be encoded.
    stable_attributes : list
        List of stable attributes to be one-hot encoded.
    flexible_attributes : list
        List of flexible attributes to be one-hot encoded.
    target : str
        The target attribute to be one-hot encoded.

    Returns
    -------
    Union[cudf.DataFrame, pandas.DataFrame]
        A DataFrame with the specified attributes one-hot encoded.

    Notes
    -----
    The input data is first converted to string type to ensure consistent encoding. The stable attributes,
    flexible attributes, and target attribute are then one-hot encoded separately and concatenated into a
    single DataFrame.
    """
    data = data.astype(str)
    to_concat = []
    if len(stable_attributes) > 0:
        data_stable = self.pd.get_dummies(  # type: ignore
            data[stable_attributes], sparse=False, prefix_sep='_<item_stable>_'
        )
        to_concat.append(data_stable)
    if len(flexible_attributes) > 0:
        data_flexible = self.pd.get_dummies(  # type: ignore
            data[flexible_attributes], sparse=False, prefix_sep='_<item_flexible>_'
        )
        to_concat.append(data_flexible)
    data_target = self.pd.get_dummies(data[[target]], sparse=False, prefix_sep='_<item_target>_')  # type: ignore
    to_concat.append(data_target)
    data = self.pd.concat(to_concat, axis=1)  # type: ignore
    return data

predict(frame_row)

Predict recommended actions based on the provided row of data.

This method applies the fitted action rules to the given row of data and generates a DataFrame with recommended actions if any of the action rules are triggered.

Parameters:

Name Type Description Default
frame_row Union[cudf.Series, pandas.Series]

A row of data in the form of a cuDF or pandas Series. The Series should contain the features required by the action rules.

required

Returns:

Type Description
Union[cudf.DataFrame, pandas.DataFrame]

A DataFrame with the recommended actions. The DataFrame includes the following columns:
  • The original attributes, plus an '<attribute> (Recommended)' column for each recommended change.
  • 'ActionRules_RuleIndex': Index of the action rule applied.
  • 'ActionRules_UndesiredSupport': Support of the undesired part of the rule.
  • 'ActionRules_DesiredSupport': Support of the desired part of the rule.
  • 'ActionRules_UndesiredConfidence': Confidence of the undesired part of the rule.
  • 'ActionRules_DesiredConfidence': Confidence of the desired part of the rule.
  • 'ActionRules_Uplift': Uplift value of the rule.

Raises:

Type Description
RuntimeError

If the model has not been fitted.

Notes

The method compares the given row of data against the undesired itemsets of the action rules. If a match is found, it applies the desired itemset changes and records the action rule's metadata. The result is a DataFrame with one row per matching rule; if no rule matches, the DataFrame is empty.
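
The matching step can be sketched without any DataFrame library. The example below is a simplified stand-in, not the method itself: it uses a plain dict for the row, a hand-written `column_values` mapping, and a single made-up rule, and it records only two of the metadata fields.

```python
# Illustrative mapping of column index -> (attribute, value).
column_values = {
    0: ('churn', 'yes'),
    1: ('churn', 'no'),
    2: ('age', 'young'),
    3: ('plan', 'basic'),
    4: ('plan', 'premium'),
}
# One made-up rule: for young customers, move plan from basic to premium.
action_rules = [
    {'undesired': {'itemset': [2, 3]}, 'desired': {'itemset': [2, 4]}, 'uplift': 0.3},
]
row = {'age': 'young', 'plan': 'basic'}

# Translate the row into the set of column indices it activates.
index_of = {pair: idx for idx, pair in column_values.items()}
row_items = {index_of[(attr, val)] for attr, val in row.items()}

recommendations = []
for i, rule in enumerate(action_rules):
    # A rule fires when its whole undesired itemset is present in the row.
    if set(rule['undesired']['itemset']) <= row_items:
        changes = {
            column_values[item][0] + ' (Recommended)': column_values[item][1]
            for item in set(rule['desired']['itemset']) - row_items
        }
        recommendations.append(
            {**row, **changes, 'ActionRules_RuleIndex': i, 'ActionRules_Uplift': rule['uplift']}
        )
```

Only the items that differ between the desired itemset and the row become recommendations, so the stable item ('age', 'young') produces no extra column.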

Source code in src/action_rules/action_rules.py
def predict(self, frame_row: Union['cudf.Series', 'pandas.Series']) -> Union['cudf.DataFrame', 'pandas.DataFrame']:
    """
    Predict recommended actions based on the provided row of data.

    This method applies the fitted action rules to the given row of data and generates
    a DataFrame with recommended actions if any of the action rules are triggered.

    Parameters
    ----------
    frame_row : Union['cudf.Series', 'pandas.Series']
        A row of data in the form of a cuDF or pandas Series. The Series should
        contain the features required by the action rules.

    Returns
    -------
    Union['cudf.DataFrame', 'pandas.DataFrame']
        A DataFrame with the recommended actions. The DataFrame includes the following columns:
        - The original attributes with recommended changes.
        - 'ActionRules_RuleIndex': Index of the action rule applied.
        - 'ActionRules_UndesiredSupport': Support of the undesired part of the rule.
        - 'ActionRules_DesiredSupport': Support of the desired part of the rule.
        - 'ActionRules_UndesiredConfidence': Confidence of the undesired part of the rule.
        - 'ActionRules_DesiredConfidence': Confidence of the desired part of the rule.
        - 'ActionRules_Uplift': Uplift value of the rule.

    Raises
    ------
    RuntimeError
        If the model has not been fitted.

    Notes
    -----
    The method compares the given row of data against the undesired itemsets of the action rules.
    If a match is found, it applies the desired itemset changes and records the action rule's
    metadata. The result is a DataFrame with one or more rows representing the recommended actions
    for the given data.
    """
    if self.output is None:
        raise RuntimeError("The model is not fit.")
    index_value_tuples = list(zip(frame_row.index, frame_row))
    values = []
    column_values = self.output.column_values
    for index_value_tuple in index_value_tuples:
        values.append(list(column_values.keys())[list(column_values.values()).index(index_value_tuple)])
    new_values = tuple(values)
    predicted = []
    for i, action_rule in enumerate(self.output.action_rules):
        if set(action_rule['undesired']['itemset']) <= set(new_values):
            predicted_row = frame_row.copy()
            for recommended in set(action_rule['desired']['itemset']) - set(new_values):
                attribute, value = column_values[recommended]
                predicted_row[attribute + ' (Recommended)'] = value
            predicted_row['ActionRules_RuleIndex'] = i
            predicted_row['ActionRules_UndesiredSupport'] = action_rule['undesired']['support']
            predicted_row['ActionRules_DesiredSupport'] = action_rule['desired']['support']
            predicted_row['ActionRules_UndesiredConfidence'] = action_rule['undesired']['confidence']
            predicted_row['ActionRules_DesiredConfidence'] = action_rule['desired']['confidence']
            predicted_row['ActionRules_Uplift'] = action_rule['uplift']
            predicted.append(predicted_row)
    return self.pd.DataFrame(predicted)  # type: ignore

remap_utility_tables(column_values)

Remap the keys of intrinsic and transition utility tables using the provided column mapping.

The function uses column_values, a dictionary mapping internal column indices to (attribute, value) tuples, to invert the mapping so that utility table keys are replaced with the corresponding integer index (for intrinsic utilities) or a tuple of integer indices (for transition utilities).

Parameters:

Name Type Description Default
column_values dict

Dictionary mapping integer column indices to (attribute, value) pairs. Example: {0: ('Age', 'O'), 1: ('Age', 'Y'), 2: ('Sex', 'F'), ...}

required

Returns:

Type Description
tuple

A tuple (remapped_intrinsic, remapped_transition) where:
  • remapped_intrinsic is a dict mapping integer column index to utility value.
  • remapped_transition is a dict mapping (from_index, to_index) to utility value.

Notes
  • The method performs case-insensitive matching by converting attribute names and values to lowercase.
  • If a key in a utility table does not have a corresponding entry in column_values, it is skipped.
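
A short standalone sketch of the remapping, using the `column_values` example from the parameter description. The utility tables and their numbers are invented for illustration, and the tables are plain local variables rather than instance attributes.

```python
column_values = {0: ('Age', 'O'), 1: ('Age', 'Y'), 2: ('Sex', 'F')}

# Made-up utilities; keys deliberately use mixed case and include unknown entries.
intrinsic_utility_table = {('age', 'o'): 1.5, ('Sex', 'f'): -2.0, ('Height', 'tall'): 9.0}
transition_utility_table = {('AGE', 'O', 'Y'): 4.0, ('Sex', 'F', 'M'): 1.0}

# Invert the mapping with lowercased keys for case-insensitive lookup.
inv_map = {(a.lower(), v.lower()): idx for idx, (a, v) in column_values.items()}

remapped_intrinsic = {
    inv_map[(a.lower(), v.lower())]: utility
    for (a, v), utility in intrinsic_utility_table.items()
    if (a.lower(), v.lower()) in inv_map  # unknown pairs are skipped
}

remapped_transition = {}
for (a, from_val, to_val), utility in transition_utility_table.items():
    src = (a.lower(), from_val.lower())
    dst = (a.lower(), to_val.lower())
    # Both endpoints must be known columns, otherwise the entry is dropped.
    if src in inv_map and dst in inv_map:
        remapped_transition[(inv_map[src], inv_map[dst])] = utility
```

Here ('Height', 'tall') and the transition to ('Sex', 'M') are silently dropped because neither has a column in `column_values`.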
Source code in src/action_rules/action_rules.py
def remap_utility_tables(self, column_values):
    """
    Remap the keys of intrinsic and transition utility tables using the provided column mapping.

    The function uses `column_values`, a dictionary mapping internal column indices to
    (attribute, value) tuples, to invert the mapping so that utility table keys are replaced
    with the corresponding integer index (for intrinsic utilities) or a tuple of integer indices
    (for transition utilities).

    Parameters
    ----------
    column_values : dict
        Dictionary mapping integer column indices to (attribute, value) pairs.
        Example: {0: ('Age', 'O'), 1: ('Age', 'Y'), 2: ('Sex', 'F'), ...}

    Returns
    -------
    tuple
        A tuple (remapped_intrinsic, remapped_transition) where:
          - remapped_intrinsic is a dict mapping integer column index to utility value.
          - remapped_transition is a dict mapping (from_index, to_index) to utility value.

    Notes
    -----
    - The method performs case-insensitive matching by converting attribute names and values to lowercase.
    - If a key in a utility table does not have a corresponding entry in column_values, it is skipped.
    """
    # Invert column_values to map (attribute.lower(), value.lower()) -> column index.
    inv_map = {(attr.lower(), val.lower()): idx for idx, (attr, val) in column_values.items()}

    remapped_intrinsic = {}
    # Remap intrinsic utility table keys: ('Attribute', 'Value') -> utility
    for key, utility in self.intrinsic_utility_table.items():
        # Normalize key to lowercase
        attr, val = key
        lookup_key = (attr.lower(), val.lower())
        # Look up the corresponding column index; if not found, skip this key.
        if lookup_key in inv_map:
            col_index = inv_map[lookup_key]
            remapped_intrinsic[col_index] = utility
        # Else: optionally, one could log or warn about a missing mapping.

    remapped_transition = {}
    # Remap transition utility table keys: ('Attribute', from_value, to_value) -> utility
    for key, utility in self.transition_utility_table.items():
        attr, from_val, to_val = key
        lookup_from = (attr.lower(), from_val.lower())
        lookup_to = (attr.lower(), to_val.lower())
        # Only remap if both the from and to values exist in inv_map.
        if lookup_from in inv_map and lookup_to in inv_map:
            from_index = inv_map[lookup_from]
            to_index = inv_map[lookup_to]
            remapped_transition[(from_index, to_index)] = utility
        # Else: skip or log missing mapping.

    return remapped_intrinsic, remapped_transition

set_array_library(use_gpu, df)

Set the array library (CuPy or NumPy) and the DataFrame library (cuDF or pandas) based on the user's preference and the type of the input DataFrame.

Parameters:

Name Type Description Default
use_gpu bool

Whether to use the GPU array library (CuPy) if it is available.

required
df Union[cudf.DataFrame, pandas.DataFrame]

The input DataFrame; its type determines whether pandas or cuDF is used.

required

Raises:

Type Description
ImportError

If df is neither a pandas nor a cuDF DataFrame.

Warnings

UserWarning If use_gpu is True but CuPy is not available, a warning is issued indicating fallback to NumPy.

Notes

This method determines whether to use GPU-accelerated libraries for processing data, falling back to CPU-based libraries if necessary.
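
The fallback pattern itself is generic. The sketch below shows it with a hypothetical helper, `import_with_fallback`, and placeholder module names so that it runs without any GPU libraries installed; in the method the preferred and fallback modules are CuPy and NumPy.

```python
import importlib
import warnings


def import_with_fallback(preferred: str, fallback: str):
    """Import `preferred`; on ImportError, warn and import `fallback` instead.

    Hypothetical helper for illustration. Returns (module, used_preferred).
    """
    try:
        return importlib.import_module(preferred), True
    except ImportError:
        warnings.warn(f"{preferred} is not available. Falling back to {fallback}.")
        return importlib.import_module(fallback), False


# Placeholder names: a module that does not exist, and a stdlib module as fallback.
with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    module, used_preferred = import_with_fallback("no_such_gpu_library_xyz", "math")
```

Returning a flag alongside the module plays the same role as the `is_gpu_np` attribute: later code can branch on which backend was actually loaded.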

Source code in src/action_rules/action_rules.py
def set_array_library(self, use_gpu: bool, df: Union['cudf.DataFrame', 'pandas.DataFrame']):
    """
    Set the array library (CuPy or NumPy) and the DataFrame library (cuDF or pandas).

    Parameters
    ----------
    use_gpu : bool
        Whether to use the GPU array library (CuPy) if it is available.
    df : Union[cudf.DataFrame, pandas.DataFrame]
        The input DataFrame; its type determines whether pandas or cuDF is used.

    Raises
    ------
    ImportError
        If `df` is neither a pandas nor a cuDF DataFrame.

    Warnings
    --------
    UserWarning
        If `use_gpu` is True but CuPy is not available, a warning is issued indicating fallback to NumPy.

    Notes
    -----
    This method determines whether to use GPU-accelerated libraries for processing data, falling back to CPU-based
    libraries if necessary.
    """
    if use_gpu:
        try:
            import cupy as np

            is_gpu_np = True
        except ImportError:
            warnings.warn("CuPy is not available. Falling back to Numpy.")
            import numpy as np

            is_gpu_np = False
    else:
        import numpy as np

        is_gpu_np = False

    df_library_imported = False
    try:
        import pandas as pd

        if isinstance(df, pd.DataFrame):
            is_gpu_pd = False
            df_library_imported = True
    except ImportError:
        df_library_imported = False

    if not df_library_imported:
        try:
            import cudf as pd

            if isinstance(df, pd.DataFrame):
                is_gpu_pd = True
                df_library_imported = True
        except ImportError:
            df_library_imported = False

    if not df_library_imported:
        raise ImportError('Just Pandas or cuDF dataframes are supported.')

    self.np = np
    self.pd = pd
    self.is_gpu_np = is_gpu_np
    self.is_gpu_pd = is_gpu_pd

Input

A class used to import action rules.

Methods:

Name Description
import_action_rules

Import action rules from a JSON string and return them as an Output object.

Source code in src/action_rules/input/input.py
class Input:
    """
    A class used to import action rules.

    Methods
    -------
    import_action_rules(json_data)
        Import action rules from a JSON string and return them as an Output object.
    """

    def __init__(self):
        """
        Initialize the Input class.

        Notes
        -----
        This class is used to import action rules from a JSON string and convert them into an Output object.
        """

    def import_action_rules(self, json_data: str) -> Output:
        """
        Import action rules from a JSON string and return them as an Output object.

        Parameters
        ----------
        json_data : str
            JSON string representing the action rules.

        Returns
        -------
        Output
            Output object representing the action rules.

        Notes
        -----
        This method parses a JSON string containing action rules, extracts relevant information,
        and constructs an Output object. The method initializes the target attribute, stable items,
        flexible items, and column values. It processes both stable and flexible items for each rule
        and updates the corresponding dictionaries.

        The JSON structure is expected to have the following format:
        [
            {
                "target": {
                    "attribute": "target_attribute",
                    "undesired": "undesired_value",
                    "desired": "desired_value"
                },
                "support of undesired part": int,
                "confidence of undesired part": float,
                "support of desired part": int,
                "confidence of desired part": float,
                "uplift": float,
                "max_rule_gain": float,                   (optional)
                "realistic_rule_gain": float,             (optional)
                "realistic_dataset_gain": float,          (optional)
                "stable": [
                    {"attribute": "attribute_name", "value": "attribute_value"},
                    ...
                ],
                "flexible": [
                    {"attribute": "attribute_name", "undesired": "undesired_value", "desired": "desired_value"},
                    ...
                ]
            },
            ...
        ]

        The method ensures that each attribute-value pair is assigned a unique index and maintains
        the mappings in the column_values dictionary. The stable_items_binding and flexible_items_binding
        dictionaries are updated accordingly.

        Example
        -------
        json_data = '''
        [
            {
                "target": {
                    "attribute": "target",
                    "undesired": "no",
                    "desired": "yes"
                },
                "support of undesired part": 10,
                "confidence of undesired part": 0.5,
                "support of desired part": 20,
                "confidence of desired part": 0.8,
                "uplift": 0.3,
                "stable": [
                    {"attribute": "age", "value": "young"},
                    {"attribute": "income", "value": "high"}
                ],
                "flexible": [
                    {"attribute": "education", "undesired": "low", "desired": "high"}
                ]
            }
        ]
        '''
        input_obj = Input()
        output = input_obj.import_action_rules(json_data)
        """
        rules = json.loads(json_data)
        action_rules = []
        target = rules[0]['target']['attribute']
        stable_items_binding = {}  # type: dict
        flexible_items_binding = {}  # type: dict
        column_values = {}
        highest_index = 0
        for rule in rules:
            if highest_index == 0:
                column_values[highest_index] = (rule['target']['attribute'], rule['target']['undesired'])
                highest_index += 1
                column_values[highest_index] = (rule['target']['attribute'], rule['target']['desired'])
                highest_index += 1
            ar_dict = {
                'undesired': {
                    'itemset': [],
                    'support': rule['support of undesired part'],
                    'confidence': rule['confidence of undesired part'],
                    'target': 0,
                },
                'desired': {
                    'itemset': [],
                    'support': rule['support of desired part'],
                    'confidence': rule['confidence of desired part'],
                    'target': 1,
                },
                'uplift': rule['uplift'],
                'support': (
                    rule['support']
                    if 'support' in rule
                    else min(rule['support of undesired part'], rule['support of desired part'])
                ),
                'confidence': (
                    rule['confidence']
                    if 'confidence' in rule
                    else rule['confidence of undesired part'] * rule['confidence of desired part']
                ),
            }
            for item in rule['stable']:
                if (item['attribute'], item['value']) not in column_values.values():
                    column_values[highest_index] = (item['attribute'], item['value'])
                    if 'flexible_as_stable' in item:
                        if item['attribute'] not in flexible_items_binding.keys():
                            flexible_items_binding.update({item['attribute']: []})
                        flexible_items_binding[item['attribute']].append(highest_index)
                    else:
                        if item['attribute'] not in stable_items_binding.keys():
                            stable_items_binding.update({item['attribute']: []})
                        stable_items_binding[item['attribute']].append(highest_index)
                    highest_index += 1
                value = [
                    key
                    for key, (attr, value) in column_values.items()
                    if value == item['value'] and attr == item['attribute']
                ][0]
                ar_dict['undesired']['itemset'].append(value)
                ar_dict['desired']['itemset'].append(value)

            for item in rule['flexible']:
                if item['attribute'] not in flexible_items_binding.keys():
                    flexible_items_binding.update({item['attribute']: []})
                if (item['attribute'], item['undesired']) not in column_values.values():
                    column_values[highest_index] = (item['attribute'], item['undesired'])
                    flexible_items_binding[item['attribute']].append(highest_index)
                    highest_index += 1
                if (item['attribute'], item['desired']) not in column_values.values():
                    column_values[highest_index] = (item['attribute'], item['desired'])
                    flexible_items_binding[item['attribute']].append(highest_index)
                    highest_index += 1
                value_0 = [
                    key
                    for key, (attr, value) in column_values.items()
                    if value == item['undesired'] and attr == item['attribute']
                ][0]
                value_1 = [
                    key
                    for key, (attr, value) in column_values.items()
                    if value == item['desired'] and attr == item['attribute']
                ][0]
                ar_dict['undesired']['itemset'].append(value_0)
                ar_dict['desired']['itemset'].append(value_1)

            # If the JSON rule includes utility parameters, add them to the rule dictionary.
            for utility_key in [
                'max_rule_gain',
                'realistic_rule_gain',
                'realistic_dataset_gain',
            ]:
                if utility_key in rule:
                    ar_dict[utility_key] = rule[utility_key]

            action_rules.append(ar_dict)

        return Output(action_rules, target, stable_items_binding, flexible_items_binding, column_values)

__init__()

Initialize the Input class.

Notes

This class is used to import action rules from a JSON string and convert them into an Output object.

Source code in src/action_rules/input/input.py
def __init__(self):
    """
    Initialize the Input class.

    Notes
    -----
    This class is used to import action rules from a JSON string and convert them into an Output object.
    """

import_action_rules(json_data)

Import action rules from a JSON string and return them as an Output object.

Parameters:

Name Type Description Default
json_data str

JSON string representing the action rules.

required

Returns:

Type Description
Output

Output object representing the action rules.

Notes

This method parses a JSON string containing action rules, extracts relevant information, and constructs an Output object. The method initializes the target attribute, stable items, flexible items, and column values. It processes both stable and flexible items for each rule and updates the corresponding dictionaries.

The JSON structure is expected to have the following format:

    [
        {
            "target": {
                "attribute": "target_attribute",
                "undesired": "undesired_value",
                "desired": "desired_value"
            },
            "support of undesired part": int,
            "confidence of undesired part": float,
            "support of desired part": int,
            "confidence of desired part": float,
            "uplift": float,
            "max_rule_gain": float,                   (optional)
            "realistic_rule_gain": float,             (optional)
            "realistic_dataset_gain": float,          (optional)
            "stable": [
                {"attribute": "attribute_name", "value": "attribute_value"},
                ...
            ],
            "flexible": [
                {"attribute": "attribute_name", "undesired": "undesired_value", "desired": "desired_value"},
                ...
            ]
        },
        ...
    ]

The method ensures that each attribute-value pair is assigned a unique index and maintains the mappings in the column_values dictionary. The stable_items_binding and flexible_items_binding dictionaries are updated accordingly.
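
The index assignment can be illustrated with a trimmed-down rule. This is a simplified sketch, not the method's code: `index_for` is a hypothetical helper, the JSON omits the support, confidence, and uplift fields, and the target pair is looked up for every rule instead of only once.

```python
import json

# Trimmed-down input: only the fields needed to show index assignment.
json_data = '''
[
  {
    "target": {"attribute": "target", "undesired": "no", "desired": "yes"},
    "stable": [{"attribute": "age", "value": "young"}],
    "flexible": [{"attribute": "education", "undesired": "low", "desired": "high"}]
  }
]
'''

column_values = {}


def index_for(attribute, value):
    """Return the index of (attribute, value), assigning the next free one if new."""
    pair = (attribute, value)
    for idx, existing in column_values.items():
        if existing == pair:
            return idx
    column_values[len(column_values)] = pair
    return len(column_values) - 1


for rule in json.loads(json_data):
    tgt = rule['target']
    # The target values take the first two indices.
    index_for(tgt['attribute'], tgt['undesired'])
    index_for(tgt['attribute'], tgt['desired'])
    undesired, desired = [], []
    for item in rule['stable']:
        # A stable item appears unchanged on both sides of the rule.
        idx = index_for(item['attribute'], item['value'])
        undesired.append(idx)
        desired.append(idx)
    for item in rule['flexible']:
        # A flexible item contributes a different index to each side.
        undesired.append(index_for(item['attribute'], item['undesired']))
        desired.append(index_for(item['attribute'], item['desired']))
```

After the loop, the two itemsets share the stable index and differ only in the flexible one, which is the shape the `predict` matching relies on.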

Example

    json_data = '''
    [
        {
            "target": {
                "attribute": "target",
                "undesired": "no",
                "desired": "yes"
            },
            "support of undesired part": 10,
            "confidence of undesired part": 0.5,
            "support of desired part": 20,
            "confidence of desired part": 0.8,
            "uplift": 0.3,
            "stable": [
                {"attribute": "age", "value": "young"},
                {"attribute": "income", "value": "high"}
            ],
            "flexible": [
                {"attribute": "education", "undesired": "low", "desired": "high"}
            ]
        }
    ]
    '''
    input_obj = Input()
    output = input_obj.import_action_rules(json_data)

Source code in src/action_rules/input/input.py
def import_action_rules(self, json_data: str) -> Output:
    """
    Import action rules from a JSON string and return them as an Output object.

    Parameters
    ----------
    json_data : str
        JSON string representing the action rules.

    Returns
    -------
    Output
        Output object representing the action rules.

    Notes
    -----
    This method parses a JSON string containing action rules, extracts relevant information,
    and constructs an Output object. The method initializes the target attribute, stable items,
    flexible items, and column values. It processes both stable and flexible items for each rule
    and updates the corresponding dictionaries.

    The JSON structure is expected to have the following format:
    [
        {
            "target": {
                "attribute": "target_attribute",
                "undesired": "undesired_value",
                "desired": "desired_value"
            },
            "support of undesired part": int,
            "confidence of undesired part": float,
            "support of desired part": int,
            "confidence of desired part": float,
            "uplift": float,
            "max_rule_gain": float,                   (optional)
            "realistic_rule_gain": float,             (optional)
            "realistic_dataset_gain": float,          (optional)
            "stable": [
                {"attribute": "attribute_name", "value": "attribute_value"},
                ...
            ],
            "flexible": [
                {"attribute": "attribute_name", "undesired": "undesired_value", "desired": "desired_value"},
                ...
            ]
        },
        ...
    ]

    The method ensures that each attribute-value pair is assigned a unique index and maintains
    the mappings in the column_values dictionary. The stable_items_binding and flexible_items_binding
    dictionaries are updated accordingly.

    Example
    -------
    json_data = '''
    [
        {
            "target": {
                "attribute": "target",
                "undesired": "no",
                "desired": "yes"
            },
            "support of undesired part": 10,
            "confidence of undesired part": 0.5,
            "support of desired part": 20,
            "confidence of desired part": 0.8,
            "uplift": 0.3,
            "stable": [
                {"attribute": "age", "value": "young"},
                {"attribute": "income", "value": "high"}
            ],
            "flexible": [
                {"attribute": "education", "undesired": "low", "desired": "high"}
            ]
        }
    ]
    '''
    input_obj = Input()
    output = input_obj.import_action_rules(json_data)
    """
    rules = json.loads(json_data)
    action_rules = []
    target = rules[0]['target']['attribute']
    stable_items_binding = {}  # type: dict
    flexible_items_binding = {}  # type: dict
    column_values = {}
    highest_index = 0
    for rule in rules:
        if highest_index == 0:
            column_values[highest_index] = (rule['target']['attribute'], rule['target']['undesired'])
            highest_index += 1
            column_values[highest_index] = (rule['target']['attribute'], rule['target']['desired'])
            highest_index += 1
        ar_dict = {
            'undesired': {
                'itemset': [],
                'support': rule['support of undesired part'],
                'confidence': rule['confidence of undesired part'],
                'target': 0,
            },
            'desired': {
                'itemset': [],
                'support': rule['support of desired part'],
                'confidence': rule['confidence of desired part'],
                'target': 1,
            },
            'uplift': rule['uplift'],
            'support': (
                rule['support']
                if 'support' in rule
                else min(rule['support of undesired part'], rule['support of desired part'])
            ),
            'confidence': (
                rule['confidence']
                if 'confidence' in rule
                else rule['confidence of undesired part'] * rule['confidence of desired part']
            ),
        }
        for item in rule['stable']:
            if (item['attribute'], item['value']) not in column_values.values():
                column_values[highest_index] = (item['attribute'], item['value'])
                if 'flexible_as_stable' in item:
                    if item['attribute'] not in flexible_items_binding.keys():
                        flexible_items_binding.update({item['attribute']: []})
                    flexible_items_binding[item['attribute']].append(highest_index)
                else:
                    if item['attribute'] not in stable_items_binding.keys():
                        stable_items_binding.update({item['attribute']: []})
                    stable_items_binding[item['attribute']].append(highest_index)
                highest_index += 1
            value = [
                key
                for key, (attr, value) in column_values.items()
                if value == item['value'] and attr == item['attribute']
            ][0]
            ar_dict['undesired']['itemset'].append(value)
            ar_dict['desired']['itemset'].append(value)

        for item in rule['flexible']:
            if item['attribute'] not in flexible_items_binding.keys():
                flexible_items_binding.update({item['attribute']: []})
            if (item['attribute'], item['undesired']) not in column_values.values():
                column_values[highest_index] = (item['attribute'], item['undesired'])
                flexible_items_binding[item['attribute']].append(highest_index)
                highest_index += 1
            if (item['attribute'], item['desired']) not in column_values.values():
                column_values[highest_index] = (item['attribute'], item['desired'])
                flexible_items_binding[item['attribute']].append(highest_index)
                highest_index += 1
            value_0 = [
                key
                for key, (attr, value) in column_values.items()
                if value == item['undesired'] and attr == item['attribute']
            ][0]
            value_1 = [
                key
                for key, (attr, value) in column_values.items()
                if value == item['desired'] and attr == item['attribute']
            ][0]
            ar_dict['undesired']['itemset'].append(value_0)
            ar_dict['desired']['itemset'].append(value_1)

        # If the JSON rule includes utility parameters, add them to the rule dictionary.
        for utility_key in [
            'max_rule_gain',
            'realistic_rule_gain',
            'realistic_dataset_gain',
        ]:
            if utility_key in rule:
                ar_dict[utility_key] = rule[utility_key]

        action_rules.append(ar_dict)

    return Output(action_rules, target, stable_items_binding, flexible_items_binding, column_values)
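
The index assignment described in the docstring can be illustrated with a small standalone sketch. It does not import the library: `index_pairs` and `register` are hypothetical helper names, and the logic is simplified (it only builds the index-to-pair mapping, and it registers the target pair for every rule rather than only the first one).

```python
import json


def index_pairs(rules):
    """Map each distinct (attribute, value) pair to an integer index, in first-seen order."""
    column_values = {}

    def register(attribute, value):
        # Assign the next free index only if the pair has not been seen yet.
        pair = (attribute, value)
        if pair not in column_values.values():
            column_values[len(column_values)] = pair

    for rule in rules:
        target = rule["target"]
        # The target values are registered before any stable or flexible item.
        register(target["attribute"], target["undesired"])
        register(target["attribute"], target["desired"])
        for item in rule["stable"]:
            register(item["attribute"], item["value"])
        for item in rule["flexible"]:
            register(item["attribute"], item["undesired"])
            register(item["attribute"], item["desired"])
    return column_values


rules = json.loads("""
[{"target": {"attribute": "target", "undesired": "no", "desired": "yes"},
  "stable": [{"attribute": "age", "value": "young"},
             {"attribute": "income", "value": "high"}],
  "flexible": [{"attribute": "education", "undesired": "low", "desired": "high"}]}]
""")
column_values = index_pairs(rules)
print(column_values)
```

With this mapping, the rule from the example would have the undesired itemset [2, 3, 4] and the desired itemset [2, 3, 5]: the stable items share an index in both itemsets, while the flexible item contributes a different index to each.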

Output

A class used to format and export action rules.

Attributes:

Name           Type  Description
action_rules   list  List containing the action rules.
target         str   The target attribute for the action rules.
stable_cols    list  List of indices for stable columns.
flexible_cols  list  List of indices for flexible columns.
column_values  dict  Dictionary containing the values of the columns.

Methods:

Name                    Description
get_ar_notation         Generate a string representation of the action rules in a human-readable format.
get_export_notation     Generate a JSON string of dictionaries representing the action rules for export.
get_pretty_ar_notation  Generate a list of text strings representing the action rules.

Source code in src/action_rules/output/output.py
class Output:
    """
    A class used to format and export action rules.

    Attributes
    ----------
    action_rules : list
        List containing the action rules.
    target : str
        The target attribute for the action rules.
    stable_cols : list
        List of indices for stable columns.
    flexible_cols : list
        List of indices for flexible columns.
    column_values : dict
        Dictionary containing the values of the columns.

    Methods
    -------
    get_ar_notation()
        Generate a string representation of the action rules in a human-readable format.
    get_export_notation()
        Generate a JSON string of dictionaries representing the action rules for export.
    get_pretty_ar_notation()
        Generate a list of text strings representing the action rules.
    """

    def __init__(
        self,
        action_rules: list,
        target: str,
        stable_items_binding: dict,
        flexible_items_binding: dict,
        column_values: dict,
    ):
        """
        Initialize the Output class with the specified action rules and target attribute.

        Parameters
        ----------
        action_rules : list
            List containing the action rules.
        target : str
            The target attribute for the action rules.
        stable_items_binding : dict
            Dictionary containing bindings for stable items.
        flexible_items_binding : dict
            Dictionary containing bindings for flexible items.
        column_values : dict
            Dictionary containing the values of the columns.

        Notes
        -----
        The constructor initializes the Output object by setting the provided action rules, target attribute,
        stable items, flexible items, and column values. It flattens the stable and flexible items bindings to
        create lists of indices for stable and flexible columns.
        """
        self.action_rules = action_rules
        self.target = target
        self.stable_cols = [item for sublist in stable_items_binding.values() for item in sublist]
        self.flexible_cols = [item for sublist in flexible_items_binding.values() for item in sublist]
        self.column_values = column_values

    def get_ar_notation(self):
        """
        Generate a string representation of the action rules in a human-readable format.

        Returns
        -------
        list
            List of strings, one per action rule.

        Notes
        -----
        This method constructs a human-readable string representation of the action rules. Each rule is
        formatted to show the attribute-value conditions and transitions. The representation includes
        the support and confidence values for both the undesired and desired parts, as well as the uplift.
        """
        ar_notation = []
        for action_rule in self.action_rules:
            rule = '['
            for i, item in enumerate(action_rule['undesired']['itemset']):
                if i > 0:
                    rule += ' ∧ '
                rule += '('
                if item == action_rule['desired']['itemset'][i]:
                    if item in self.stable_cols:
                        val = self.column_values[item]
                        rule += str(val[0]) + ': ' + str(val[1])
                    else:
                        val = self.column_values[item]
                        rule += str(val[0]) + '*: ' + str(val[1])
                else:
                    val = self.column_values[item]
                    val_desired = self.column_values[action_rule['desired']['itemset'][i]]
                    rule += str(val[0]) + ': ' + str(val[1]) + ' → ' + str(val_desired[1])
                rule += ')'
            rule += (
                '] ⇒ ['
                + str(self.target)
                + ': '
                + str(self.column_values[action_rule['undesired']['target']][1])
                + ' → '
                + str(self.column_values[action_rule['desired']['target']][1])
                + ']'
            )
            rule += (
                ', support of undesired part: '
                + str(action_rule['undesired']['support'])
                + ', confidence of undesired part: '
                + str(action_rule['undesired']['confidence'])
            )
            rule += (
                ', support of desired part: '
                + str(action_rule['desired']['support'])
                + ', confidence of desired part: '
                + str(action_rule['desired']['confidence'])
            )
            rule += ', support: ' + str(action_rule['support']) + ', confidence: ' + str(action_rule['confidence'])
            rule += ', uplift: ' + str(action_rule['uplift'])
            # If utility measures exist, include them in the output.
            if 'realistic_rule_gain' in action_rule:
                rule += ", max_rule_gain: " + str(action_rule['max_rule_gain'])
                rule += ", realistic_rule_gain: " + str(action_rule['realistic_rule_gain'])
                rule += ", realistic_dataset_gain: " + str(action_rule['realistic_dataset_gain'])
            ar_notation.append(rule)
        return ar_notation

    def get_export_notation(self):
        """
        Generate a JSON string of dictionaries representing the action rules for export.

        Returns
        -------
        str
            JSON string of dictionaries representing the action rules.

        Notes
        -----
        This method constructs a list of dictionaries where each dictionary represents an action rule.
        The dictionaries include attributes for stable and flexible items, as well as the target attribute,
        support, confidence, and uplift values. The list is then converted to a JSON string for export.
        """
        rules = []
        for ar_dict in self.action_rules:
            rule = {'stable': [], 'flexible': []}
            for i, item in enumerate(ar_dict['undesired']['itemset']):
                if item == ar_dict['desired']['itemset'][i]:
                    if item in self.stable_cols:
                        val = self.column_values[item]
                        rule['stable'].append({'attribute': val[0], 'value': val[1]})
                    else:
                        val = self.column_values[item]
                        rule['stable'].append({'attribute': val[0], 'value': val[1], 'flexible_as_stable': True})
                else:
                    val = self.column_values[item]
                    val_desired = self.column_values[ar_dict['desired']['itemset'][i]]
                    rule['flexible'].append({'attribute': val[0], 'undesired': val[1], 'desired': val_desired[1]})
            rule['target'] = {
                'attribute': self.target,
                'undesired': str(self.column_values[ar_dict['undesired']['target']][1]),
                'desired': str(self.column_values[ar_dict['desired']['target']][1]),
            }
            rule['support of undesired part'] = int(ar_dict['undesired']['support'])
            rule['confidence of undesired part'] = float(ar_dict['undesired']['confidence'])
            rule['support of desired part'] = int(ar_dict['desired']['support'])
            rule['confidence of desired part'] = float(ar_dict['desired']['confidence'])
            rule['uplift'] = float(ar_dict['uplift'])
            rule['support'] = int(ar_dict['support'])
            rule['confidence'] = float(ar_dict['confidence'])
            # Include utility measures if available.
            if 'realistic_rule_gain' in ar_dict:
                rule['max_rule_gain'] = float(ar_dict['max_rule_gain'])
                rule['realistic_rule_gain'] = float(ar_dict['realistic_rule_gain'])
                rule['realistic_dataset_gain'] = float(ar_dict['realistic_dataset_gain'])
            rules.append(rule)
        return json.dumps(rules)

    def get_pretty_ar_notation(self):
        """
        Generate a list of text strings representing the action rules.

        Returns
        -------
        list
            List of text strings representing the action rules.

        Notes
        -----
        This method constructs a list of text strings where each string represents an action rule in a
        readable format. The format includes conditions and transitions for each attribute, along with
        the target attribute change, support, confidence, and uplift values.
        """
        rules = []
        for ar_dict in self.action_rules:
            text = "If "
            for i, item in enumerate(ar_dict['undesired']['itemset']):
                if item == ar_dict['desired']['itemset'][i]:
                    if item in self.stable_cols:
                        val = self.column_values[item]
                        text += "attribute '" + val[0] + "' is '" + val[1] + "', "
                    else:
                        val = self.column_values[item]
                        text += "attribute (flexible is used as stable) '" + val[0] + "' is '" + val[1] + "', "
                else:
                    val = self.column_values[item]
                    val_desired = self.column_values[ar_dict['desired']['itemset'][i]]
                    text += "attribute '" + val[0] + "' value '" + val[1] + "' is changed to '" + val_desired[1] + "', "
            text += (
                "then '"
                + self.target
                + "' value '"
                + self.column_values[ar_dict['undesired']['target']][1]
                + "' is changed to '"
                + self.column_values[ar_dict['desired']['target']][1]
                + "' with support: "
                + str(ar_dict['support'])
                + ", confidence: "
                + str(ar_dict['confidence'])
                + ", uplift: "
                + str(ar_dict['uplift'])
                + ", support of undesired part: "
                + str(ar_dict['undesired']['support'])
                + ", confidence of undesired part: "
                + str(ar_dict['undesired']['confidence'])
                + ", support of desired part: "
                + str(ar_dict['desired']['support'])
                + ", confidence of desired part: "
                + str(ar_dict['desired']['confidence'])
            )
            if 'realistic_rule_gain' in ar_dict:
                text += ", max_rule_gain: " + str(ar_dict['max_rule_gain'])
                text += ", realistic_rule_gain: " + str(ar_dict['realistic_rule_gain'])
                text += ", realistic_dataset_gain: " + str(ar_dict['realistic_dataset_gain'])
            text += "."
            rules.append(text)
        return rules

    def get_dominant_rules(self):
        """
        Identify and select the dominant (Pareto-optimal) action rules.

        This method compares action rules based on the union of their 'undesired'
        and 'desired' itemsets, as well as their 'uplift' values. It applies a
        Pareto-dominance approach:

        - If the new candidate rule is a superset of a current dominant rule
          with smaller or equal uplift, the candidate is dominated and not added.
        - If the new candidate rule is a subset of a current dominant rule
          with larger or equal uplift, the current dominant rule is dominated
          and removed.
        - Otherwise, the new candidate is added to the set of dominant rules.

        After processing all rules, the remaining dominant rules are sorted
        by 'uplift' in descending order, and the method returns their indices.

        Returns
        -------
        list
            A list of indices representing the dominant (Pareto-optimal)
            action rules, sorted by uplift in descending order.
        """
        # Nothing to compare when there are no action rules.
        if not self.action_rules:
            return []
        dominant_rules = []

        # Initialize the first candidate rule
        first_rule = self.action_rules[0]
        first_rule['candidate_set'] = set(first_rule['undesired']['itemset']) | set(first_rule['desired']['itemset'])
        first_rule['rule_index'] = 0
        first_rule['to_delete'] = False
        dominant_rules.append(first_rule)

        # Iterate through remaining rules
        for idx, new_candidate in enumerate(self.action_rules[1:], start=1):
            new_candidate_set = set(new_candidate['undesired']['itemset']) | set(new_candidate['desired']['itemset'])
            is_add_rule = True
            # Compare the new dominant rule candidate with all current dominant rule candidates
            for dominant_rule in dominant_rules:
                # If the new candidate is a strict superset of a dominant rule candidate and its uplift is
                # smaller or the same, the new candidate is not added to the dominant rule candidates
                if (
                    dominant_rule['candidate_set'] < new_candidate_set
                    and dominant_rule['uplift'] >= new_candidate['uplift']
                ):
                    is_add_rule = False
                    break
                # If the new candidate is a strict subset of a dominant rule candidate and its uplift is
                # higher or the same, that dominant rule candidate is marked for removal
                elif (
                    dominant_rule['candidate_set'] > new_candidate_set
                    and dominant_rule['uplift'] <= new_candidate['uplift']
                ):
                    dominant_rule['to_delete'] = True
            # If no current dominant rule dominates the candidate, add it to the dominant rule candidates
            if is_add_rule:
                new_candidate['to_delete'] = False
                new_candidate['candidate_set'] = new_candidate_set
                new_candidate['rule_index'] = idx
                dominant_rules.append(new_candidate)
            # Remove rules that are no longer dominant
            dominant_rules = [rule for rule in dominant_rules if not rule['to_delete']]
        # Sort the action rules from the highest uplift
        sorted_indices = sorted(dominant_rules, key=lambda x: x["uplift"], reverse=True)
        important_rules_indices = [rule['rule_index'] for rule in sorted_indices]
        return important_rules_indices

__init__(action_rules, target, stable_items_binding, flexible_items_binding, column_values)

Initialize the Output class with the specified action rules and target attribute.

Parameters:

Name                    Type  Description                                         Default
action_rules            list  List containing the action rules.                   required
target                  str   The target attribute for the action rules.          required
stable_items_binding    dict  Dictionary containing bindings for stable items.    required
flexible_items_binding  dict  Dictionary containing bindings for flexible items.  required
column_values           dict  Dictionary containing the values of the columns.    required

Notes

The constructor initializes the Output object by setting the provided action rules, target attribute, stable items, flexible items, and column values. It flattens the stable and flexible items bindings to create lists of indices for stable and flexible columns.
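
As a minimal illustration of the flattening step, the sketch below uses made-up bindings (attribute name to list of item indices). The `flatten` helper is a hypothetical name introduced here; the constructor itself uses inline list comprehensions.

```python
# Hypothetical bindings: each attribute maps to the indices of its attribute-value pairs.
stable_items_binding = {"age": [2], "income": [3]}
flexible_items_binding = {"education": [4, 5]}


def flatten(binding):
    """Collect all indices from a binding dictionary into one flat list."""
    return [index for indices in binding.values() for index in indices]


stable_cols = flatten(stable_items_binding)
flexible_cols = flatten(flexible_items_binding)
print(stable_cols, flexible_cols)
```

The flat lists are what the notation methods consult when deciding whether an unchanged item is a stable attribute or a flexible attribute used as stable.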

Source code in src/action_rules/output/output.py
def __init__(
    self,
    action_rules: list,
    target: str,
    stable_items_binding: dict,
    flexible_items_binding: dict,
    column_values: dict,
):
    """
    Initialize the Output class with the specified action rules and target attribute.

    Parameters
    ----------
    action_rules : list
        List containing the action rules.
    target : str
        The target attribute for the action rules.
    stable_items_binding : dict
        Dictionary containing bindings for stable items.
    flexible_items_binding : dict
        Dictionary containing bindings for flexible items.
    column_values : dict
        Dictionary containing the values of the columns.

    Notes
    -----
    The constructor initializes the Output object by setting the provided action rules, target attribute,
    stable items, flexible items, and column values. It flattens the stable and flexible items bindings to
    create lists of indices for stable and flexible columns.
    """
    self.action_rules = action_rules
    self.target = target
    self.stable_cols = [item for sublist in stable_items_binding.values() for item in sublist]
    self.flexible_cols = [item for sublist in flexible_items_binding.values() for item in sublist]
    self.column_values = column_values

get_ar_notation()

Generate a string representation of the action rules in a human-readable format.

Returns:

Type  Description
list  List of strings, one per action rule.

Notes

This method constructs a human-readable string representation of the action rules. Each rule is formatted to show the attribute-value conditions and transitions. The representation includes the support and confidence values for both the undesired and desired parts, as well as the uplift.
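
To make the notation concrete, here is a standalone sketch that formats only the condition and target part of one rule. `format_rule` is a hypothetical helper, the data is made up, and the support, confidence, and uplift suffix that the method appends is omitted.

```python
def format_rule(undesired, desired, column_values, stable_cols, target_name):
    """Format one rule as '[(conditions)] ⇒ [target change]', without the metrics."""
    parts = []
    for before, after in zip(undesired, desired):
        attribute, value = column_values[before]
        if before != after:
            # Flexible attribute whose value changes.
            parts.append(f"({attribute}: {value} → {column_values[after][1]})")
        elif before in stable_cols:
            # Stable attribute.
            parts.append(f"({attribute}: {value})")
        else:
            # Flexible attribute used as stable, marked with an asterisk.
            parts.append(f"({attribute}*: {value})")
    # In this sketch, indices 0 and 1 hold the undesired and desired target values.
    change = f"{target_name}: {column_values[0][1]} → {column_values[1][1]}"
    return "[" + " ∧ ".join(parts) + "] ⇒ [" + change + "]"


column_values = {
    0: ("target", "no"),
    1: ("target", "yes"),
    2: ("age", "young"),
    3: ("income", "high"),
    4: ("education", "low"),
    5: ("education", "high"),
}
stable_cols = [2, 3]

changed = format_rule([2, 3, 4], [2, 3, 5], column_values, stable_cols, "target")
unchanged = format_rule([2, 4], [2, 4], column_values, stable_cols, "target")
print(changed)
print(unchanged)
```

The second call shows the asterisk form: `education` is a flexible attribute, but its value is the same in both itemsets, so it is rendered as `education*`.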

Source code in src/action_rules/output/output.py
def get_ar_notation(self):
    """
    Generate a string representation of the action rules in a human-readable format.

    Returns
    -------
    list
        List of strings, one per action rule.

    Notes
    -----
    This method constructs a human-readable string representation of the action rules. Each rule is
    formatted to show the attribute-value conditions and transitions. The representation includes
    the support and confidence values for both the undesired and desired parts, as well as the uplift.
    """
    ar_notation = []
    for action_rule in self.action_rules:
        rule = '['
        for i, item in enumerate(action_rule['undesired']['itemset']):
            if i > 0:
                rule += ' ∧ '
            rule += '('
            if item == action_rule['desired']['itemset'][i]:
                if item in self.stable_cols:
                    val = self.column_values[item]
                    rule += str(val[0]) + ': ' + str(val[1])
                else:
                    val = self.column_values[item]
                    rule += str(val[0]) + '*: ' + str(val[1])
            else:
                val = self.column_values[item]
                val_desired = self.column_values[action_rule['desired']['itemset'][i]]
                rule += str(val[0]) + ': ' + str(val[1]) + ' → ' + str(val_desired[1])
            rule += ')'
        rule += (
            '] ⇒ ['
            + str(self.target)
            + ': '
            + str(self.column_values[action_rule['undesired']['target']][1])
            + ' → '
            + str(self.column_values[action_rule['desired']['target']][1])
            + ']'
        )
        rule += (
            ', support of undesired part: '
            + str(action_rule['undesired']['support'])
            + ', confidence of undesired part: '
            + str(action_rule['undesired']['confidence'])
        )
        rule += (
            ', support of desired part: '
            + str(action_rule['desired']['support'])
            + ', confidence of desired part: '
            + str(action_rule['desired']['confidence'])
        )
        rule += ', support: ' + str(action_rule['support']) + ', confidence: ' + str(action_rule['confidence'])
        rule += ', uplift: ' + str(action_rule['uplift'])
        # If utility measures exist, include them in the output.
        if 'realistic_rule_gain' in action_rule:
            rule += ", max_rule_gain: " + str(action_rule['max_rule_gain'])
            rule += ", realistic_rule_gain: " + str(action_rule['realistic_rule_gain'])
            rule += ", realistic_dataset_gain: " + str(action_rule['realistic_dataset_gain'])
        ar_notation.append(rule)
    return ar_notation

get_dominant_rules()

Identify and select the dominant (Pareto-optimal) action rules.

This method compares action rules based on the union of their 'undesired' and 'desired' itemsets, as well as their 'uplift' values. It applies a Pareto-dominance approach:

  • If the new candidate rule is a superset of a current dominant rule with smaller or equal uplift, the candidate is dominated and not added.
  • If the new candidate rule is a subset of a current dominant rule with larger or equal uplift, the current dominant rule is dominated and removed.
  • Otherwise, the new candidate is added to the set of dominant rules.

After processing all rules, the remaining dominant rules are sorted by 'uplift' in descending order, and the method returns their indices.
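
The dominance filter described above can be sketched independently of the class. This is a simplified illustration, not the method's code: each rule is reduced to a hypothetical `(itemset, uplift)` tuple, where the itemset stands for the union of the undesired and desired itemsets, and `dominant_indices` is a name introduced here.

```python
def dominant_indices(rules):
    """Return indices of non-dominated rules, sorted by uplift in descending order.

    rules is a list of (frozenset_of_item_indices, uplift) tuples.
    """
    kept = []
    for idx, (items, uplift) in enumerate(rules):
        # A kept rule with a strictly smaller itemset and at least the same uplift
        # dominates the candidate.
        if any(rules[k][0] < items and rules[k][1] >= uplift for k in kept):
            continue
        # The candidate removes kept rules with a strictly larger itemset
        # and at most the same uplift.
        kept = [k for k in kept if not (rules[k][0] > items and rules[k][1] <= uplift)]
        kept.append(idx)
    return sorted(kept, key=lambda k: rules[k][1], reverse=True)


rules = [
    (frozenset({2, 4}), 0.30),     # kept
    (frozenset({2, 3, 4}), 0.25),  # superset of rule 0 with lower uplift: dominated
    (frozenset({2, 3, 5}), 0.40),  # not comparable with rule 0: kept for now
    (frozenset({5}), 0.45),        # subset of rule 2 with higher uplift: removes rule 2
]
result = dominant_indices(rules)
print(result)
```

Note that `<` and `>` on Python sets test for strict subset and strict superset, so two rules with identical itemsets never dominate each other.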

Returns:

Type  Description
list  A list of indices representing the dominant (Pareto-optimal) action rules, sorted by uplift in descending order.

Source code in src/action_rules/output/output.py
def get_dominant_rules(self):
    """
    Identify and select the dominant (Pareto-optimal) action rules.

    This method compares action rules based on the union of their 'undesired'
    and 'desired' itemsets, as well as their 'uplift' values. It applies a
    Pareto-dominance approach:

    - If the new candidate rule is a superset of a current dominant rule
      with smaller or equal uplift, the candidate is dominated and not added.
    - If the new candidate rule is a subset of a current dominant rule
      with larger or equal uplift, the current dominant rule is dominated
      and removed.
    - Otherwise, the new candidate is added to the set of dominant rules.

    After processing all rules, the remaining dominant rules are sorted
    by 'uplift' in descending order, and the method returns their indices.

    Returns
    -------
    list
        A list of indices representing the dominant (Pareto-optimal)
        action rules, sorted by uplift in descending order.
    """
    # Nothing to compare when there are no action rules.
    if not self.action_rules:
        return []
    dominant_rules = []

    # Initialize the first candidate rule
    first_rule = self.action_rules[0]
    first_rule['candidate_set'] = set(first_rule['undesired']['itemset']) | set(first_rule['desired']['itemset'])
    first_rule['rule_index'] = 0
    first_rule['to_delete'] = False
    dominant_rules.append(first_rule)

    # Iterate through remaining rules
    for idx, new_candidate in enumerate(self.action_rules[1:], start=1):
        new_candidate_set = set(new_candidate['undesired']['itemset']) | set(new_candidate['desired']['itemset'])
        is_add_rule = True
        # Compare the new dominant rule candidate with all current dominant rule candidates
        for dominant_rule in dominant_rules:
            # If the new candidate is a strict superset of a dominant rule candidate and its uplift is
            # smaller or the same, the new candidate is not added to the dominant rule candidates
            if (
                dominant_rule['candidate_set'] < new_candidate_set
                and dominant_rule['uplift'] >= new_candidate['uplift']
            ):
                is_add_rule = False
                break
            # If the new candidate is a strict subset of a dominant rule candidate and its uplift is
            # higher or the same, that dominant rule candidate is marked for removal
            elif (
                dominant_rule['candidate_set'] > new_candidate_set
                and dominant_rule['uplift'] <= new_candidate['uplift']
            ):
                dominant_rule['to_delete'] = True
        # If no current dominant rule dominates the candidate, add it to the dominant rule candidates
        if is_add_rule:
            new_candidate['to_delete'] = False
            new_candidate['candidate_set'] = new_candidate_set
            new_candidate['rule_index'] = idx
            dominant_rules.append(new_candidate)
        # Remove rules that are no longer dominant
        dominant_rules = [rule for rule in dominant_rules if not rule['to_delete']]
    # Sort the action rules from the highest uplift
    sorted_indices = sorted(dominant_rules, key=lambda x: x["uplift"], reverse=True)
    important_rules_indices = [rule['rule_index'] for rule in sorted_indices]
    return important_rules_indices
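
The dominance test above can be tried in isolation. The following is a minimal sketch, not the library's implementation: `dominant_rule_indices` is a hypothetical standalone function, and the four rules with their uplift values are invented for illustration. A rule is dropped when another rule uses a proper subset of its items and has at least the same uplift.

```python
def dominant_rule_indices(action_rules):
    """Return indices of non-dominated rules, highest uplift first (illustrative sketch)."""
    kept = []  # tuples of (rule_index, item_set, uplift)
    for idx, rule in enumerate(action_rules):
        items = set(rule['undesired']['itemset']) | set(rule['desired']['itemset'])
        uplift = rule['uplift']
        # Skip the candidate if a kept rule has a proper subset of its items
        # and an uplift that is at least as high.
        if any(k_items < items and k_uplift >= uplift for _, k_items, k_uplift in kept):
            continue
        # Drop kept rules that the candidate dominates in the same sense.
        kept = [k for k in kept if not (k[1] > items and k[2] <= uplift)]
        kept.append((idx, items, uplift))
    return [idx for idx, _, _ in sorted(kept, key=lambda k: k[2], reverse=True)]


# Invented example rules; the integers stand for item indices.
example_rules = [
    {'undesired': {'itemset': (1, 2)}, 'desired': {'itemset': (1, 3)}, 'uplift': 0.10},
    {'undesired': {'itemset': (1, 2, 4)}, 'desired': {'itemset': (1, 3, 4)}, 'uplift': 0.05},
    {'undesired': {'itemset': (2,)}, 'desired': {'itemset': (3,)}, 'uplift': 0.20},
    {'undesired': {'itemset': (5,)}, 'desired': {'itemset': (6,)}, 'uplift': 0.15},
]
dominant = dominant_rule_indices(example_rules)
```

Here rule 1 is rejected because rule 0 covers fewer items with a higher uplift, and rule 0 is later removed by rule 2 for the same reason, so only rules 2 and 3 remain.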

get_export_notation()

Generate a JSON string of dictionaries representing the action rules for export.

Returns:

Type Description
str

JSON string of dictionaries representing the action rules.

Notes

This method constructs a list of dictionaries where each dictionary represents an action rule. The dictionaries include attributes for stable and flexible items, as well as the target attribute, support, confidence, and uplift values. The list is then converted to a JSON string for export.
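
As a rough illustration of consuming this export, the sketch below parses a hand-written JSON string that follows the key names used in the source listing for this method. All values, the attribute names, and the helper `flexible_changes` are invented for the example and are not part of the library.

```python
import json

# Hand-written sample in the shape built by get_export_notation(); every value is invented.
exported = json.dumps([
    {
        "stable": [{"attribute": "Sex", "value": "female"}],
        "flexible": [{"attribute": "Embarked", "undesired": "S", "desired": "C"}],
        "target": {"attribute": "Survived", "undesired": "0", "desired": "1"},
        "support of undesired part": 30,
        "confidence of undesired part": 0.75,
        "support of desired part": 40,
        "confidence of desired part": 0.8,
        "uplift": 0.11,
        "support": 30,
        "confidence": 0.6,
    }
])


def flexible_changes(rule):
    """List the attribute transitions that a rule recommends."""
    return [f"{f['attribute']}: {f['undesired']} -> {f['desired']}" for f in rule["flexible"]]


rules = json.loads(exported)
best = max(rules, key=lambda r: r["uplift"])  # rule with the highest uplift
changes = flexible_changes(best)
```

Because the export is plain JSON, the same structure can be read from any language or stored alongside other experiment results.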

Source code in src/action_rules/output/output.py
def get_export_notation(self):
    """
    Generate a JSON string of dictionaries representing the action rules for export.

    Returns
    -------
    str
        JSON string of dictionaries representing the action rules.

    Notes
    -----
    This method constructs a list of dictionaries where each dictionary represents an action rule.
    The dictionaries include attributes for stable and flexible items, as well as the target attribute,
    support, confidence, and uplift values. The list is then converted to a JSON string for export.
    """
    rules = []
    for ar_dict in self.action_rules:
        rule = {'stable': [], 'flexible': []}
        for i, item in enumerate(ar_dict['undesired']['itemset']):
            if item == ar_dict['desired']['itemset'][i]:
                if item in self.stable_cols:
                    val = self.column_values[item]
                    rule['stable'].append({'attribute': val[0], 'value': val[1]})
                else:
                    val = self.column_values[item]
                    rule['stable'].append({'attribute': val[0], 'value': val[1], 'flexible_as_stable': True})
            else:
                val = self.column_values[item]
                val_desired = self.column_values[ar_dict['desired']['itemset'][i]]
                rule['flexible'].append({'attribute': val[0], 'undesired': val[1], 'desired': val_desired[1]})
        rule['target'] = {
            'attribute': self.target,
            'undesired': str(self.column_values[ar_dict['undesired']['target']][1]),
            'desired': str(self.column_values[ar_dict['desired']['target']][1]),
        }
        rule['support of undesired part'] = int(ar_dict['undesired']['support'])
        rule['confidence of undesired part'] = float(ar_dict['undesired']['confidence'])
        rule['support of desired part'] = int(ar_dict['desired']['support'])
        rule['confidence of desired part'] = float(ar_dict['desired']['confidence'])
        rule['uplift'] = float(ar_dict['uplift'])
        rule['support'] = int(ar_dict['support'])
        rule['confidence'] = float(ar_dict['confidence'])
        # Include utility measures if available.
        if 'realistic_rule_gain' in ar_dict:
            rule['max_rule_gain'] = float(ar_dict['max_rule_gain'])
            rule['realistic_rule_gain'] = float(ar_dict['realistic_rule_gain'])
            rule['realistic_dataset_gain'] = float(ar_dict['realistic_dataset_gain'])
        rules.append(rule)
    return json.dumps(rules)

get_pretty_ar_notation()

Generate a list of text strings representing the action rules.

Returns:

Type Description
list

List of text strings representing the action rules.

Notes

This method constructs a list of text strings where each string represents an action rule in a readable format. The format includes conditions and transitions for each attribute, along with the target attribute change, support, confidence, and uplift values.

Source code in src/action_rules/output/output.py
def get_pretty_ar_notation(self):
    """
    Generate a list of text strings representing the action rules.

    Returns
    -------
    list
        List of text strings representing the action rules.

    Notes
    -----
    This method constructs a list of text strings where each string represents an action rule in a
    readable format. The format includes conditions and transitions for each attribute, along with
    the target attribute change, support, confidence, and uplift values.
    """
    rules = []
    for ar_dict in self.action_rules:
        text = "If "
        for i, item in enumerate(ar_dict['undesired']['itemset']):
            if item == ar_dict['desired']['itemset'][i]:
                if item in self.stable_cols:
                    val = self.column_values[item]
                    text += "attribute '" + val[0] + "' is '" + val[1] + "', "
                else:
                    val = self.column_values[item]
                    text += "attribute (flexible is used as stable) '" + val[0] + "' is '" + val[1] + "', "
            else:
                val = self.column_values[item]
                val_desired = self.column_values[ar_dict['desired']['itemset'][i]]
                text += "attribute '" + val[0] + "' value '" + val[1] + "' is changed to '" + val_desired[1] + "', "
        text += (
            "then '"
            + self.target
            + "' value '"
            + self.column_values[ar_dict['undesired']['target']][1]
            + "' is changed to '"
            + self.column_values[ar_dict['desired']['target']][1]
            + "' with support: "
            + str(ar_dict['support'])
            + ", confidence: "
            + str(ar_dict['confidence'])
            + ", uplift: "
            + str(ar_dict['uplift'])
            + ", support of undesired part: "
            + str(ar_dict['undesired']['support'])
            + ", confidence of undesired part: "
            + str(ar_dict['undesired']['confidence'])
            + ", support of desired part: "
            + str(ar_dict['desired']['support'])
            + ", confidence of desired part: "
            + str(ar_dict['desired']['confidence'])
        )
        if 'realistic_rule_gain' in ar_dict:
            text += ", max_rule_gain: " + str(ar_dict['max_rule_gain'])
            text += ", realistic_rule_gain: " + str(ar_dict['realistic_rule_gain'])
            text += ", realistic_dataset_gain: " + str(ar_dict['realistic_dataset_gain'])
        text += "."
        rules.append(text)
    return rules

Rules

A class used to manage and generate classification and action rules.

Attributes:

Name Type Description
classification_rules defaultdict

Default dictionary to store classification rules for undesired and desired states.

undesired_state str

The undesired state of the target attribute.

desired_state str

The desired state of the target attribute.

columns list

List of columns in the dataset.

action_rules list

List to store generated action rules.

undesired_prefixes_without_conf set

Set to store prefixes of undesired states without conflicts.

desired_prefixes_without_conf set

Set to store prefixes of desired states without conflicts.

count_transactions int

The number of transactions in the data.

intrinsic_utility_table (dict, optional)

(attribute, value) -> float A lookup table for the intrinsic utility of each attribute-value pair. If None, no intrinsic utility is considered.

transition_utility_table (dict, optional)

(attribute, from_value, to_value) -> float A lookup table for cost/gain of transitions between values. If None, no transition utility is considered.

Methods:

Name Description
add_prefix_without_conf

Add a prefix to the set of prefixes without conflicts.

add_classification_rules

Add classification rules for undesired and desired states.

generate_action_rules

Generate action rules from classification rules.

prune_classification_rules

Prune classification rules based on their length and update the stop list.

calculate_confidence

Calculate the confidence of a rule.

calculate_uplift

Calculate the uplift of an action rule.

Source code in src/action_rules/rules/rules.py
class Rules:
    """
    A class used to manage and generate classification and action rules.

    Attributes
    ----------
    classification_rules : defaultdict
        Default dictionary to store classification rules for undesired and desired states.
    undesired_state : str
        The undesired state of the target attribute.
    desired_state : str
        The desired state of the target attribute.
    columns : list
        List of columns in the dataset.
    action_rules : list
        List to store generated action rules.
    undesired_prefixes_without_conf : set
        Set to store prefixes of undesired states without conflicts.
    desired_prefixes_without_conf : set
        Set to store prefixes of desired states without conflicts.
    count_transactions : int
        The number of transactions in the data.
    intrinsic_utility_table : dict, optional
        (attribute, value) -> float
        A lookup table for the intrinsic utility of each attribute-value pair.
        If None, no intrinsic utility is considered.
    transition_utility_table : dict, optional
        (attribute, from_value, to_value) -> float
        A lookup table for cost/gain of transitions between values.
        If None, no transition utility is considered.

    Methods
    -------
    add_prefix_without_conf(prefix, is_desired)
        Add a prefix to the set of prefixes without conflicts.
    add_classification_rules(new_ar_prefix, itemset_prefix, undesired_states, desired_states)
        Add classification rules for undesired and desired states.
    generate_action_rules()
        Generate action rules from classification rules.
    prune_classification_rules(k, stop_list)
        Prune classification rules based on their length and update the stop list.
    calculate_confidence(support, opposite_support)
        Calculate the confidence of a rule.
    calculate_uplift(undesired_support, undesired_confidence, desired_confidence)
        Calculate the uplift of an action rule.
    """

    def __init__(
        self,
        undesired_state: str,
        desired_state: str,
        columns: list,
        count_transactions: int,
        intrinsic_utility_table: Optional[dict] = None,
        transition_utility_table: Optional[dict] = None,
    ):
        """
        Initialize the Rules class with the specified undesired and desired states, columns, and transaction count.

        Parameters
        ----------
        undesired_state : str
            The undesired state of the target attribute.
        desired_state : str
            The desired state of the target attribute.
        columns : list
            List of columns in the dataset.
        count_transactions : int
            The number of transactions in the data.
        intrinsic_utility_table : dict, optional
            (attribute, value) -> float
            A lookup table for the intrinsic utility of each attribute-value pair.
            If None, no intrinsic utility is considered.
        transition_utility_table : dict, optional
            (attribute, from_value, to_value) -> float
            A lookup table for cost/gain of transitions between values.
            If None, no transition utility is considered.

        Notes
        -----
        The classification_rules attribute is initialized as a defaultdict with a lambda function that creates
        dictionaries for 'desired' and 'undesired' states.
        """
        self.classification_rules = defaultdict(lambda: {'desired': [], 'undesired': []})  # type: defaultdict
        self.undesired_state = undesired_state
        self.columns = columns
        self.desired_state = desired_state
        self.action_rules = []  # type: list
        self.undesired_prefixes_without_conf = set()  # type: set
        self.desired_prefixes_without_conf = set()  # type: set
        self.count_transactions = count_transactions
        self.intrinsic_utility_table = intrinsic_utility_table or {}
        self.transition_utility_table = transition_utility_table or {}

    def add_prefix_without_conf(self, prefix: tuple, is_desired: bool):
        """
        Add a prefix to the set of prefixes without conflicts.

        Parameters
        ----------
        prefix : tuple
            The prefix to be added.
        is_desired : bool
            If True, add the prefix to the desired prefixes set; otherwise, add it to the undesired prefixes set.

        Notes
        -----
        This method is useful for keeping track of prefixes that have no conflicting rules and can be
        used directly in rule generation.
        """
        if is_desired:
            self.desired_prefixes_without_conf.add(prefix)
        else:
            self.undesired_prefixes_without_conf.add(prefix)

    def add_classification_rules(self, new_ar_prefix, itemset_prefix, undesired_states, desired_states):
        """
        Add classification rules for undesired and desired states.

        Parameters
        ----------
        new_ar_prefix : tuple
            Prefix of the action rule.
        itemset_prefix : tuple
            Prefix of the itemset.
        undesired_states : list
            List of dictionaries containing undesired state information.
        desired_states : list
            List of dictionaries containing desired state information.

        Notes
        -----
        This method updates the classification_rules attribute with new rules based on the provided
        undesired and desired states. Each state is represented as a dictionary containing item, support,
        confidence, and target information.
        """
        for undesired_item in undesired_states:
            new_itemset_prefix = itemset_prefix + (undesired_item['item'],)
            self.classification_rules[new_ar_prefix]['undesired'].append(
                {
                    'itemset': new_itemset_prefix,
                    'support': undesired_item['support'],
                    'confidence': undesired_item['confidence'],
                    'target': self.undesired_state,
                }
            )
        for desired_item in desired_states:
            new_itemset_prefix = itemset_prefix + (desired_item['item'],)
            self.classification_rules[new_ar_prefix]['desired'].append(
                {
                    'itemset': new_itemset_prefix,
                    'support': desired_item['support'],
                    'confidence': desired_item['confidence'],
                    'target': self.desired_state,
                }
            )

    def generate_action_rules(self):
        """
        Generate action rules from classification rules.

        Notes
        -----
        This method creates action rules by combining classification rules for undesired and desired states.
        The uplift for each action rule is calculated using the `calculate_uplift` method and the result is
        stored in the action_rules attribute.
        """
        for attribute_prefix, rules in self.classification_rules.items():
            for desired_rule in rules['desired']:
                for undesired_rule in rules['undesired']:
                    # Uplift
                    uplift = self.calculate_uplift(
                        undesired_rule['support'],
                        undesired_rule['confidence'],
                        desired_rule['confidence'],
                    )
                    # Utility
                    utility = {}  # stays empty if no utility is computed, so the ** unpacking below is safe
                    if self.intrinsic_utility_table is not None or self.transition_utility_table is not None:
                        (max_rule_gain, realistic_rule_gain, realistic_rule_gain_dataset) = self.compute_rule_utility(
                            undesired_rule, desired_rule
                        )
                        utility = {
                            'max_rule_gain': max_rule_gain,
                            'realistic_rule_gain': realistic_rule_gain,
                            'realistic_dataset_gain': realistic_rule_gain_dataset,
                        }
                    # Action rule measures
                    ar_support, ar_confidence = self.compute_action_rule_measures(
                        undesired_rule.get('support', 0.0),
                        undesired_rule.get('confidence', 0.0),
                        desired_rule.get('support', 0.0),
                        desired_rule.get('confidence', 0.0),
                    )
                    self.action_rules.append(
                        {
                            'undesired': undesired_rule,
                            'desired': desired_rule,
                            'uplift': uplift,
                            'support': ar_support,
                            'confidence': ar_confidence,
                            **utility,
                        }
                    )

    def prune_classification_rules(self, k: int, stop_list: list):
        """
        Prune classification rules based on their length and update the stop list.

        Parameters
        ----------
        k : int
            Length of the attribute prefix.
        stop_list : list
            List of prefixes to stop generating rules for.

        Notes
        -----
        This method removes the classification rules whose prefix length equals k and whose desired or undesired
        list is empty. Such a prefix is also added to stop_list, unless each empty side is registered in the
        matching prefixes-without-conf set, so that no further rules are generated from it.
        """
        del_prefixes = []
        for attribute_prefix, rules in self.classification_rules.items():
            if k == len(attribute_prefix):
                len_desired = len(rules['desired'])
                len_undesired = len(rules['undesired'])
                if len_desired == 0 or len_undesired == 0:
                    if (len_desired == 0 and attribute_prefix not in self.desired_prefixes_without_conf) or (
                        len_undesired == 0 and attribute_prefix not in self.undesired_prefixes_without_conf
                    ):
                        stop_list.append(attribute_prefix)
                    del_prefixes.append(attribute_prefix)
        for attribute_prefix in del_prefixes:
            del self.classification_rules[attribute_prefix]

    def calculate_confidence(self, support, opposite_support):
        """
        Calculate the confidence of a rule.

        Parameters
        ----------
        support : int
            The support value for the desired or undesired state.
        opposite_support : int
            The support value for the opposite state.

        Returns
        -------
        float
            The confidence value calculated as support / (support + opposite_support).
            Returns 0 if the sum of support and opposite_support is 0.

        Notes
        -----
        Confidence is a measure of the reliability of a rule. A higher confidence indicates a stronger
        association between the conditions of the rule and the target state.
        """
        if support + opposite_support == 0:
            return 0
        return support / (support + opposite_support)

    def calculate_uplift(self, undesired_support: int, undesired_confidence: float, desired_confidence: float) -> float:
        """
        Calculate the uplift of an action rule.

        Parameters
        ----------
        undesired_support : int
            The support value for the undesired state.
        undesired_confidence : float
            The confidence value for the undesired state.
        desired_confidence : float
            The confidence value for the desired state.

        Returns
        -------
        float
            The uplift value calculated as:
            ((desired_confidence - (1 - undesired_confidence)) * (undesired_support / undesired_confidence))
            / self.count_transactions.

        Notes
        -----
        Uplift measures the increase in the probability of achieving the desired state when applying the action rule
        compared to not applying it. It is used to assess the effectiveness of the rule.
        """
        return (
            (desired_confidence - (1 - undesired_confidence)) * (undesired_support / undesired_confidence)
        ) / self.count_transactions

    def compute_rule_utility(self, undesired_rule: dict, desired_rule: dict) -> tuple:
        """
        Compute various utility gains for a rule transition from undesired to desired.

        The function computes intrinsic utilities for items in both the undesired and desired rule itemsets,
        calculates a transition gain for changes in flexible attributes, and adjusts these gains using target
        state utilities and rule confidences to derive realistic gain metrics at both the rule and dataset levels.

        Parameters
        ----------
        undesired_rule : dict
            Dictionary representing the undesired rule. Expected keys:
                - 'itemset': list of item indices in the undesired rule.
                - 'confidence': (optional) confidence level of the undesired rule.
                - 'support': (optional) support count of the undesired rule.
        desired_rule : dict
            Dictionary representing the desired rule. Expected keys:
                - 'itemset': list of item indices in the desired rule.
                - 'confidence': (optional) confidence level of the desired rule.

        Returns
        -------
        tuple of (float, float, float)
            - max_rule_gain : float
                The maximum rule gain computed as the sum of rule gain and target gain.
            - realistic_rule_gain : float
                The rule gain adjusted with a realistic target gain, incorporating rule confidences.
            - realistic_rule_gain_dataset : float
                The dataset-level realistic rule gain, computed by scaling realistic rule gain with the estimated
                number of transactions.
        """
        u_undesired = 0.0
        # Sum intrinsic utilities for each item index in the undesired rule's itemset.
        for idx in undesired_rule.get('itemset', []):
            intrinsic_value = self.intrinsic_utility_table.get(idx, 0.0)
            u_undesired += intrinsic_value

        # Initialize the desired rule utility.
        u_desired = 0.0
        # Sum intrinsic utilities for each item index in the desired rule's itemset.
        for idx in desired_rule.get('itemset', []):
            intrinsic_value = self.intrinsic_utility_table.get(idx, 0.0)
            u_desired += intrinsic_value

        # Initialize additional transition gain.
        transition_gain = 0.0
        # Iterate over corresponding item indices from undesired and desired itemsets.
        for u_idx, d_idx in zip(undesired_rule.get('itemset', []), desired_rule.get('itemset', [])):
            # Only add transition gain if the indices differ (indicating a change in a flexible attribute).
            if u_idx != d_idx:
                trans_value = self.transition_utility_table.get((u_idx, d_idx), 0.0)
                transition_gain += trans_value

        rule_gain = u_desired - u_undesired + transition_gain

        # Target utility
        u_undesired_target = self.intrinsic_utility_table.get(self.undesired_state, 0.0)
        u_desired_target = self.intrinsic_utility_table.get(self.desired_state, 0.0)
        transition_gain_target = self.transition_utility_table.get((self.undesired_state, self.desired_state), 0.0)

        target_gain = u_desired_target - u_undesired_target + transition_gain_target

        # Realistic target gain
        undesired_rule_confidence = undesired_rule.get('confidence', 0.0)
        desired_rule_confidence = desired_rule.get('confidence', 0.0)
        target_gain_realistic = (desired_rule_confidence - (1 - undesired_rule_confidence)) * target_gain

        # Rule gain
        max_rule_gain = rule_gain + target_gain
        realistic_rule_gain = rule_gain + target_gain_realistic

        # Compute dataset-level realistic gain.
        support = undesired_rule.get('support', 0)
        if undesired_rule_confidence > 0:
            transactions = support / undesired_rule_confidence
        else:
            transactions = 0.0
        realistic_rule_gain_dataset = transactions * realistic_rule_gain

        return max_rule_gain, realistic_rule_gain, realistic_rule_gain_dataset

    def compute_action_rule_measures(
        self, support_undesired, confidence_undesired, support_desired, confidence_desired
    ):
        """
        Compute the support and confidence for an action rule formed from an undesired rule and a desired rule.

        The action rule is derived by pairing a classification rule that leads to an undesired outcome
        with a classification rule that leads to a desired outcome. In this formulation, the support
        of the action rule is defined as the minimum of the supports of the two component rules, and
        the confidence of the action rule is defined as the product of their confidences.

        Parameters
        ----------
        support_undesired : float
            The support of the undesired rule (e.g., count or relative frequency).
        confidence_undesired : float
            The confidence of the undesired rule (a value between 0 and 1).
        support_desired : float
            The support of the desired rule.
        confidence_desired : float
            The confidence of the desired rule (a value between 0 and 1).

        Returns
        -------
        tuple of (float, float)
            A tuple containing:
                - action_support : float
                    The support of the action rule, computed as min(support_undesired, support_desired).
                - action_confidence : float
                    The confidence of the action rule, computed as confidence_undesired * confidence_desired.

        """
        action_support = min(support_undesired, support_desired)
        action_confidence = confidence_undesired * confidence_desired
        return action_support, action_confidence
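
The confidence, uplift, and action-rule measures documented in the class above can be checked with a small worked example. This is a sketch with standalone functions that restate the formulas from the docstrings; the function names and all numbers are invented, and the uplift function assumes a non-zero undesired confidence because the formula divides by it.

```python
def confidence(support, opposite_support):
    """support / (support + opposite_support), or 0 when both are 0."""
    total = support + opposite_support
    return support / total if total else 0


def uplift(undesired_support, undesired_confidence, desired_confidence, count_transactions):
    """Uplift as stated in the calculate_uplift docstring (undesired_confidence must be non-zero)."""
    # support / confidence recovers the number of transactions matching the undesired antecedent.
    covered = undesired_support / undesired_confidence
    return (desired_confidence - (1 - undesired_confidence)) * covered / count_transactions


def action_rule_measures(support_undesired, confidence_undesired, support_desired, confidence_desired):
    """Support is the minimum of both supports; confidence is the product of both confidences."""
    return min(support_undesired, support_desired), confidence_undesired * confidence_desired


# Invented counts: 30 of 40 matching transactions are undesired, 40 of 50 are desired.
undesired_conf = confidence(30, 10)  # 0.75
desired_conf = confidence(40, 10)    # 0.8
rule_uplift = uplift(30, undesired_conf, desired_conf, 200)  # approximately 0.11
ar_support, ar_confidence = action_rule_measures(30, undesired_conf, 40, desired_conf)
```

With these numbers, 40 transactions match the undesired antecedent, the expected share that flips to the desired state is 0.8 - 0.25 = 0.55, and 0.55 * 40 / 200 gives an uplift of roughly 0.11 over a dataset of 200 transactions.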

__init__(undesired_state, desired_state, columns, count_transactions, intrinsic_utility_table=None, transition_utility_table=None)

Initialize the Rules class with the specified undesired and desired states, columns, and transaction count.

Parameters:

Name Type Description Default
undesired_state str

The undesired state of the target attribute.

required
desired_state str

The desired state of the target attribute.

required
columns list

List of columns in the dataset.

required
count_transactions int

The number of transactions in the data.

required
intrinsic_utility_table dict

(attribute, value) -> float A lookup table for the intrinsic utility of each attribute-value pair. If None, no intrinsic utility is considered.

None
transition_utility_table dict

(attribute, from_value, to_value) -> float A lookup table for cost/gain of transitions between values. If None, no transition utility is considered.

None
Notes

The classification_rules attribute is initialized as a defaultdict whose factory returns a dictionary with empty 'desired' and 'undesired' lists, so rules can be appended under a new prefix without creating its entry first.
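
The behaviour of that default factory can be seen in a few lines. This is a minimal sketch using only the standard library; the prefixes and the rule dictionary are invented placeholders.

```python
from collections import defaultdict

# Same factory as in __init__: each new prefix gets its own pair of empty lists.
classification_rules = defaultdict(lambda: {'desired': [], 'undesired': []})

prefix = ('Sex',)  # hypothetical attribute prefix
classification_rules[prefix]['desired'].append({'itemset': (3,), 'support': 40, 'confidence': 0.8})

# Accessing an unseen prefix creates a fresh entry rather than raising KeyError.
other = classification_rules[('Age',)]
```

Since the factory is called once per missing key, the lists of different prefixes are independent objects and appending to one does not affect another.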

Source code in src/action_rules/rules/rules.py
def __init__(
    self,
    undesired_state: str,
    desired_state: str,
    columns: list,
    count_transactions: int,
    intrinsic_utility_table: Optional[dict] = None,
    transition_utility_table: Optional[dict] = None,
):
    """
    Initialize the Rules class with the specified undesired and desired states, columns, and transaction count.

    Parameters
    ----------
    undesired_state : str
        The undesired state of the target attribute.
    desired_state : str
        The desired state of the target attribute.
    columns : list
        List of columns in the dataset.
    count_transactions : int
        The number of transactions in the data.
    intrinsic_utility_table : dict, optional
        (attribute, value) -> float
        A lookup table for the intrinsic utility of each attribute-value pair.
        If None, no intrinsic utility is considered.
    transition_utility_table : dict, optional
        (attribute, from_value, to_value) -> float
        A lookup table for cost/gain of transitions between values.
        If None, no transition utility is considered.

    Notes
    -----
    The classification_rules attribute is initialized as a defaultdict with a lambda function that creates
    dictionaries for 'desired' and 'undesired' states.
    """
    self.classification_rules = defaultdict(lambda: {'desired': [], 'undesired': []})  # type: defaultdict
    self.undesired_state = undesired_state
    self.columns = columns
    self.desired_state = desired_state
    self.action_rules = []  # type: list
    self.undesired_prefixes_without_conf = set()  # type: set
    self.desired_prefixes_without_conf = set()  # type: set
    self.count_transactions = count_transactions
    self.intrinsic_utility_table = intrinsic_utility_table or {}
    self.transition_utility_table = transition_utility_table or {}

add_classification_rules(new_ar_prefix, itemset_prefix, undesired_states, desired_states)

Add classification rules for undesired and desired states.

Parameters:

Name Type Description Default
new_ar_prefix tuple

Prefix of the action rule.

required
itemset_prefix tuple

Prefix of the itemset.

required
undesired_states list

List of dictionaries containing undesired state information.

required
desired_states list

List of dictionaries containing desired state information.

required
Notes

This method updates the classification_rules attribute with new rules based on the provided undesired and desired states. Each state is represented as a dictionary containing item, support, confidence, and target information.
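
To make the stored shape concrete, the sketch below mimics the update with a standalone helper. The helper `add_rules`, the prefixes, the item indices, and the target labels are all invented for illustration; only the dictionary keys follow the source listing.

```python
from collections import defaultdict


def add_rules(store, ar_prefix, itemset_prefix, states, side, target):
    """Append one classification rule per state under store[ar_prefix][side]."""
    for state in states:
        store[ar_prefix][side].append(
            {
                # The new item index is appended to the existing itemset prefix.
                'itemset': itemset_prefix + (state['item'],),
                'support': state['support'],
                'confidence': state['confidence'],
                'target': target,
            }
        )


store = defaultdict(lambda: {'desired': [], 'undesired': []})
ar_prefix = ('Sex', 'Embarked')  # hypothetical attribute prefix
add_rules(store, ar_prefix, (0,), [{'item': 5, 'support': 30, 'confidence': 0.75}], 'undesired', '0')
add_rules(store, ar_prefix, (0,), [{'item': 6, 'support': 40, 'confidence': 0.8}], 'desired', '1')
```

Both sides end up under the same attribute prefix, which is what later allows an undesired rule and a desired rule to be paired into an action rule.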

Source code in src/action_rules/rules/rules.py
def add_classification_rules(self, new_ar_prefix, itemset_prefix, undesired_states, desired_states):
    """
    Add classification rules for undesired and desired states.

    Parameters
    ----------
    new_ar_prefix : tuple
        Prefix of the action rule.
    itemset_prefix : tuple
        Prefix of the itemset.
    undesired_states : list
        List of dictionaries containing undesired state information.
    desired_states : list
        List of dictionaries containing desired state information.

    Notes
    -----
    This method updates the classification_rules attribute with new rules based on the provided
    undesired and desired states. Each state is represented as a dictionary containing item, support,
    confidence, and target information.
    """
    for undesired_item in undesired_states:
        new_itemset_prefix = itemset_prefix + (undesired_item['item'],)
        self.classification_rules[new_ar_prefix]['undesired'].append(
            {
                'itemset': new_itemset_prefix,
                'support': undesired_item['support'],
                'confidence': undesired_item['confidence'],
                'target': self.undesired_state,
            }
        )
    for desired_item in desired_states:
        new_itemset_prefix = itemset_prefix + (desired_item['item'],)
        self.classification_rules[new_ar_prefix]['desired'].append(
            {
                'itemset': new_itemset_prefix,
                'support': desired_item['support'],
                'confidence': desired_item['confidence'],
                'target': self.desired_state,
            }
        )

add_prefix_without_conf(prefix, is_desired)

Add a prefix to the set of prefixes without conflicts.

Parameters:

Name Type Description Default
prefix tuple

The prefix to be added.

required
is_desired bool

If True, add the prefix to the desired prefixes set; otherwise, add it to the undesired prefixes set.

required
Notes

This method is useful for keeping track of prefixes that have no conflicting rules and can be used directly in rule generation.

Source code in src/action_rules/rules/rules.py
def add_prefix_without_conf(self, prefix: tuple, is_desired: bool):
    """
    Add a prefix to the set of prefixes without conflicts.

    Parameters
    ----------
    prefix : tuple
        The prefix to be added.
    is_desired : bool
        If True, add the prefix to the desired prefixes set; otherwise, add it to the undesired prefixes set.

    Notes
    -----
    This method is useful for keeping track of prefixes that have no conflicting rules and can be
    used directly in rule generation.
    """
    if is_desired:
        self.desired_prefixes_without_conf.add(prefix)
    else:
        self.undesired_prefixes_without_conf.add(prefix)

calculate_confidence(support, opposite_support)

Calculate the confidence of a rule.

Parameters:

Name Type Description Default
support int

The support value for the desired or undesired state.

required
opposite_support int

The support value for the opposite state.

required

Returns:

Type Description
float

The confidence value calculated as support / (support + opposite_support). Returns 0 if the sum of support and opposite_support is 0.

Notes

Confidence is a measure of the reliability of a rule. A higher confidence indicates a stronger association between the conditions of the rule and the target state.
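
As a quick worked example, the formula can be restated as a free function. This is a sketch of the arithmetic only; the support counts are invented.

```python
def calculate_confidence(support, opposite_support):
    """Standalone restatement of support / (support + opposite_support)."""
    total = support + opposite_support
    if total == 0:
        # No matching transactions at all: the method returns 0 instead of dividing by zero.
        return 0
    return support / total


conf_a = calculate_confidence(30, 10)  # 30 of 40 matching transactions reach the state
conf_b = calculate_confidence(0, 0)    # empty case
```

Here `conf_a` is `0.75` and `conf_b` is `0`.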

Source code in src/action_rules/rules/rules.py
def calculate_confidence(self, support, opposite_support):
    """
    Calculate the confidence of a rule.

    Parameters
    ----------
    support : int
        The support value for the desired or undesired state.
    opposite_support : int
        The support value for the opposite state.

    Returns
    -------
    float
        The confidence value calculated as support / (support + opposite_support).
        Returns 0 if the sum of support and opposite_support is 0.

    Notes
    -----
    Confidence is a measure of the reliability of a rule. A higher confidence indicates a stronger
    association between the conditions of the rule and the target state.
    """
    if support + opposite_support == 0:
        return 0
    return support / (support + opposite_support)

calculate_uplift(undesired_support, undesired_confidence, desired_confidence)

Calculate the uplift of an action rule.

Parameters:

Name Type Description Default
undesired_support int

The support value for the undesired state.

required
undesired_confidence float

The confidence value for the undesired state.

required
desired_confidence float

The confidence value for the desired state.

required

Returns:

Type Description
float

The uplift value calculated as: ((desired_confidence - (1 - undesired_confidence)) * (undesired_support / undesired_confidence)) / self.count_transactions.

Notes

Uplift measures the increase in the probability of achieving the desired state when applying the action rule compared to not applying it. It is used to assess the effectiveness of the rule.
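
One way to read the formula, assuming a two-valued target: `undesired_support / undesired_confidence` estimates how many transactions match the undesired rule's antecedent, and `desired_confidence - (1 - undesired_confidence)` is the change in the probability of the desired state for those transactions. The sketch below restates the calculation as a free function, with `count_transactions` passed explicitly; all numbers are invented.

```python
def calculate_uplift(undesired_support, undesired_confidence, desired_confidence, count_transactions):
    """Standalone restatement of the uplift formula (count_transactions is an explicit argument here)."""
    # Estimated number of transactions covered by the undesired rule's antecedent.
    covered = undesired_support / undesired_confidence
    # Change in the probability of the desired state for those transactions.
    probability_shift = desired_confidence - (1 - undesired_confidence)
    return probability_shift * covered / count_transactions


# 40 supporting transactions at confidence 0.8 -> about 50 covered transactions.
# Probability shift: 0.7 - 0.2 -> about 0.5. Dataset size: 200 (hypothetical).
uplift = calculate_uplift(40, 0.8, 0.7, 200)
```

The result is approximately `0.125`, that is, roughly 25 of 200 transactions would be expected to move to the desired state. Note that the formula divides by `undesired_confidence`, so a confidence of 0 raises `ZeroDivisionError`.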

Source code in src/action_rules/rules/rules.py
def calculate_uplift(self, undesired_support: int, undesired_confidence: float, desired_confidence: float) -> float:
    """
    Calculate the uplift of an action rule.

    Parameters
    ----------
    undesired_support : int
        The support value for the undesired state.
    undesired_confidence : float
        The confidence value for the undesired state.
    desired_confidence : float
        The confidence value for the desired state.

    Returns
    -------
    float
        The uplift value calculated as:
        ((desired_confidence - (1 - undesired_confidence)) * (undesired_support / undesired_confidence))
        / self.count_transactions.

    Notes
    -----
    Uplift measures the increase in the probability of achieving the desired state when applying the action rule
    compared to not applying it. It is used to assess the effectiveness of the rule.
    """
    return (
        (desired_confidence - (1 - undesired_confidence)) * (undesired_support / undesired_confidence)
    ) / self.count_transactions

compute_action_rule_measures(support_undesired, confidence_undesired, support_desired, confidence_desired)

Compute the support and confidence for an action rule formed from an undesired rule and a desired rule.

The action rule is derived by pairing a classification rule that leads to an undesired outcome with a classification rule that leads to a desired outcome. In this formulation, the support of the action rule is defined as the minimum of the supports of the two component rules, and the confidence of the action rule is defined as the product of their confidences.

Parameters:

Name Type Description Default
support_undesired float

The support of the undesired rule (e.g., count or relative frequency).

required
confidence_undesired float

The confidence of the undesired rule (a value between 0 and 1).

required
support_desired float

The support of the desired rule.

required
confidence_desired float

The confidence of the desired rule (a value between 0 and 1).

required

Returns:

Type Description
tuple of (float, float)

A tuple containing:
  • action_support : float The support of the action rule, computed as min(support_undesired, support_desired).
  • action_confidence : float The confidence of the action rule, computed as confidence_undesired * confidence_desired.
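
A short numeric illustration of the two measures, written as a free function with invented inputs:

```python
def compute_action_rule_measures(support_undesired, confidence_undesired, support_desired, confidence_desired):
    """Standalone restatement: minimum of the supports, product of the confidences."""
    action_support = min(support_undesired, support_desired)
    action_confidence = confidence_undesired * confidence_desired
    return action_support, action_confidence


# Undesired rule: support 40, confidence 0.8. Desired rule: support 25, confidence 0.7.
action_support, action_confidence = compute_action_rule_measures(40, 0.8, 25, 0.7)
```

`action_support` is `25` (the smaller support) and `action_confidence` is approximately `0.56`.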

Source code in src/action_rules/rules/rules.py
def compute_action_rule_measures(
    self, support_undesired, confidence_undesired, support_desired, confidence_desired
):
    """
    Compute the support and confidence for an action rule formed from an undesired rule and a desired rule.

    The action rule is derived by pairing a classification rule that leads to an undesired outcome
    with a classification rule that leads to a desired outcome. In this formulation, the support
    of the action rule is defined as the minimum of the supports of the two component rules, and
    the confidence of the action rule is defined as the product of their confidences.

    Parameters
    ----------
    support_undesired : float
        The support of the undesired rule (e.g., count or relative frequency).
    confidence_undesired : float
        The confidence of the undesired rule (a value between 0 and 1).
    support_desired : float
        The support of the desired rule.
    confidence_desired : float
        The confidence of the desired rule (a value between 0 and 1).

    Returns
    -------
    tuple of (float, float)
        A tuple containing:
            - action_support : float
                The support of the action rule, computed as min(support_undesired, support_desired).
            - action_confidence : float
                The confidence of the action rule, computed as confidence_undesired * confidence_desired.

    """
    action_support = min(support_undesired, support_desired)
    action_confidence = confidence_undesired * confidence_desired
    return action_support, action_confidence

compute_rule_utility(undesired_rule, desired_rule)

Compute various utility gains for a rule transition from undesired to desired.

The function computes intrinsic utilities for items in both the undesired and desired rule itemsets, calculates a transition gain for changes in flexible attributes, and adjusts these gains using target state utilities and rule confidences to derive realistic gain metrics at both the rule and dataset levels.

Parameters:

Name Type Description Default
undesired_rule dict

Dictionary representing the undesired rule. Expected keys:
  • 'itemset': list of item indices in the undesired rule.
  • 'confidence': (optional) confidence level of the undesired rule.
  • 'support': (optional) support count of the undesired rule.

required
desired_rule dict

Dictionary representing the desired rule. Expected keys:
  • 'itemset': list of item indices in the desired rule.
  • 'confidence': (optional) confidence level of the desired rule.

required

Returns:

Type Description
tuple of (float, float, float)
  • max_rule_gain : float The maximum rule gain computed as the sum of rule gain and target gain.
  • realistic_rule_gain : float The rule gain adjusted with a realistic target gain, incorporating rule confidences.
  • realistic_rule_gain_dataset : float The dataset-level realistic rule gain, computed by scaling realistic rule gain with the estimated number of transactions.
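
The three gains can be traced by hand with a small standalone sketch. It assumes both utility tables are keyed by integer item indices, which is what the lookups in the source suggest; the tables, item indices, and target ids (`10` and `11`) are all invented for illustration.

```python
def compute_rule_utility(undesired_rule, desired_rule, intrinsic, transition,
                         undesired_state, desired_state):
    """Standalone mirror of the method; tables and target ids are passed explicitly."""
    u_undesired = sum(intrinsic.get(i, 0.0) for i in undesired_rule['itemset'])
    u_desired = sum(intrinsic.get(i, 0.0) for i in desired_rule['itemset'])
    # Transition utility only counts positions where the item changes.
    transition_gain = sum(
        transition.get((u, d), 0.0)
        for u, d in zip(undesired_rule['itemset'], desired_rule['itemset'])
        if u != d
    )
    rule_gain = u_desired - u_undesired + transition_gain

    target_gain = (
        intrinsic.get(desired_state, 0.0)
        - intrinsic.get(undesired_state, 0.0)
        + transition.get((undesired_state, desired_state), 0.0)
    )
    u_conf = undesired_rule.get('confidence', 0.0)
    d_conf = desired_rule.get('confidence', 0.0)
    # Target gain scaled by the expected shift in the probability of the desired state.
    target_gain_realistic = (d_conf - (1 - u_conf)) * target_gain

    max_rule_gain = rule_gain + target_gain
    realistic_rule_gain = rule_gain + target_gain_realistic
    transactions = undesired_rule.get('support', 0) / u_conf if u_conf > 0 else 0.0
    return max_rule_gain, realistic_rule_gain, transactions * realistic_rule_gain


intrinsic = {1: 2.0, 2: 5.0, 10: -100.0, 11: 50.0}  # hypothetical utilities
transition = {(1, 2): -1.0}                         # hypothetical cost of changing item 1 to item 2
undesired = {'itemset': (0, 1), 'support': 40, 'confidence': 0.8}
desired = {'itemset': (0, 2), 'support': 25, 'confidence': 0.7}

max_gain, realistic_gain, dataset_gain = compute_rule_utility(
    undesired, desired, intrinsic, transition, undesired_state=10, desired_state=11
)
```

With these inputs the rule gain is 5 - 2 - 1 = 2 and the target gain is 50 - (-100) = 150, so `max_gain` is 152, `realistic_gain` is about 2 + 0.5 * 150 = 77, and `dataset_gain` is about 50 * 77 = 3850.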
Source code in src/action_rules/rules/rules.py
def compute_rule_utility(self, undesired_rule: dict, desired_rule: dict) -> tuple:
    """
    Compute various utility gains for a rule transition from undesired to desired.

    The function computes intrinsic utilities for items in both the undesired and desired rule itemsets,
    calculates a transition gain for changes in flexible attributes, and adjusts these gains using target
    state utilities and rule confidences to derive realistic gain metrics at both the rule and dataset levels.

    Parameters
    ----------
    undesired_rule : dict
        Dictionary representing the undesired rule. Expected keys:
            - 'itemset': list of item indices in the undesired rule.
            - 'confidence': (optional) confidence level of the undesired rule.
            - 'support': (optional) support count of the undesired rule.
    desired_rule : dict
        Dictionary representing the desired rule. Expected keys:
            - 'itemset': list of item indices in the desired rule.
            - 'confidence': (optional) confidence level of the desired rule.

    Returns
    -------
    tuple of (float, float, float)
        - max_rule_gain : float
            The maximum rule gain computed as the sum of rule gain and target gain.
        - realistic_rule_gain : float
            The rule gain adjusted with a realistic target gain, incorporating rule confidences.
        - realistic_rule_gain_dataset : float
            The dataset-level realistic rule gain, computed by scaling realistic rule gain with the estimated
            number of transactions.
    """
    u_undesired = 0.0
    # Sum intrinsic utilities for each item index in the undesired rule's itemset.
    for idx in undesired_rule.get('itemset', []):
        intrinsic_value = self.intrinsic_utility_table.get(idx, 0.0)
        u_undesired += intrinsic_value

    # Initialize the desired rule utility.
    u_desired = 0.0
    # Sum intrinsic utilities for each item index in the desired rule's itemset.
    for idx in desired_rule.get('itemset', []):
        intrinsic_value = self.intrinsic_utility_table.get(idx, 0.0)
        u_desired += intrinsic_value

    # Initialize additional transition gain.
    transition_gain = 0.0
    # Iterate over corresponding item indices from undesired and desired itemsets.
    for u_idx, d_idx in zip(undesired_rule.get('itemset', []), desired_rule.get('itemset', [])):
        # Only add transition gain if the indices differ (indicating a change in a flexible attribute).
        if u_idx != d_idx:
            trans_value = self.transition_utility_table.get((u_idx, d_idx), 0.0)
            transition_gain += trans_value

    rule_gain = u_desired - u_undesired + transition_gain

    # Target utility
    u_undesired_target = self.intrinsic_utility_table.get(self.undesired_state, 0.0)
    u_desired_target = self.intrinsic_utility_table.get(self.desired_state, 0.0)
    transition_gain_target = self.transition_utility_table.get((self.undesired_state, self.desired_state), 0.0)

    target_gain = u_desired_target - u_undesired_target + transition_gain_target

    # Realistic target gain
    undesired_rule_confidence = undesired_rule.get('confidence', 0.0)
    desired_rule_confidence = desired_rule.get('confidence', 0.0)
    target_gain_realistic = (desired_rule_confidence - (1 - undesired_rule_confidence)) * target_gain

    # Rule gain
    max_rule_gain = rule_gain + target_gain
    realistic_rule_gain = rule_gain + target_gain_realistic

    # Compute dataset-level realistic gain.
    support = undesired_rule.get('support', 0)
    if undesired_rule_confidence > 0:
        transactions = support / undesired_rule_confidence
    else:
        transactions = 0.0
    realistic_rule_gain_dataset = transactions * realistic_rule_gain

    return max_rule_gain, realistic_rule_gain, realistic_rule_gain_dataset

generate_action_rules()

Generate action rules from classification rules.

Notes

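This method pairs every desired classification rule with every undesired classification rule stored under the same attribute prefix. For each pair it computes the uplift (calculate_uplift), the utility gains (compute_rule_utility), and the action rule support and confidence (compute_action_rule_measures), then appends the result to the action_rules attribute.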
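
The pairing step can be sketched on its own as a Cartesian product per prefix. The example below is a simplified illustration with invented rules and a made-up dataset size; it inlines the uplift, support, and confidence formulas and leaves out the utility keys.

```python
from itertools import product

# Hypothetical classification rules that share the attribute prefix (0, 1).
classification_rules = {
    (0, 1): {
        'undesired': [{'itemset': (2, 5), 'support': 40, 'confidence': 0.8, 'target': 0}],
        'desired': [
            {'itemset': (2, 6), 'support': 25, 'confidence': 0.7, 'target': 1},
            {'itemset': (2, 7), 'support': 30, 'confidence': 0.9, 'target': 1},
        ],
    },
}
count_transactions = 200  # made-up dataset size

action_rules = []
for prefix, rules in classification_rules.items():
    # Every desired rule is combined with every undesired rule under the same prefix.
    for desired, undesired in product(rules['desired'], rules['undesired']):
        uplift = (
            (desired['confidence'] - (1 - undesired['confidence']))
            * (undesired['support'] / undesired['confidence'])
        ) / count_transactions
        action_rules.append(
            {
                'undesired': undesired,
                'desired': desired,
                'uplift': uplift,
                'support': min(undesired['support'], desired['support']),
                'confidence': undesired['confidence'] * desired['confidence'],
            }
        )
```

One undesired rule and two desired rules yield two action rules, with uplifts of roughly 0.125 and 0.175.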

Source code in src/action_rules/rules/rules.py
def generate_action_rules(self):
    """
    Generate action rules from classification rules.

    Notes
    -----
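    This method pairs every desired classification rule with every undesired classification rule stored
    under the same attribute prefix. For each pair it computes the uplift (`calculate_uplift`), the utility
    gains (`compute_rule_utility`), and the action rule support and confidence
    (`compute_action_rule_measures`), then appends the result to the action_rules attribute.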
    """
    for attribute_prefix, rules in self.classification_rules.items():
        for desired_rule in rules['desired']:
            for undesired_rule in rules['undesired']:
                # Uplift
                uplift = self.calculate_uplift(
                    undesired_rule['support'],
                    undesired_rule['confidence'],
                    desired_rule['confidence'],
                )
                # Utility (empty dict by default so the ** unpacking below never receives None)
                utility = {}
                if self.intrinsic_utility_table is not None or self.transition_utility_table is not None:
                    (max_rule_gain, realistic_rule_gain, realistic_rule_gain_dataset) = self.compute_rule_utility(
                        undesired_rule, desired_rule
                    )
                    utility = {
                        'max_rule_gain': max_rule_gain,
                        'realistic_rule_gain': realistic_rule_gain,
                        'realistic_dataset_gain': realistic_rule_gain_dataset,
                    }
                # Action rule measures
                ar_support, ar_confidence = self.compute_action_rule_measures(
                    undesired_rule.get('support', 0.0),
                    undesired_rule.get('confidence', 0.0),
                    desired_rule.get('support', 0.0),
                    desired_rule.get('confidence', 0.0),
                )
                self.action_rules.append(
                    {
                        'undesired': undesired_rule,
                        'desired': desired_rule,
                        'uplift': uplift,
                        'support': ar_support,
                        'confidence': ar_confidence,
                        **utility,
                    }
                )

prune_classification_rules(k, stop_list)

Prune classification rules based on their length and update the stop list.

Parameters:

Name Type Description Default
k int

Length of the attribute prefix.

required
stop_list list

List of prefixes to stop generating rules for.

required
Notes

This method removes classification rules whose prefix length equals k and whose desired or undesired list is empty. A removed prefix is added to the stop_list only when the empty side is not recorded in the matching desired_prefixes_without_conf or undesired_prefixes_without_conf set.
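
The difference between "removed" and "removed and added to the stop list" is easiest to see on a small example. The sketch below mirrors the method as a free function that takes the rule dictionary and the two prefix sets as arguments; the prefixes and rules are invented.

```python
def prune_classification_rules(classification_rules, k, stop_list,
                               desired_without_conf, undesired_without_conf):
    """Standalone mirror of the method; state is passed in instead of read from self."""
    del_prefixes = []
    for prefix, rules in classification_rules.items():
        if len(prefix) != k:
            continue
        no_desired = len(rules['desired']) == 0
        no_undesired = len(rules['undesired']) == 0
        if no_desired or no_undesired:
            # Stop only if the empty side is not recorded in its *_without_conf set.
            if (no_desired and prefix not in desired_without_conf) or (
                no_undesired and prefix not in undesired_without_conf
            ):
                stop_list.append(prefix)
            del_prefixes.append(prefix)
    for prefix in del_prefixes:
        del classification_rules[prefix]


rules = {
    (0, 1): {'desired': [{'itemset': (2, 6)}], 'undesired': [{'itemset': (2, 5)}]},  # kept
    (0, 2): {'desired': [], 'undesired': [{'itemset': (3, 5)}]},  # removed and stopped
    (0, 3): {'desired': [], 'undesired': [{'itemset': (4, 5)}]},  # removed, not stopped
    (0,): {'desired': [], 'undesired': []},                       # length 1, ignored for k == 2
}
stop_list = []
prune_classification_rules(
    rules, 2, stop_list, desired_without_conf={(0, 3)}, undesired_without_conf=set()
)
```

After the call, `rules` keeps only `(0, 1)` and `(0,)`, and `stop_list` contains just `(0, 2)`.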

Source code in src/action_rules/rules/rules.py
def prune_classification_rules(self, k: int, stop_list: list):
    """
    Prune classification rules based on their length and update the stop list.

    Parameters
    ----------
    k : int
        Length of the attribute prefix.
    stop_list : list
        List of prefixes to stop generating rules for.

    Notes
    -----
    This method removes classification rules whose prefix length equals k and either desired or undesired
    states are empty. The corresponding prefixes are also added to the stop_list to avoid further rule generation.
    """
    del_prefixes = []
    for attribute_prefix, rules in self.classification_rules.items():
        if k == len(attribute_prefix):
            len_desired = len(rules['desired'])
            len_undesired = len(rules['undesired'])
            if len_desired == 0 or len_undesired == 0:
                if (len_desired == 0 and attribute_prefix not in self.desired_prefixes_without_conf) or (
                    len_undesired == 0 and attribute_prefix not in self.undesired_prefixes_without_conf
                ):
                    stop_list.append(attribute_prefix)
                del_prefixes.append(attribute_prefix)
    for attribute_prefix in del_prefixes:
        del self.classification_rules[attribute_prefix]